AnyEvent-Worker

 view release on metacpan or  search on metacpan

lib/AnyEvent/Worker.pm  view on Meta::CPAN

			
			while () {
				my $len = unpack "L", $rbuf;
				
				# full request available?
				last unless $len && $len + 4 <= length $rbuf;
				
				my $req = Storable::thaw substr $rbuf, 4;
				substr $rbuf, 0, $len + 4, ""; # remove length + request
				local $@;
				my $wbuf = eval {
					++$N;
					if (ref $WORKER eq 'CODE') {
						local $0 = "$O : request $N";
						pack "L/a*", Storable::freeze [ 1, $WORKER->(@$req) ];
					} else {
						my $method = shift @$req;
						#warn ">> request $method";
						local $0 = "$O : request $N : $method";
						pack "L/a*", Storable::freeze [ 1, $WORKER->$method(@$req) ];
					}
				};
				# warn if $@;
				$0 = "$O : idle";
				$wbuf = pack "L/a*", Storable::freeze [ undef, ref $@ ? $@ : "$@" ]
					if $@;
				
				#warn "<< response";
				for (my $ofs = 0; $ofs < length $wbuf; ) {
					my $wr = syswrite $fh, $wbuf, length($wbuf), $ofs;
					defined $wr or $!{EINTR} or die "unable to write results: $!";
					$ofs += $wr;
				}
			}
		}
	};
	warn if $@;
}

# Serve requests on an already-open file descriptor.
#
# Arguments (both required by the ($$) prototype):
#   1. the file descriptor to serve on
#   2. a value handed through unchanged as the second argument of serve_fh
#      (presumably the worker code ref or object - confirm against serve_fh)
#
# Dies with "Couldn't open server file descriptor: ..." when the descriptor
# cannot be wrapped in a Perl filehandle; otherwise returns whatever
# serve_fh returns.
sub serve_fd($$) {
	my ($fd, $worker) = @_;

	# ">>&=" attaches a Perl filehandle to the existing descriptor
	# (fdopen-style) instead of dup()ing it. The three-argument form keeps
	# the mode separate from the caller-supplied value, so that value can
	# never be parsed as part of the mode string.
	open my $fh, ">>&=", $fd
		or die "Couldn't open server file descriptor: $!";

	return serve_fh($fh, $worker);
}

# Storable loads parts of itself lazily, so run one throwaway freeze/thaw
# round trip at module load time to have that code in place up front.
Storable::thaw(Storable::freeze([]));

=head1 METHODS

=over 4

=cut

sub new {
	my ($class, $cb, %arg) = @_;
	
	my ($client, $server) = AnyEvent::Util::portable_socketpair
		or croak "unable to create Anyevent::Worker communications pipe: $!";
	binmode $client, ':raw';
	binmode $server, ':raw';
	
	my $self = bless \%arg, $class;
	$self->{fh} = $client;
	
	AnyEvent::Util::fh_nonblocking $client, 1;
	
	my $rbuf;
	my @caller = (caller)[1,2]; # the "default" caller
	
	{
		Scalar::Util::weaken (my $self = $self);
		
		$self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
			return unless $self;
			
			$self->{last_activity} = AnyEvent->now;
			
			my $len = sysread $client, $rbuf, 65536, length $rbuf;
			
			if ($len > 0) {
				# we received data, so reset the timer
				
				while () {
					my $len = unpack "L", $rbuf;
					
					# full response available?
					last unless $len && $len + 4 <= length $rbuf;
					my $res = Storable::thaw substr $rbuf, 4;
					substr $rbuf, 0, $len + 4, ""; # remove length + request
					
					last unless $self;
					my $req = shift @{ $self->{queue} };
					
					if (defined $res->[0]) {
						$res->[0] = $self;
						$req->[0](@$res);
					} else {
						my $cb = shift @$req;
						{
							local $@ = $res->[1];
							$@ =~ s{\n$}{};
							$cb->($self);
						}
					}
					
					# no more queued requests, so become idle
					undef $self->{last_activity}
						if $self && !@{ $self->{queue} };
				}
			
			}
			elsif (defined $len) {
				# todo, caller?
				$self->_error ("unexpected eof", @caller, 1);
			}
			elsif ($! != Errno::EAGAIN) {
				# todo, caller?
				$self->_error ("read error ".(0+$!).": $!", @caller, 1);
			}
		});



( run in 2.538 seconds using v1.01-cache-2.11-cpan-cdf2f3d4e48 )