AnyEvent-Worker
view release on metacpan or search on metacpan
lib/AnyEvent/Worker.pm view on Meta::CPAN
# warn if $@;
$0 = "$O : idle";
$wbuf = pack "L/a*", Storable::freeze [ undef, ref $@ ? $@ : "$@" ]
if $@;
#warn "<< response";
for (my $ofs = 0; $ofs < length $wbuf; ) {
my $wr = syswrite $fh, $wbuf, length($wbuf), $ofs;
defined $wr or $!{EINTR} or die "unable to write results: $!";
$ofs += $wr;
}
}
}
};
warn if $@;
}
sub serve_fd($$) {
open my $fh, ">>&=$_[0]"
or die "Couldn't open server file descriptor: $!";
serve_fh $fh, $_[1];
}
# stupid Storable autoloading, total loss-loss situation
Storable::thaw Storable::freeze [];
=head1 METHODS
=over 4
=cut
sub new {
my ($class, $cb, %arg) = @_;
my ($client, $server) = AnyEvent::Util::portable_socketpair
or croak "unable to create Anyevent::Worker communications pipe: $!";
binmode $client, ':raw';
binmode $server, ':raw';
my $self = bless \%arg, $class;
$self->{fh} = $client;
AnyEvent::Util::fh_nonblocking $client, 1;
my $rbuf;
my @caller = (caller)[1,2]; # the "default" caller
{
Scalar::Util::weaken (my $self = $self);
$self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
return unless $self;
$self->{last_activity} = AnyEvent->now;
my $len = sysread $client, $rbuf, 65536, length $rbuf;
if ($len > 0) {
# we received data, so reset the timer
while () {
my $len = unpack "L", $rbuf;
# full response available?
last unless $len && $len + 4 <= length $rbuf;
my $res = Storable::thaw substr $rbuf, 4;
substr $rbuf, 0, $len + 4, ""; # remove length + request
last unless $self;
my $req = shift @{ $self->{queue} };
if (defined $res->[0]) {
$res->[0] = $self;
$req->[0](@$res);
} else {
my $cb = shift @$req;
{
local $@ = $res->[1];
$@ =~ s{\n$}{};
$cb->($self);
}
}
# no more queued requests, so become idle
undef $self->{last_activity}
if $self && !@{ $self->{queue} };
}
}
elsif (defined $len) {
# todo, caller?
$self->_error ("unexpected eof", @caller, 1);
}
elsif ($! != Errno::EAGAIN) {
# todo, caller?
$self->_error ("read error ".(0+$!).": $!", @caller, 1);
}
});
$self->{tw_cb} = sub {
if ($self->{timeout} && $self->{last_activity}) {
if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) {
# we did time out
my $req = $self->{queue}[0];
$self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
} else {
# we need to re-set the timeout watcher
$self->{tw} = AnyEvent->timer (
after => $self->{last_activity} + $self->{timeout} - AnyEvent->now,
cb => $self->{tw_cb},
);
Scalar::Util::weaken $self;
}
} else {
# no timeout check wanted, or idle
undef $self->{tw};
}
};
$self->{ww_cb} = sub {
return unless $self;
$self->{last_activity} = AnyEvent->now;
my $len = syswrite $client, $self->{wbuf}
or return delete $self->{ww};
substr $self->{wbuf}, 0, $len, "";
};
}
my $pid = fork;
if ($pid) {
# parent
close $server;
}
elsif (defined $pid) {
# child
$SIG{INT} = 'IGNORE';
my $serv_fno = fileno $server;
($_ != $serv_fno) && POSIX::close $_ for $^F+1..$FD_MAX;
if (ref $cb eq 'CODE'){
$WORKER = $cb;
}
elsif ( ref $cb eq 'ARRAY') {
my ( $class,@args ) = @$cb;
eval qq{ use $class; 1 } or croak($@) unless $class->can('new');
$WORKER = $class->new(@args);
}
elsif ( ref $cb eq 'HASH') {
my $class = $cb->{class} or croak "You should define class to construct";
my $new = $cb->{new} || 'new';
eval qq{ use $class; 1 } or croak($@) unless $class->can($new);
$WORKER = $class->$new(@{ $cb->{args} || [] });
}
else {
croak "Bad argument: $cb";
}
serve_fh $server, $VERSION;
# no other way on the broken windows platform, even this leaks
# memory and might fail.
kill 9, $$ if AnyEvent::WIN32;
# and this kills the parent process on windows
POSIX::_exit 0;
}
else {
croak "fork: $!";
}
$self->{child_pid} = $pid;
$self
}
sub _server_pid {
shift->{child_pid}
}
our %KIDW;
our %TERM;
sub kill_child {
my $self = shift;
my $child_pid = delete $self->{child_pid};
my $GD = 0;
{
local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ };
warn 'test';
}
#print STDERR "killing $child_pid / $GD\n";
if ($child_pid) {
# send SIGKILL in two seconds
$TERM{$child_pid}++;
kill 0 => $child_pid and
kill TERM => $child_pid or $!{ESRCH} or warn "kill $child_pid: $!";
return if $GD;
# MAYBE: kill timer
#my $murder_timer = AnyEvent->timer (
# after => 2,
# cb => sub {
# kill 9, $child_pid
# and delete $TERM{$child_pid};
# },
#);
# reap process
#print STDERR "start reaper $child_pid\n";
$KIDW{$child_pid} = AnyEvent->child (
pid => $child_pid,
cb => sub {
# just hold on to this so it won't go away
#print STDERR "reaped $child_pid\n";
delete $TERM{$child_pid};
delete $KIDW{$child_pid};
# cancel SIGKILL
#undef $murder_timer;
},
);
close $self->{fh};
}
}
sub END {
my $GD = 0;
{
local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ };
warn 'test';
}
#print STDERR "END $!/$? GD=$GD\n";
for (keys %TERM) {
delete $KIDW{$_};
#print STDERR "END kill $_\n";
kill 0 => $_ and do {
kill KILL => $_ or warn "kill $_ failed: $!";
#print STDERR "END waitpid $_\n";
my $wp = waitpid $_,0;
#print STDERR "END waitpid $_ = $wp\n";
};
#print STDERR "END $_ ($!/$?/${^CHILD_ERROR_NATIVE})\n";
}
undef $!;undef $?;
}
sub DESTROY {
shift->kill_child;
}
sub _error {
my ($self, $error, $filename, $line, $fatal) = @_;
my $caller = '';
my @caller = ($filename,$line);
if ($fatal) {
delete $self->{tw};
delete $self->{rw};
delete $self->{ww};
delete $self->{fh};
# for fatal errors call all enqueued callbacks with error
while (my $req = shift @{$self->{queue}}) {
@caller = ($req->[1],$req->[2]) unless $caller;
$caller ||= " after $req->[1] line $req->[2],";
local $@ = "$error at $req->[1] line $req->[2].\n";
$req->[0]->($self);
}
$self->kill_child;
}
local $@ = $error;
if ($self->{on_error}) {
$self->{on_error}($self, $error, $fatal, @caller);
}
else {
my $e = "$error$caller";
if ($fatal) {
die "$e at $filename, line $line\n";
} else {
warn "$e at $filename, line $line\n";
}
}
}
=item $worker->on_error ($cb->($worker, $filename, $line, $fatal))
Sets (or clears, with C<undef>) the C<on_error> handler.
=cut
sub on_error {
$_[0]{on_error} = $_[1];
}
=item $worker->timeout ($seconds)
Sets (or clears, with C<undef>) the database timeout. Useful to extend the
timeout when you are about to make a really long query.
=cut
sub timeout {
my ($self, $timeout) = @_;
$self->{timeout} = $timeout;
# reschedule timer if one was running
$self->{tw_cb}->();
}
=item $worker->do ( @args, $cb->( $worker, @response ) )
Executes worker code and execure the callback, when response is ready
=cut
sub do {
my $self = shift;
my $cb = pop;
my ($filename,$line) = (caller)[1,2];
unless ($self->{fh}) {
local $@ = my $err = 'no worker connection';
$cb->($self);
$self->_error ($err, $filename, $line, 1);
return;
}
push @{ $self->{queue} }, [$cb, $filename, $line];
# re-start timeout if necessary
if ($self->{timeout} && !$self->{tw}) {
$self->{last_activity} = AnyEvent->now;
$self->{tw_cb}->();
}
$self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
unless ($self->{ww}) {
my $len = syswrite $self->{fh}, $self->{wbuf};
substr $self->{wbuf}, 0, $len, "";
# still any left? then install a write watcher
$self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
if length $self->{wbuf};
}
}
=back
=head1 AUTHOR
Mons Anderson, C<< <mons@cpan.org> >>
=head1 ACKNOWLEDGEMENTS
This module based on Marc Lehmann's L<AnyEvent::DBI>
Thanks to Vladimir Timofeev C<< <vovkasm@cpan.org> >> for bugfixes and useful notes
=cut
1; # End of AnyEvent::Worker
( run in 2.888 seconds using v1.01-cache-2.11-cpan-13bb782fe5a )