AnyEvent-ForkObject
view release on metacpan or search on metacpan
lib/AnyEvent/ForkObject.pm view on Meta::CPAN
} => sub {
return unless $self;
return if $self->{destroyed} or $self->{fatal};
$self->{handle}->push_write("$_[0]\n");
return unless $self;
return if $self->{destroyed} or $self->{fatal};
$self->{handle}->push_read(line => "\n", sub {
deserialize $_[1] => sub {
return unless $self;
return if $self->{destroyed} or $self->{fatal};
my ($o, $error, $tail) = @_;
if ($error) {
$cb->(fatal => $error);
delete $self->{cb};
undef $guard;
return;
}
my $status = shift @$o;
if ($status eq 'ok') {
for (@$o) {
if (exists $_->{obj}) {
$_ = bless {
no => "$_->{obj}",
fo => \$self,
} => 'AnyEvent::ForkObject::OneObject';
next;
}
$_ = $_->{res};
}
$cb->(ok => @$o);
} else {
$cb->($status => @$o);
}
delete $self->{cb};
undef $guard;
};
return;
});
return;
};
});
return;
}
sub DESTROY
{
my ($self) = @_;
$self->{destroyed} = 1;
$self->{handle}->push_write("'bye'\n") if $self->{handle};
delete $self->{handle};
return if in_global_destruction;
# kill zombies
my $cw;
$cw = AE::child $self->{pid} => sub {
my ($pid, $code) = @_;
undef $cw;
};
}
sub _start_server
{
my ($self) = @_;
croak "Something wrong" if $self->{pid};
my $err_code = 0;
require Data::StreamSerializer;
my $socket = $self->{socket};
$socket->autoflush(1);
while(<$socket>) {
my $response;
next unless /\S/;
my $cmd = eval $_;
if ($@) {
$err_code = 1;
last;
}
unless (ref $cmd) {
if ($cmd eq 'bye') {
undef $_ for values %{ $self->{object} };
delete $self->{object};
last;
}
eval $cmd;
if ($@) {
$response = [ die => $@ ];
goto RESPONSE;
}
$response = [ 'ok' ];
goto RESPONSE;
}
# require
if ($cmd->{r}) {
eval "require $cmd->{r}";
if ($@) {
$response = [ die => $@ ];
goto RESPONSE;
}
$response = [ 'ok' ];
goto RESPONSE;
}
my ($invocant, $method, $args, $wantarray) = @$cmd{qw(i m a wa)};
lib/AnyEvent/ForkObject.pm view on Meta::CPAN
$_ = { res => $_ };
}
$response = [ ok => @o ];
RESPONSE:
my $sr = new Data::StreamSerializer($response);
while(defined(my $part = $sr->next)) {
print $socket $part;
}
print $socket "\n";
}
# destroy internal objects
delete $self->{object};
# we don't want to call any other destructors
POSIX::_exit($err_code);
}
package AnyEvent::ForkObject::OneObject;
use Carp;
use Scalar::Util qw(blessed);
use Devel::GlobalDestruction;
sub AUTOLOAD
{
our $AUTOLOAD;
my ($foo) = $AUTOLOAD =~ /([^:]+)$/;
my ($self, @args) = @_;
my $cb = pop @args;
my $wantarray = 0;
if ('CODE' ne ref $cb) {
$wantarray = $cb;
$cb = pop @args;
}
croak "Callback is required" unless 'CODE' eq ref $cb;
my $fo = $self->{fo};
unless ($$fo) {
$cb->(fatal => 'Child process was already destroyed');
return;
}
$$fo -> do(
_invocant => $self->{no},
method => $foo,
args => \@args,
cb => $cb,
wantarray => $wantarray
);
return;
}
sub DESTROY
{
# You can call DESTROY by hand
my ($self, $cb) = @_;
return if in_global_destruction;
$cb ||= sub { };
my $fo = $self->{fo};
unless (blessed $$fo) {
$cb->(fatal => 'Child process was already destroyed');
return;
}
$$fo -> do(
_invocant => $self->{no},
method => 'DESTROY',
cb => $cb,
wantarray => undef,
);
return;
}
1;
__END__
=head1 NAME
AnyEvent::ForkObject - Async access on objects.
=head1 SYNOPSIS
use AnyEvent::ForkObject;
use DBI;
my $fo = new AnyEvent::ForkObject;
$fo->do(
module => 'DBI',
method => 'connect',
args => [ 'dbi:mysql...' ],
cb => sub {
my ($status, $dbh) = @_;
$dbh->selectrow_array('SELECT ?', undef, 1 + 1, sub {
my ($status, $result) = @_;
print "$result\n"; # prints 2
});
}
);
use AnyEvent::Tools qw(async_repeat);
$dbh->prepare('SELECT * FROM tbl', sub {
my ($status, $sth) = @_;
$sth->execute(sub {
my ($status, $rv) = @_;
# fetch 30 rows
async_repeat 30 => sub {
my ($guard) = @_;
$sth->fetchrow_hashref(sub {
my ($status, $row) = @_;
undef $guard;
( run in 3.333 seconds using v1.01-cache-2.11-cpan-524268b4103 )