AnyEvent-Worker
view release on metacpan or search on metacpan
lib/AnyEvent/Worker.pm view on Meta::CPAN
while () {
my $len = unpack "L", $rbuf;
# full request available?
last unless $len && $len + 4 <= length $rbuf;
my $req = Storable::thaw substr $rbuf, 4;
substr $rbuf, 0, $len + 4, ""; # remove length + request
local $@;
my $wbuf = eval {
++$N;
if (ref $WORKER eq 'CODE') {
local $0 = "$O : request $N";
pack "L/a*", Storable::freeze [ 1, $WORKER->(@$req) ];
} else {
my $method = shift @$req;
#warn ">> request $method";
local $0 = "$O : request $N : $method";
pack "L/a*", Storable::freeze [ 1, $WORKER->$method(@$req) ];
}
};
# warn if $@;
$0 = "$O : idle";
$wbuf = pack "L/a*", Storable::freeze [ undef, ref $@ ? $@ : "$@" ]
if $@;
#warn "<< response";
for (my $ofs = 0; $ofs < length $wbuf; ) {
my $wr = syswrite $fh, $wbuf, length($wbuf), $ofs;
defined $wr or $!{EINTR} or die "unable to write results: $!";
$ofs += $wr;
}
}
}
};
warn if $@;
}
sub serve_fd($$) {
open my $fh, ">>&=$_[0]"
or die "Couldn't open server file descriptor: $!";
serve_fh $fh, $_[1];
}
# stupid Storable autoloading, total loss-loss situation
Storable::thaw Storable::freeze [];
=head1 METHODS
=over 4
=cut
sub new {
my ($class, $cb, %arg) = @_;
my ($client, $server) = AnyEvent::Util::portable_socketpair
or croak "unable to create Anyevent::Worker communications pipe: $!";
binmode $client, ':raw';
binmode $server, ':raw';
my $self = bless \%arg, $class;
$self->{fh} = $client;
AnyEvent::Util::fh_nonblocking $client, 1;
my $rbuf;
my @caller = (caller)[1,2]; # the "default" caller
{
Scalar::Util::weaken (my $self = $self);
$self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
return unless $self;
$self->{last_activity} = AnyEvent->now;
my $len = sysread $client, $rbuf, 65536, length $rbuf;
if ($len > 0) {
# we received data, so reset the timer
while () {
my $len = unpack "L", $rbuf;
# full response available?
last unless $len && $len + 4 <= length $rbuf;
my $res = Storable::thaw substr $rbuf, 4;
substr $rbuf, 0, $len + 4, ""; # remove length + request
last unless $self;
my $req = shift @{ $self->{queue} };
if (defined $res->[0]) {
$res->[0] = $self;
$req->[0](@$res);
} else {
my $cb = shift @$req;
{
local $@ = $res->[1];
$@ =~ s{\n$}{};
$cb->($self);
}
}
# no more queued requests, so become idle
undef $self->{last_activity}
if $self && !@{ $self->{queue} };
}
}
elsif (defined $len) {
# todo, caller?
$self->_error ("unexpected eof", @caller, 1);
}
elsif ($! != Errno::EAGAIN) {
# todo, caller?
$self->_error ("read error ".(0+$!).": $!", @caller, 1);
}
});
( run in 2.538 seconds using v1.01-cache-2.11-cpan-cdf2f3d4e48 )