AnyEvent-ForkObject

 view release on metacpan or  search on metacpan

lib/AnyEvent/ForkObject.pm  view on Meta::CPAN

package AnyEvent::ForkObject;

use 5.010001;
use strict;
use warnings;

use Carp;
use AnyEvent;
use AnyEvent::Util;
use AnyEvent::Handle;
use Scalar::Util qw(weaken blessed reftype);
use POSIX;
use IO::Handle;
use AnyEvent::Serialize qw(:all);
use AnyEvent::Tools qw(mutex);
use Devel::GlobalDestruction;


our $VERSION = '0.09';

sub new
{
    my ($class) = @_;

    my $self = bless { } => ref($class) || $class;
    my ($s1, $s2) = portable_socketpair;

    if ($self->{pid} = fork) {
        # parent
        $self->{mutex} = mutex;
        close $s2;
        fh_nonblocking $s1, 1;
        {
            weaken(my $self = $self);
            $self->{handle} = new AnyEvent::Handle
                fh => $s1,
                on_error => sub {
                    return unless $self;
                    return if $self->{destroyed};
                    delete $self->{handle};
                    $self->{fatal} = $!;
                    $self->{cb}(fatal => $self->{fatal}) if $self->{cb};
                };
        }
    } elsif (defined $self->{pid}) {
        # child
        close $s1;
        $self->{socket} = $s2;
        $self->{object} = {};
        $self->{no} = 0;
        $self->_start_server;
    } else {
        die $!;
    }

    return $self;
}

sub do :method
{
    my ($self, %opts) = @_;
    my $method = $opts{method} || 'new';
    my $invocant = $opts{module} || $opts{_invocant};
    my $cb = $opts{cb} || sub {  };
    my $args = $opts{args} || [];
    my $wantarray = $opts{wantarray};
    my $require = $opts{require};
    $wantarray = 0 unless exists $opts{wantarray};

    weaken $self;
    $self->{mutex}->lock(sub {
        my ($guard) = @_;
        return unless $self;
        return if $self->{destroyed};

        $self->{cb} = $cb;

        unless ($self->{handle}) {
            $cb->(fatal => 'Child process was destroyed');
            undef $guard;
            return;
        }

        if ($self->{fatal}) {
            $cb->(fatal => $self->{fatal});
            delete $self->{cb};
            undef $guard;
            return;
        }

        serialize {
                $require ? (r => $require) : (
                    i   => $invocant,
                    m   => $method,
                    a   => $args,
                    wa  => $wantarray
                )
            } => sub {
                return unless $self;
                return if $self->{destroyed} or $self->{fatal};

                $self->{handle}->push_write("$_[0]\n");
                return unless $self;
                return if $self->{destroyed} or $self->{fatal};

                $self->{handle}->push_read(line => "\n", sub {
                    deserialize $_[1] => sub {
                        return unless $self;
                        return if $self->{destroyed} or $self->{fatal};

                        my ($o, $error, $tail) = @_;

                        if ($error) {
                            $cb->(fatal => $error);
                            delete $self->{cb};
                            undef $guard;
                            return;
                        }

                        my $status = shift @$o;
                        if ($status eq 'ok') {
                            for (@$o) {
                                if (exists $_->{obj}) {
                                    $_ = bless {
                                        no => "$_->{obj}",
                                        fo => \$self,
                                    } => 'AnyEvent::ForkObject::OneObject';
                                    next;
                                }



( run in 2.130 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )