POE-Wheel-Spawner

 view release on metacpan or  search on metacpan

lib/POE/Wheel/Spawner.pm  view on Meta::CPAN

} ## end sub new

=head2 run(%arg)

optional C<%arg> arguments for L<POE::Session>:

=over

=item

debug

default 0

=item

trace

default 0

=back

create a L<POE::Session>

run L<POE::Kernel>

=cut

sub run {
    my ($self, %arg) = @_;

    # Nothing can be spawned without a callable workload, so bail out
    # before any POE object is created.
    # NOTE(review): the message says "work_method" although the attribute
    # being checked is "workload" -- kept as is, confirm before rewording.
    if (ref($self->{workload}) ne 'CODE') {
        die "work_method is not a code reference";
    }

    # Both session options fall back to 0 when absent or false.
    my %session_options = (
        debug => $arg{debug} || 0,
        trace => $arg{trace} || 0,
    );

    # Event name => name of the method on $self that handles it.
    # "_start" and "_next" share one handler.
    my %method_for_event = (
        _start     => '_handle_start',
        _next      => '_handle_start',
        _sig_child => '_handle_sig_child',
        _done      => '_handle_done',
        _stderr    => '_handle_stderr',
        _stdout    => '_handle_stdout',
    );

    POE::Session->create(
        options       => \%session_options,
        object_states => [ $self => \%method_for_event ],
    );

    # Hand control to the POE event loop; its result is the result of run().
    return POE::Kernel->run();
} ## end sub run

=head2 spawn($pid)

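Ask the parent process to spawn one more worker. The request, a
C<{ busy_worker_pid =E<gt> $pid }> record, is serialized with
L<POE::Filter::Reference> and printed to STDOUT, where the parent session
reads it and marks C<$pid> as a busy worker.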

=cut

sub spawn {
    my ($self, $pid) = @_;

    # Serialize the request with POE::Filter::Reference, the same filter
    # class _handle_start installs as StdoutFilter on its wheels.
    my $frames = POE::Filter::Reference->new()
        ->put([ { busy_worker_pid => $pid } ]);

    # put() hands back an array reference of serialized chunks; write them
    # all out in one go.
    print @{$frames};
} ## end sub spawn

#=head2 _handle_start
#
#handle C<_start> and C<_next> events defined in POE::Session, which is initialized in C<run>.
#
#start one more child process running C<workload>, unless C<pool_size> children are already running
#
#=cut

sub _handle_start {
    my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];

    # The pool is full once as many wheels are tracked as pool_size allows.
    my $running = keys %{ $heap->{worker_by_pid} };
    return if $running >= $self->{pool_size};

    # Children report back through STDOUT as serialized references.
    my $stdout_filter = POE::Filter::Reference->new();

    my $wheel = POE::Wheel::Run->new(
        # The workload is looked up when the wrapper runs and receives
        # whatever arguments the wrapper itself was called with.
        Program      => sub { $self->{workload}->(@_) },
        StdoutFilter => $stdout_filter,
        StdoutEvent  => "_stdout",
        StderrEvent  => "_stderr",
        CloseEvent   => "_done",
    );

    # Keep the wheel in the heap, keyed by child pid, and ask the kernel to
    # post "_sig_child" for that pid.
    my $child_pid = $wheel->PID;
    $heap->{worker_by_pid}->{$child_pid} = $wheel;
    $kernel->sig_child($child_pid, "_sig_child");
} ## end sub _handle_start

#=head2 _handle_sig_child
#
#Remove the reaped child from the heap. Unless C<stop_if_done> is set, trigger C<_next> when no remaining child is marked busy.
#
#=cut

sub _handle_sig_child {
    # ARG1 carries the pid of the reaped child.
    my ($self, $kernel, $heap, $pid) = @_[OBJECT, KERNEL, HEAP, ARG1];

    # Count every child signal seen by this spawner.
    $self->{_workers_sig_count}++;

    # Drop the wheel of the finished child; a pid that was never tracked
    # is treated as a fatal error.
    my $wheel = delete $heap->{worker_by_pid}{$pid};
    POE::Kernel::_die("no child pid: $pid") unless $wheel;

    # A finished child can no longer be busy.
    delete $heap->{busy_worker_pid}->{$pid};

    if (!$self->{stop_if_done}) {
        # Request a fresh worker only when none of the remaining children
        # is marked busy.
        my $busy_count = keys %{ $heap->{busy_worker_pid} };
        $busy_count or $kernel->yield("_next");
    }
    else {
        # In stop_if_done mode nothing is rescheduled, whether or not
        # pool_size signals have been counted yet.
        return if $self->{_workers_sig_count} >= $self->{pool_size};
    }
} ## end sub _handle_sig_child

#=head2 _handle_done
#
#Handler for the C<_done> event. It is not implemented yet.
#
#=cut

sub _handle_done {
    # Target of the "_done" state registered in run(); currently it does
    # nothing and returns an empty list (undef in scalar context).
    return;
}

#=head2 _handle_stderr
#
#Forward a child's STDERR output to POE::Kernel::_warn.
#
#=cut

sub _handle_stderr {
    # ARG0 is the text received on the child's STDERR, ARG1 the id of the
    # wheel it arrived on.
    my ($self, $input, $wheel_id) = @_[OBJECT, ARG0, ARG1];

    # Prefix the text with its originating wheel before passing it on.
    my $message = "wheel $wheel_id STDERR: $input";
    return POE::Kernel::_warn($message);
}

#=head2 _handle_stdout
#
#Evaluate a result that a child printed to STDOUT.
#
#Trigger the C<_next> event if the child asks for a sibling by printing a C<busy_worker_pid> record to STDOUT.
#
#=cut

sub _handle_stdout {
    my ($self, $kernel, $heap, $result) = @_[OBJECT, KERNEL, HEAP, ARG0];

    # Only a hash reference carrying a true busy_worker_pid counts as a
    # request for a sibling; every other result is ignored.
    my $busy_pid = ref($result) eq 'HASH' && $result->{busy_worker_pid};

    if ($busy_pid) {
        # Remember the requesting child as busy, then try to start another
        # worker through the "_next" event.
        $heap->{busy_worker_pid}->{$busy_pid} = 1;
        $kernel->yield("_next");
    }
} ## end sub _handle_stdout

1;    # End of POE::Wheel::Spawner

=head1 AUTHOR

Alexei Pastuchov E<lt>palik at cpan.orgE<gt>.

=head1 REPOSITORY

L<https://github.com/p-alik/POE-Wheel-Spawner.git>

=head1 LICENSE AND COPYRIGHT


Copyright 2014-2016 by Alexei Pastuchov E<lt>palik at cpan.orgE<gt>.

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut



( run in 1.600 second using v1.01-cache-2.11-cpan-5a3173703d6 )