AnyEvent-Worker

 view release on metacpan or  search on metacpan

lib/AnyEvent/Worker.pm  view on Meta::CPAN

		or croak "unable to create Anyevent::Worker communications pipe: $!";
	binmode $client, ':raw';
	binmode $server, ':raw';
	
	my $self = bless \%arg, $class;
	$self->{fh} = $client;
	
	AnyEvent::Util::fh_nonblocking $client, 1;
	
	my $rbuf;
	my @caller = (caller)[1,2]; # the "default" caller
	
	{
		Scalar::Util::weaken (my $self = $self);
		
		$self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
			return unless $self;
			
			$self->{last_activity} = AnyEvent->now;
			
			my $len = sysread $client, $rbuf, 65536, length $rbuf;

lib/AnyEvent/Worker.pm  view on Meta::CPAN


=item $worker->do ( @args, $cb->( $worker, @response ) )

Executes worker code and execure the callback, when response is ready

=cut

sub do {
	my $self = shift;
	my $cb = pop;
	my ($filename,$line) = (caller)[1,2];
	
	unless ($self->{fh}) {
		local $@ = my $err = 'no worker connection';
		$cb->($self);
		$self->_error ($err, $filename, $line, 1);
		return;
	}
	
	push @{ $self->{queue} }, [$cb, $filename, $line];
	

lib/AnyEvent/Worker/Pool.pm  view on Meta::CPAN

		$worker->do(@args, sub {
			$self->ret_worker($worker);
			goto &$cb;
		});
	});
	return;
}

sub take_worker {
	my $self = shift;
	my $cb = shift or die "cb required for take_worker at @{[(caller)[1,2]]}\n";
	#warn("take wrk, left ".$#{$self->{pool}}." for @{[(caller)[1,2]]}\n");
	if (@{$self->{pool}}) {
		$cb->(shift @{$self->{pool}});
	} else {
		#warn("no worker for @{[(caller 1)[1,2]]}, maybe increase pool?");
		push @{$self->{waiting_db}},$cb
	}
}

sub ret_worker {
	my $self = shift;
	#warn("ret wrk, got ".@{$self->{pool}}.'+'.@_." for @{[(caller)[1,2]]}\n");
	push @{ $self->{pool} }, @_;
	$self->take_worker(shift @{ $self->{waiting_db} }) if @{ $self->{waiting_db} };
}

=head1 AUTHOR

Mons Anderson, C<< <mons@cpan.org> >>

=head1 COPYRIGHT & LICENSE



( run in 1.202 second using v1.01-cache-2.11-cpan-1e74a51a04c )