Coro
view release on metacpan or search on metacpan
Coro/Util.pm view on Meta::CPAN
package Coro::Util;
use common::sense;
use Socket ();
use AnyEvent ();
use AnyEvent::Socket ();
use Coro::State;
use Coro::Handle;
use Coro::Storable ();
use Coro::AnyEvent ();
use Coro::Semaphore;
use base 'Exporter';
our @EXPORT = qw(gethostbyname gethostbyaddr);
our @EXPORT_OK = qw(inet_aton fork_eval);
our $VERSION = 6.514;
our $MAXPARALLEL = 16; # max. number of parallel jobs
my $jobs = new Coro::Semaphore $MAXPARALLEL;
sub _do_asy(&;@) {
my $sub = shift;
$jobs->down;
my $fh;
my $pid = open $fh, "-|";
if (!defined $pid) {
die "fork: $!";
} elsif (!$pid) {
syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
Coro::Util::_exit 0;
}
my $buf;
my $wakeup = Coro::rouse_cb;
my $w; $w = AE::io $fh, 0, sub {
sysread $fh, $buf, 16384, length $buf
and return;
undef $w;
$wakeup->();
};
Coro::rouse_wait;
$jobs->up;
my @r = map { pack "H*", $_ } split /\0/, $buf;
wantarray ? @r : $r[0];
}
=item $ipn = Coro::Util::inet_aton $hostname || $ip
Works almost exactly like its C<Socket::inet_aton> counterpart, except
that it does not block other coroutines.
Does not handle multihomed hosts or IPv6 - consider using
C<AnyEvent::Socket::resolve_sockaddr> with the L<Coro> rouse functions
instead.
=cut
sub inet_aton {
AnyEvent::Socket::inet_aton $_[0], Coro::rouse_cb;
(grep length == 4, Coro::rouse_wait)[0]
}
=item gethostbyname, gethostbyaddr
Work similarly to their Perl counterparts, but do not block. Uses
C<AnyEvent::Util::inet_aton> internally.
Does not handle multihomed hosts or IPv6 - consider using
C<AnyEvent::Socket::resolve_sockaddr> or C<AnyEvent::DNS::reverse_lookup>
with the L<Coro> rouse functions instead.
=cut
sub gethostbyname($) {
AnyEvent::Socket::inet_aton $_[0], Coro::rouse_cb;
($_[0], $_[0], &Socket::AF_INET, 4, map +(AnyEvent::Socket::format_address $_), grep length == 4, Coro::rouse_wait)
}
sub gethostbyaddr($$) {
_do_asy { gethostbyaddr $_[0], $_[1] } @_
}
=item @result = Coro::Util::fork_eval { ... }, @args
Executes the given code block or code reference with the given arguments
in a separate process, returning the results. The return values must be
serialisable with Coro::Storable. It may, of course, block.
Note that using event handling in the sub is not usually a good idea as
you will inherit a mixed set of watchers from the parent.
Exceptions will be correctly forwarded to the caller.
This function is useful for pushing cpu-intensive computations into a
different process, for example to take advantage of multiple CPU's. Its
also useful if you want to simply run some blocking functions (such as
C<system()>) and do not care about the overhead enough to code your own
pid watcher etc.
This function might keep a pool of processes in some future version, as
fork can be rather slow in large processes.
You should also look at C<AnyEvent::Util::fork_eval>, which is newer and
more compatible to totally broken Perl implementations such as the one
from ActiveState.
Example: execute some external program (convert image to rgba raw form)
and add a long computation (extract the alpha channel) in a separate
process, making sure that never more then $NUMCPUS processes are being
( run in 0.717 second using v1.01-cache-2.11-cpan-98e64b0badf )