AnyEvent-Worker

 view release on metacpan or  search on metacpan

ex/worker-pool.pl  view on Meta::CPAN


my $cv = AE::cv;

# Self-rescheduling job: submits one "test" request through $pool->do and,
# one second after a successful response, submits the next request under
# the same job id.
#
# $id - label of this job chain; sent as the request payload "Test:$id".
#
# Every submission is bracketed by $cv->begin / $cv->end. A failed request
# only logs and returns, so that chain stops and its begin stays unmatched.
my $j1;$j1 = sub {
	my $id = shift;
	$cv->begin;
	$pool->do( test => "Test:$id" , sub {
		#my $g = AnyEvent::Util::guard { $j1->(); $cv->end; };
		return warn "Request died: $@\n" if $@;
		warn "Received response: @_\n";
		# $t is captured by its own callback so the watcher stays referenced
		# until it fires; the callback then drops it.
		my $t;$t = AnyEvent->timer(after => 1, cb => sub {
			undef $t;
			#undef $g;
			# Hand the id on: calling $j1 without it would make every
			# follow-up request read "Test:" (plus an "uninitialized"
			# warning when warnings are enabled).
			# The next round is started before this one is ended, so the
			# begin/end counter on $cv never dips in between.
			$j1->($id);
			$cv->end;
		});
	});
};
$j1->($_) for 1..5;


ex/worker.pl  view on Meta::CPAN

} );

my $cv = AE::cv;

# Endless request loop: sends one "test" request with payload "P:Data"
# through $worker1->do and, one second after a successful response, sends
# the next one. Each round is bracketed by $cv->begin / $cv->end; a failed
# request only logs and returns, which ends the loop.
my $j1;
$j1 = sub {
	$cv->begin;
	$worker1->do(
		test => "P:Data",
		sub {
			#guard { $j1->(); $cv->end; };
			if ($@) {
				return warn "Request died: $@\n";
			}
			my @response = @_;
			warn "Received response: @response\n";

			# The watcher is captured by its own callback so it stays
			# referenced until it fires; the callback then releases it.
			my $pause;
			$pause = AnyEvent->timer(
				after => 1,
				cb    => sub {
					undef $pause;
					# start the next round before closing this one
					$j1->();
					$cv->end;
				},
			);
		},
	);
};
$j1->();

{
	$cv->begin;

lib/AnyEvent/Worker.pm  view on Meta::CPAN

		Scalar::Util::weaken (my $self = $self);
		
		$self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
			return unless $self;
			
			$self->{last_activity} = AnyEvent->now;
			
			my $len = sysread $client, $rbuf, 65536, length $rbuf;
			
			if ($len > 0) {
				# we received data, so reset the timer
				
				while () {
					my $len = unpack "L", $rbuf;
					
					# full response available?
					last unless $len && $len + 4 <= length $rbuf;
					my $res = Storable::thaw substr $rbuf, 4;
					substr $rbuf, 0, $len + 4, ""; # remove length + request
					
					last unless $self;

lib/AnyEvent/Worker.pm  view on Meta::CPAN

		});
		
		# Timeout-watcher callback, stored on the object so it can serve both
		# as the callback of the {tw} timer (see below) and be invoked
		# directly (the timeout() setter does so to reschedule).
		#
		# Takes no arguments; works purely on $self captured from the
		# enclosing scope:
		#   {timeout}       - allowed idle time; a false value disables checks
		#   {last_activity} - AnyEvent->now timestamp of the last activity
		#   {queue}         - pending requests; only the first entry is used
		#   {tw}            - the currently armed timer watcher, if any
		$self->{tw_cb} = sub {
			if ($self->{timeout} && $self->{last_activity}) {
				if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) {
					# we did time out
					# Report it against the request at the head of the queue.
					# NOTE(review): elements 1 and 2 of the queue entry are
					# passed through to _error unchanged; their meaning is
					# not visible here - confirm against _error.
					my $req = $self->{queue}[0];
					$self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
				} else {
					# we need to re-set the timeout watcher
					# Arm it for exactly the time remaining until the
					# deadline (last_activity + timeout).
					$self->{tw} = AnyEvent->timer (
						after => $self->{last_activity} + $self->{timeout} - AnyEvent->now,
						cb    => $self->{tw_cb},
					);
					# Weaken this closure's captured $self. Presumably this
					# avoids a reference cycle between the object and the
					# callback stored inside it - TODO confirm; note it is
					# only done on this branch.
					Scalar::Util::weaken $self;
				}
			} else {
				# no timeout check wanted, or idle
				# Dropping the watcher cancels any pending timer.
				undef $self->{tw};
			}
		};

lib/AnyEvent/Worker.pm  view on Meta::CPAN

		local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ };
		warn 'test';
	}
	#print STDERR "killing $child_pid / $GD\n";
	if ($child_pid) {
		# send SIGKILL in two seconds
		$TERM{$child_pid}++;
		kill 0 => $child_pid and
		kill TERM => $child_pid or $!{ESRCH} or warn "kill $child_pid: $!";
		return if $GD;
		# MAYBE: kill timer
		#my $murder_timer = AnyEvent->timer (
		#	after => 2,
		#	cb    => sub {
		#		kill 9, $child_pid
		#			and delete $TERM{$child_pid};
		#	},
		#);
		
		# reap process
		#print STDERR "start reaper $child_pid\n";
		$KIDW{$child_pid} = AnyEvent->child (
			pid => $child_pid,
			cb  => sub {
				# just hold on to this so it won't go away
				#print STDERR "reaped $child_pid\n";
				delete $TERM{$child_pid};
				delete $KIDW{$child_pid};
				# cancel SIGKILL
				#undef $murder_timer;
			},
		);
		
		close $self->{fh};
	}
}
sub END {
	my $GD = 0;
	{
		local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ };

lib/AnyEvent/Worker.pm  view on Meta::CPAN

Sets (or clears, with C<undef>) the worker timeout. Useful to extend the
timeout when you are about to make a really long-running request.

=cut

sub timeout {
	my $self = shift;
	
	# Remember the new limit; the timeout callback treats a false value
	# (such as undef) as "no timeout check wanted".
	$self->{timeout} = shift;
	
	# Run the timeout callback right away so that the watcher is re-armed
	# (or dropped) according to the new value.
	return $self->{tw_cb}->();
}

=item $worker->do ( @args, $cb->( $worker, @response ) )

Executes the worker code and invokes the callback when the response is ready.

=cut

sub do {



( run in 1.322 second using v1.01-cache-2.11-cpan-49f99fa48dc )