IPC-Fork-Simple

 view release on metacpan or  search on metacpan

lib/IPC/Fork/Simple.pm  view on Meta::CPAN

                            $data_name,
                            0    # send queued data?
                        );
                    }
                    foreach my $data_name ( keys( %{ $self->{'child_info'}->{$cid}->{'data_queue'} } ) ) {
                        _handler_data_to_socket(
                            $self->{'handler_child_socket'},
                            $self->{'child_info'},
                            $cid,
                            $data_name,
                            1    # send queued data?
                        );
                    }
                }
                $self->{'handler_child_socket'}
                 ->send( pack( HEADER_PACKING, FLAG_PACKET_DATA_HANDLER_DATA_CHECKPOINT ) )
                 || die "Failed to report checkpoint to master: $!";
                if ( $r == FLAG_PACKET_GET_CHILD_DATA_AND_EXIT ) {
                    last;
                }

                foreach my $cid ( keys( %{ $self->{'child_info'} } ) ) {
                    $self->{'child_info'}->{$cid}->{'data'}       = {};
                    $self->{'child_info'}->{$cid}->{'data_queue'} = {};
                }
            } elsif ( ( $r != FLAG_PACKET_DATA ) && ( $r != FLAG_RETURN_CHILD_DISCONNECTED ) ) {
                warn "Should not be here! Got packet for: $r";
            }
        }
        # Data handler fork has done its job... exit!
        exit 0;
    }
}

=head2 collect_data_from_handler

Only usable by the master when using the data handler method.

When using the data hander method of operation (see above), this function
will cause the data hander fork to return all data it has received from
children to the master and will cause the data hander to clear its cache
of child data.

The first, optional, parameter defines whether or not the data handler
should stay running after returning all data. For backwards compatibility, the
default (false) is to exit after collecting all data.

If this parameter is set to true, the data handler will not exit after the
data is sent, allowing the caller to collect data again at a later time.

If this parameter is set to false,  no more child processes will be able to
send data back to the master, as the data handler will have exited. This
should only be called after all children have ended.

The second, optional, parameter is one of the BLOCK flags, as used by
L<process_child_data>. See EXAMPLES for details on the meaning of these flags.

=cut

sub collect_data_from_handler {
    my ( $self, $keep_alive, $block ) = @_;
    my ( $r, $msg );

    if ( !$self->{'handler_pid'} ) { return; }
    local $SIG{'PIPE'} = 'IGNORE';

    if ( $keep_alive ) {
        if ( $block == BLOCK_NEVER ) {
            $msg = FLAG_PACKET_GET_CHILD_DATA;
        } elsif ( $block == BLOCK_UNTIL_DATA ) {
            $msg = FLAG_PACKET_GET_CHILD_DATA_BLOCK;
        } elsif ( $block == BLOCK_UNTIL_CHILD ) {
            $msg = FLAG_PACKET_GET_CHILD_DATA_FINISHED_BLOCK;
        } else {
            carp "Invalid value for BLOCK!";
        }
    } else {
        $msg = FLAG_PACKET_GET_CHILD_DATA_AND_EXIT;
    }
    $self->{'handler_socket_comm'}->send( pack( HEADER_PACKING, $msg ) )
     || die "Failed to send data to data handler: $!";

    # _data_from_socket will return when
    # FLAG_PACKET_DATA_HANDLER_DATA_CHECKPOINT is received.
    do {
        $r = $self->_data_from_socket( $self->{'handler_select'}, BLOCK_UNTIL_CHILD );
    } until ( $r == FLAG_PACKET_DATA_HANDLER_DATA_CHECKPOINT );

    if ( !$keep_alive ) {
        # _data_from_socket will return when the remote socket is closed.
        $self->_data_from_socket( $self->{'handler_select'}, BLOCK_UNTIL_CHILD );
        waitpid( $self->{'handler_pid'}, 0 );
        $self->{'handler_pid'} = 0;
    }
    return 1;
}

=head2 init_child

Only usable by a child.

Only to be called by a child after a fork, this method configured this
child for communication with the master (or data handler). Will die on failure.

The first, optional, parameter is a symbolic name for this child with which
the master can retrieve data. Each child will automatically be assigned a
unique id (cid), but the optional symbolic name can be used to simplify
development. If not set, the symbolic name will be set to the process ID. The
symbolic name can not be a zero-length string.

Note: If a symbolic name is re-used, fetching data by symbolic name will fetch
data for one randomly chosen child that shares that name. If symbolic names
will be re-used, it's suggested that data is fetched instead by cid.

Be aware that PIDs, the default symbolic name, may be re-used on a system,
leading to a collision of symbolic names. In order to avoid this issue, do not
call wait (or otherwise reap the child process) until you have fetched (and
then cleared) all of its data. Alternately, address child processes by cid
instead.

=cut

sub init_child {
    my ( $self, $symbolic_name ) = @_;

    # We can't really protect against being called on the master...
    return if $self->{'is_child'};
    local $SIG{'PIPE'} = 'IGNORE';
    delete $self->{'master_socket'};
    delete $self->{'child_info'};

    if ( ( !defined $symbolic_name ) || ( length( $symbolic_name ) == 0 ) ) {
        $symbolic_name = $$;
    }

    $self->{'symbolic_name'} = $symbolic_name;
    $self->{'is_child'}      = 1;
    $self->{'child_socket'} =
     IO::Socket::INET->new( Type => SOCK_STREAM, PeerAddr => $self->{'master_ip'}, PeerPort => $self->{'master_port'} );
    if ( !$self->{'child_socket'} ) {
        die "Failed to connect to master socket " . $self->{'master_port'} . ": $!";
    }

    $self->{'child_socket'}->send(
        pack(
            HEADER_PACKING . HEADER_CHILD_HELLO_PACKING,    # Packing
            FLAG_PACKET_CHILD_HELLO,
            $self->{'shared_key'},
            length( $self->{'symbolic_name'} )



( run in 0.851 second using v1.01-cache-2.11-cpan-39bf76dae61 )