Async-Simple-Pool

 view release on metacpan or  search on metacpan

lib/Async/Simple/Task/Fork.pm  view on Meta::CPAN

);


#    Serializer object; it must provide 2 methods: encode + decode.
#    The encoded data must be a single string value.
#    By default serialization uses Data::Serializer with "Storable".
#    The value is built lazily by the get_serializer builder.
has 'serializer' => (
    isa      => 'Any',
    is       => 'ro',
    builder  => 'get_serializer',
    lazy     => 1,
    required => 1,
);


#  Process id produced by the fork_child builder:
#    in the child the value is always 0,
#    in the parent it is an Int value > 0.
#  The attribute is lazy, so fork_child runs on the first read of pid.
has 'pid' => (
    isa      => 'Int',
    is       => 'ro',
    builder  => 'fork_child',
    lazy     => 1,
    required => 1,
);


=head2 new()

    my $task = Async::Simple::Task::Fork->new( %all_optional_params );


Possible keys for %all_optional_params:

    task         => coderef, function, called for each "data" passed to child process via $task->put( $data );

    timeout      => interval in seconds between the child's checks for newly passed data (0.01 by default).

    kill_on_exit => whether to kill (1) or keep (0) the subprocess when the object is destroyed (1 by default).

=cut


=head2 BUILD

Internal. A few tricks are used here:

    1. The master process calls $task->new, which runs fork() inside
    2. Once the fork is done there are two processes:
    2.1. The master gets one side of the reader/writer pipes and the pid of the child
    2.2. The child gets the other side of the pipes plus extra logic: an everlasting loop

=cut

sub BUILD {
    my ( $self ) = @_;

    # Reading pid runs its lazy builder (fork_child), so the fork happens here.
    # The master process gets a non-zero pid and returns at once;
    # only the child (pid == 0) goes down into the loop.
    return $self  if $self->pid;

    # Child loop: poll for new data, run the task, send the outcome back.
    # The loop has no exit condition of its own.
    while ( 1 ) {
        $self->clear_answer;
        $self->get;

        # Nothing has been received yet: wait for "timeout" seconds and poll again
        unless ( $self->has_answer ) {
            sleep $self->timeout;
            next;
        }

        # $@ is a global: save it right after the eval, because any eval executed
        # later (for example somewhere inside clear_answer) would overwrite it
        # and the error of the task would be lost.
        my $result = eval{ $self->task->( $self->answer ) };
        my $error  = $@;

        $self->clear_answer;

        # A defined result is sent as is. Otherwise the error is sent;
        # it is always defined after eval ('' when the task just returned undef).
        $self->put( $result // $error );
    }

    # Not reached while the loop above is endless. Kept as a guard: the child
    # must never fall through into the code of the caller of new().
    exit(0);
}


=head2 fork_child

Forks a child process and returns the pid of the child to the parent process, or 0 to the child process

=cut

sub fork_child {
    my ( $self ) = @_;

    # This is here instead of BEGIN, because this package uses as "extends" in Async::Simple::Task::ForkTmpFile
    # TODO: Maybe it would be great to move this code(function) to separate package
    # if ( $^O =~ /^(dos|os2|MSWin32|NetWare)$/ ) {
    #     die 'Your OS does not support threads... Use Async::Simple::Task::ForkTmpFile instead.';
    # };

    # Pipes: parent -> child and child -> parent
    pipe my( $parent_reader, $child_writer  )  or die 'Child  to Parent pipe open error';
    pipe my( $child_reader,  $parent_writer )  or die 'Parent to Child  pipe open error';

    my $pid = fork() // die "fork() failed: $!";

    # child
    unless ( $pid ) {
        close $parent_reader;
        close $parent_writer;

        $child_writer->autoflush(1);

        $self->writer( $child_writer );
        $self->reader( $child_reader );

        # Important!
        # Just after that we trap into BUILD
        # with the infinitive loop for child process (pid=0)
        return 0;
    }

    # parent
    close $child_writer;
    close $child_reader;

    $parent_writer->autoflush(1);



( run in 0.508 second using v1.01-cache-2.11-cpan-df04353d9ac )