Coro

 view release on metacpan or  search on metacpan

Coro/Util.pm  view on Meta::CPAN

package Coro::Util;

use common::sense;

use Socket ();

use AnyEvent ();
use AnyEvent::Socket ();

use Coro::State;
use Coro::Handle;
use Coro::Storable ();
use Coro::AnyEvent ();
use Coro::Semaphore;

use base 'Exporter';

our @EXPORT = qw(gethostbyname gethostbyaddr);
our @EXPORT_OK = qw(inet_aton fork_eval);

our $VERSION = 6.514;

our $MAXPARALLEL = 16; # max. number of parallel jobs

my $jobs = new Coro::Semaphore $MAXPARALLEL;

sub _do_asy(&;@) {
   my $sub = shift;
   $jobs->down;
   my $fh;

   my $pid = open $fh, "-|";

   if (!defined $pid) {
      die "fork: $!";
   } elsif (!$pid) {
      syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
      Coro::Util::_exit 0;
   }

   my $buf;
   my $wakeup = Coro::rouse_cb;
   my $w; $w = AE::io $fh, 0, sub {
      sysread $fh, $buf, 16384, length $buf
         and return;

      undef $w;
      $wakeup->();
   };

   Coro::rouse_wait;

   $jobs->up;
   my @r = map { pack "H*", $_ } split /\0/, $buf;
   wantarray ? @r : $r[0];
}

=item $ipn = Coro::Util::inet_aton $hostname || $ip

Works almost exactly like its C<Socket::inet_aton> counterpart, except
that it does not block other coroutines.

Does not handle multihomed hosts or IPv6 - consider using
C<AnyEvent::Socket::resolve_sockaddr> with the L<Coro> rouse functions
instead.

=cut

sub inet_aton {
   AnyEvent::Socket::inet_aton $_[0], Coro::rouse_cb;
   (grep length == 4, Coro::rouse_wait)[0]
}

=item gethostbyname, gethostbyaddr

Work similarly to their Perl counterparts, but do not block. Uses
C<AnyEvent::Util::inet_aton> internally.

Does not handle multihomed hosts or IPv6 - consider using
C<AnyEvent::Socket::resolve_sockaddr> or C<AnyEvent::DNS::reverse_lookup>
with the L<Coro> rouse functions instead.

=cut

sub gethostbyname($) {
   AnyEvent::Socket::inet_aton $_[0], Coro::rouse_cb;

   ($_[0], $_[0], &Socket::AF_INET, 4, map +(AnyEvent::Socket::format_address $_), grep length == 4, Coro::rouse_wait)
}

sub gethostbyaddr($$) {
   _do_asy { gethostbyaddr $_[0], $_[1] } @_
}

=item @result = Coro::Util::fork_eval { ... }, @args

Executes the given code block or code reference with the given arguments
in a separate process, returning the results. The return values must be
serialisable with Coro::Storable. It may, of course, block.

Note that using event handling in the sub is not usually a good idea as
you will inherit a mixed set of watchers from the parent.

Exceptions will be correctly forwarded to the caller.

This function is useful for pushing cpu-intensive computations into a
different process, for example to take advantage of multiple CPU's. Its
also useful if you want to simply run some blocking functions (such as
C<system()>) and do not care about the overhead enough to code your own
pid watcher etc.

This function might keep a pool of processes in some future version, as
fork can be rather slow in large processes.

You should also look at C<AnyEvent::Util::fork_eval>, which is newer and
more compatible to totally broken Perl implementations such as the one
from ActiveState.

Example: execute some external program (convert image to rgba raw form)
and add a long computation (extract the alpha channel) in a separate
process, making sure that never more then $NUMCPUS processes are being



( run in 0.717 second using v1.01-cache-2.11-cpan-98e64b0badf )