AnyEvent-Task
view release on metacpan or search on metacpan
lib/AnyEvent/Task/Server/Worker.pm view on Meta::CPAN
package AnyEvent::Task::Server::Worker;
use common::sense;
use AnyEvent::Util;
use Guard;
use POSIX; ## POSIX::_exit is used so we don't unlink the unix socket file created by our parent before the fork
use IO::Select;
use JSON::XS;
use Scalar::Util qw/blessed/;
## Set to 1 by process_data once $server->{setup} has returned without dying;
## while it is false, setup is (re)attempted before the interface is called.
my $setup_has_been_run;
## JSON::XS codec (utf8 mode) created in handle_worker_wrapped; process_data
## feeds the bytes it reads into this object's incremental parser.
my $json;
## IO::Select set created in handle_worker_wrapped, holding the file handles
## the worker loop waits on.
my $sel;
## Entry point for a worker process. Runs the worker loop
## (handle_worker_wrapped) with the caller's arguments and never returns:
## the process always ends via POSIX::_exit(1).
##
## Arguments (forwarded untouched): ($server, $fh, $monitor_fh).
##
## Any exception that escapes the worker loop is reported on STDERR before
## exiting, instead of being discarded silently, so that a crashing worker
## leaves a diagnostic behind (same channel as the "hung worker" message).
sub handle_worker {
  ## The trailing 1 lets us detect failure without relying on $@ alone.
  my $completed = eval {
    handle_worker_wrapped(@_);
    1;
  };

  if (!$completed) {
    my $err = (defined $@ && length $@) ? "$@" : 'unknown error';
    $err =~ s/\s+\z//;
    print STDERR "Worker ($$) terminated by exception: $err\n";
  }

  ## _exit rather than exit: see the note on the POSIX import about not
  ## unlinking the unix socket file created by the parent before the fork.
  POSIX::_exit(1);
}
## Worker main loop: waits for either of the two handles to become readable
## and reacts to it. Loops forever; the only ways out are an exception or a
## process exit performed further down (e.g. inside process_data).
##
##   $server     - server object, handed on to process_data as-is
##   $fh         - handle whose readable events are processed by process_data
##   $monitor_fh - handle whose readability is treated as "server went away"
sub handle_worker_wrapped {
  my ($server, $fh, $monitor_fh) = @_;

  ## Second argument 0 for both handles
  ## (presumably selects blocking mode -- see AnyEvent::Util docs).
  AnyEvent::Util::fh_nonblocking($_, 0) for $fh, $monitor_fh;

  ## Initialise the file-scoped codec and select set used by this worker.
  $json = JSON::XS->new->utf8;
  $sel = IO::Select->new;
  $sel->add($fh, $monitor_fh);

  for (;;) {
    for my $handle ($sel->can_read) {
      if ($handle == $monitor_fh) {
        ## Lost connection to server: stop watching the monitor handle but
        ## keep looping on $fh.
        $sel->remove($monitor_fh);
        next;
      }

      process_data($server, $fh) if $handle == $fh;
    }
  }
}
sub process_data {
my ($server, $fh) = @_;
scope_guard { alarm 0 };
local $SIG{ALRM} = sub { print STDERR "Killing hung worker ($$)\n"; POSIX::_exit(1); };
alarm $server->{hung_worker_timeout} if $server->{hung_worker_timeout};
my $read_rv = sysread $fh, my $buf, 4096;
if (!defined $read_rv) {
return if $!{EINTR};
POSIX::_exit(1);
} elsif ($read_rv == 0) {
POSIX::_exit(1);
}
for my $input ($json->incr_parse($buf)) {
my $output;
my $output_meta = {};
my $cmd = shift @$input;
my $input_meta = shift @$input;
if ($cmd eq 'do') {
my $val;
local $AnyEvent::Task::Logger::log_defer_object;
eval {
if (!$setup_has_been_run) {
$server->{setup}->();
$setup_has_been_run = 1;
}
$val = scalar $server->{interface}->(@$input);
};
my $err = $@;
$output_meta->{ld} = $AnyEvent::Task::Logger::log_defer_object->{msg}
( run in 0.795 second using v1.01-cache-2.11-cpan-39bf76dae61 )