AnyEvent-Worker
view release on metacpan or search on metacpan
lib/AnyEvent/Worker.pm view on Meta::CPAN
or croak "unable to create Anyevent::Worker communications pipe: $!";
binmode $client, ':raw';
binmode $server, ':raw';
my $self = bless \%arg, $class;
$self->{fh} = $client;
AnyEvent::Util::fh_nonblocking $client, 1;
my $rbuf;
my @caller = (caller)[1,2]; # the "default" caller
{
Scalar::Util::weaken (my $self = $self);
$self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
return unless $self;
$self->{last_activity} = AnyEvent->now;
my $len = sysread $client, $rbuf, 65536, length $rbuf;
lib/AnyEvent/Worker.pm view on Meta::CPAN
=item $worker->do ( @args, $cb->( $worker, @response ) )
Executes worker code and execure the callback, when response is ready
=cut
sub do {
my $self = shift;
my $cb = pop;
my ($filename,$line) = (caller)[1,2];
unless ($self->{fh}) {
local $@ = my $err = 'no worker connection';
$cb->($self);
$self->_error ($err, $filename, $line, 1);
return;
}
push @{ $self->{queue} }, [$cb, $filename, $line];
lib/AnyEvent/Worker/Pool.pm view on Meta::CPAN
$worker->do(@args, sub {
$self->ret_worker($worker);
goto &$cb;
});
});
return;
}
sub take_worker {
my $self = shift;
my $cb = shift or die "cb required for take_worker at @{[(caller)[1,2]]}\n";
#warn("take wrk, left ".$#{$self->{pool}}." for @{[(caller)[1,2]]}\n");
if (@{$self->{pool}}) {
$cb->(shift @{$self->{pool}});
} else {
#warn("no worker for @{[(caller 1)[1,2]]}, maybe increase pool?");
push @{$self->{waiting_db}},$cb
}
}
sub ret_worker {
my $self = shift;
#warn("ret wrk, got ".@{$self->{pool}}.'+'.@_." for @{[(caller)[1,2]]}\n");
push @{ $self->{pool} }, @_;
$self->take_worker(shift @{ $self->{waiting_db} }) if @{ $self->{waiting_db} };
}
=head1 AUTHOR
Mons Anderson, C<< <mons@cpan.org> >>
=head1 COPYRIGHT & LICENSE
( run in 1.202 second using v1.01-cache-2.11-cpan-1e74a51a04c )