AnyEvent-ForkObject

 view release on metacpan or  search on metacpan

lib/AnyEvent/ForkObject.pm  view on Meta::CPAN

            } => sub {
                return unless $self;
                return if $self->{destroyed} or $self->{fatal};

                $self->{handle}->push_write("$_[0]\n");
                return unless $self;
                return if $self->{destroyed} or $self->{fatal};

                $self->{handle}->push_read(line => "\n", sub {
                    deserialize $_[1] => sub {
                        return unless $self;
                        return if $self->{destroyed} or $self->{fatal};

                        my ($o, $error, $tail) = @_;

                        if ($error) {
                            $cb->(fatal => $error);
                            delete $self->{cb};
                            undef $guard;
                            return;
                        }

                        my $status = shift @$o;
                        if ($status eq 'ok') {
                            for (@$o) {
                                if (exists $_->{obj}) {
                                    $_ = bless {
                                        no => "$_->{obj}",
                                        fo => \$self,
                                    } => 'AnyEvent::ForkObject::OneObject';
                                    next;
                                }

                                $_ = $_->{res};
                            }
                            $cb->(ok => @$o);
                        } else {
                            $cb->($status => @$o);
                        }
                        delete $self->{cb};
                        undef $guard;
                    };
                    return;
                });

                return;
            };
    });

    return;
}


sub DESTROY
{
    my ($self) = @_;
    $self->{destroyed} = 1;
    $self->{handle}->push_write("'bye'\n") if $self->{handle};
    delete $self->{handle};

    return if in_global_destruction;

    # kill zombies
    my $cw;
    $cw = AE::child $self->{pid} => sub {
        my ($pid, $code) = @_;
        undef $cw;
    };
}

sub _start_server
{
    my ($self) = @_;
    croak "Something wrong" if $self->{pid};
    my $err_code = 0;

    require Data::StreamSerializer;

    my $socket = $self->{socket};
    $socket->autoflush(1);
    while(<$socket>) {
        my $response;
        next unless /\S/;
        my $cmd = eval $_;
        if ($@) {
            $err_code = 1;
            last;
        }

        unless (ref $cmd) {
            if ($cmd eq 'bye') {
                undef $_ for values %{ $self->{object} };
                delete $self->{object};
                last;
            }

            eval $cmd;

            if ($@) {
                $response = [ die => $@ ];
                goto RESPONSE;
            }

            $response = [ 'ok' ];
            goto RESPONSE;
        }

        # require
        if ($cmd->{r}) {
            eval "require $cmd->{r}";
            if ($@) {
                $response = [ die => $@ ];
                goto RESPONSE;
            }

            $response = [ 'ok' ];
            goto RESPONSE;
        }


        my ($invocant, $method, $args, $wantarray) = @$cmd{qw(i m a wa)};

lib/AnyEvent/ForkObject.pm  view on Meta::CPAN

            $_ = { res => $_ };
        }

        $response = [ ok => @o ];

        RESPONSE:
            my $sr = new Data::StreamSerializer($response);
            while(defined(my $part = $sr->next)) {
                print $socket $part;
            }
            print $socket "\n";
    }

    # destroy internal objects
    delete $self->{object};

    # we don't want to call any other destructors
    POSIX::_exit($err_code);
}

package AnyEvent::ForkObject::OneObject;
use Carp;
use Scalar::Util qw(blessed);
use Devel::GlobalDestruction;

sub AUTOLOAD
{
    our $AUTOLOAD;
    my ($foo) = $AUTOLOAD =~ /([^:]+)$/;

    my ($self, @args) = @_;
    my $cb = pop @args;
    my $wantarray = 0;
    if ('CODE' ne ref $cb) {
        $wantarray = $cb;
        $cb = pop @args;
    }
    croak "Callback is required" unless 'CODE' eq ref $cb;

    my $fo = $self->{fo};

    unless ($$fo) {
        $cb->(fatal => 'Child process was already destroyed');
        return;
    }

    $$fo -> do(
        _invocant => $self->{no},
        method    => $foo,
        args      => \@args,
        cb        => $cb,
        wantarray => $wantarray
    );
    return;
}

sub DESTROY
{
    # You can call DESTROY by hand
    my ($self, $cb) = @_;
    return if in_global_destruction;
    $cb ||= sub {  };
    my $fo = $self->{fo};
    unless (blessed $$fo) {
        $cb->(fatal => 'Child process was already destroyed');
        return;
    }

    $$fo -> do(
        _invocant   => $self->{no},
        method      => 'DESTROY',
        cb          => $cb,
        wantarray   => undef,
    );
    return;
}

1;
__END__

=head1 NAME

AnyEvent::ForkObject - Async access on objects.

=head1 SYNOPSIS

    use AnyEvent::ForkObject;
    use DBI;

    my $fo = new AnyEvent::ForkObject;

    $fo->do(
        module => 'DBI',
        method => 'connect',
        args => [ 'dbi:mysql...' ],
        cb => sub {
            my ($status, $dbh) = @_;


            $dbh->selectrow_array('SELECT ?', undef, 1 + 1, sub {
                my ($status, $result) = @_;
                print "$result\n";   # prints 2
            });
        }
    );


    use AnyEvent::Tools qw(async_repeat);

    $dbh->prepare('SELECT * FROM tbl', sub {
        my ($status, $sth) = @_;
        $sth->execute(sub {
            my ($status, $rv) = @_;

            # fetch 30 rows
            async_repeat 30 => sub {
                my ($guard) = @_;

                $sth->fetchrow_hashref(sub {
                    my ($status, $row) = @_;
                    undef $guard;



( run in 3.333 seconds using v1.01-cache-2.11-cpan-524268b4103 )