AnyEvent-Worker
view release on metacpan or search on metacpan
ex/worker-pool.pl view on Meta::CPAN
# Completion counter for the demo: every in-flight job holds one
# begin/end pair on it.
my $cv = AE::cv;

# $j1 submits one "test" request tagged with the given job id to the pool
# and, one second after a successful response, submits the same job again.
# It is declared before it is assigned because the closure calls itself.
my $j1;
$j1 = sub {
    my $id = shift;
    $cv->begin;
    $pool->do( test => "Test:$id", sub {
        if ($@) {
            # Failed request: this job chain stops here, so release its
            # hold on $cv; otherwise the counter could never reach zero.
            warn "Request died: $@\n";
            $cv->end;
            return;
        }
        warn "Received response: @_\n";
        my $t;
        $t = AnyEvent->timer(after => 1, cb => sub {
            undef $t;       # one-shot timer: drop the watcher
            $j1->($id);     # resubmit under the same id (it used to be dropped)
            $cv->end;       # release only after the follow-up has called begin
        });
    });
};

# Start five independent job chains.
$j1->($_) for 1..5;
ex/worker.pl view on Meta::CPAN
} );
# Completion counter for the demo: every in-flight job holds one
# begin/end pair on it.
my $cv = AE::cv;

# $j1 sends one "test" request to $worker1 and, one second after a
# successful response, sends it again. It is declared before it is
# assigned because the closure calls itself.
my $j1;
$j1 = sub {
    $cv->begin;
    $worker1->do( test => "P:Data", sub {
        if ($@) {
            # Failed request: the chain stops here, so release its hold
            # on $cv; otherwise the begin above is never balanced.
            warn "Request died: $@\n";
            $cv->end;
            return;
        }
        warn "Received response: @_\n";
        my $t;
        $t = AnyEvent->timer(after => 1, cb => sub {
            undef $t;       # one-shot timer: drop the watcher
            $j1->();        # the follow-up calls begin first ...
            $cv->end;       # ... so the counter never drops between rounds
        });
    });
};

# Kick off the first request.
$j1->();
{
$cv->begin;
lib/AnyEvent/Worker.pm view on Meta::CPAN
Scalar::Util::weaken (my $self = $self);
$self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
return unless $self;
$self->{last_activity} = AnyEvent->now;
my $len = sysread $client, $rbuf, 65536, length $rbuf;
if ($len > 0) {
# we received data, so reset the timer
while () {
my $len = unpack "L", $rbuf;
# full response available?
last unless $len && $len + 4 <= length $rbuf;
my $res = Storable::thaw substr $rbuf, 4;
substr $rbuf, 0, $len + 4, ""; # remove length + request
last unless $self;
lib/AnyEvent/Worker.pm view on Meta::CPAN
});
# Timeout watcher callback. Called when the timeout timer fires and also
# directly (see timeout()) to re-evaluate the timer after a change.
$self->{tw_cb} = sub {
    if ($self->{timeout} && $self->{last_activity}) {
        # Point in time at which the current inactivity period expires.
        my $deadline = $self->{last_activity} + $self->{timeout};

        if (AnyEvent->now > $deadline) {
            # Deadline passed: report a timeout for the request at the head
            # of the queue. The trailing 1 marks the error as fatal.
            my $pending = $self->{queue}[0];
            $self->_error (timeout => $pending->[1], $pending->[2], 1);
        } else {
            # Not expired yet: arm a fresh timer for the remaining time.
            $self->{tw} = AnyEvent->timer (
                after => $deadline - AnyEvent->now,
                cb    => $self->{tw_cb},
            );
            Scalar::Util::weaken $self;
        }
    } else {
        # Either no timeout is configured or there has been no activity,
        # so drop any existing timer.
        undef $self->{tw};
    }
};
lib/AnyEvent/Worker.pm view on Meta::CPAN
local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ };
warn 'test';
}
#print STDERR "killing $child_pid / $GD\n";
if ($child_pid) {
# send SIGTERM now (the delayed SIGKILL timer below is currently disabled)
$TERM{$child_pid}++;
kill 0 => $child_pid and
kill TERM => $child_pid or $!{ESRCH} or warn "kill $child_pid: $!";
return if $GD;
# MAYBE: kill timer
#my $murder_timer = AnyEvent->timer (
# after => 2,
# cb => sub {
# kill 9, $child_pid
# and delete $TERM{$child_pid};
# },
#);
# reap process
#print STDERR "start reaper $child_pid\n";
$KIDW{$child_pid} = AnyEvent->child (
pid => $child_pid,
cb => sub {
# just hold on to this so it won't go away
#print STDERR "reaped $child_pid\n";
delete $TERM{$child_pid};
delete $KIDW{$child_pid};
# cancel SIGKILL
#undef $murder_timer;
},
);
close $self->{fh};
}
}
sub END {
my $GD = 0;
{
local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ };
lib/AnyEvent/Worker.pm view on Meta::CPAN
Sets (or clears, with C<undef>) the worker timeout. Useful to extend the
timeout when you are about to make a really long request.
=cut
sub timeout {
    my $self = shift;

    # Store the new timeout; a missing or undef argument clears it.
    $self->{timeout} = shift;

    # Let the watcher callback re-evaluate (and re-arm or drop) the timer.
    return $self->{tw_cb}->();
}
=item $worker->do ( @args, $cb->( $worker, @response ) )
Executes the worker code and invokes the callback when the response is ready.
=cut
sub do {
( run in 1.322 second using v1.01-cache-2.11-cpan-49f99fa48dc )