POE-Wheel-Spawner
view release on metacpan or search on metacpan
lib/POE/Wheel/Spawner.pm view on Meta::CPAN
} ## end sub new
=head2 run(%arg)
optional C<%arg> arguments for L<POE::Session>:
=over
=item
debug
default 0
=item
trace
default 0
=back
create a L<POE::Session>
run L<POE::Kernel>
=cut
sub run {
my ($self, %arg) = @_;
ref($self->{workload}) eq 'CODE'
|| die "work_method is not a code reference";
POE::Session->create(
options => { debug => $arg{debug} || 0, trace => $arg{trace} || 0 },
object_states => [
$self => {
_start => '_handle_start',
_next => '_handle_start',
_sig_child => '_handle_sig_child',
_done => '_handle_done',
_stderr => '_handle_stderr',
_stdout => '_handle_stdout',
}
]
);
POE::Kernel->run();
} ## end sub run
=head2 spawn($pid)
request to spawn
=cut
sub spawn {
my ($self, $pid) = @_;
my $filter = POE::Filter::Reference->new();
my $output = $filter->put([{ busy_worker_pid => $pid }]);
print @$output;
} ## end sub spawn
#=head2 _handle_start
#
#handle C<_start> and C<_next> events defined in POE::Session, which is initialized in C<run>.
#
#start execution of C<workload> by C<pool_size> parallel running pids
#
#=cut
sub _handle_start {
my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
my $pids_count = scalar(keys(%{ $heap->{worker_by_pid} }));
($pids_count >= $self->{pool_size}) && return;
my $w = POE::Wheel::Run->new(
Program => sub { &{ $self->{workload} } },
StdoutFilter => POE::Filter::Reference->new(),
StdoutEvent => "_stdout",
StderrEvent => "_stderr",
CloseEvent => "_done",
);
$heap->{worker_by_pid}->{ $w->PID } = $w;
$kernel->sig_child($w->PID, "_sig_child");
} ## end sub _handle_start
#=head2 _handle_sig_child
#
#Clear heap. Trigger '_next' if !stop_if_done and currently no child is busy
#
#=cut
sub _handle_sig_child {
my ($self, $kernel, $heap, $pid, $exit_val)
= @_[OBJECT, KERNEL, HEAP, ARG1, ARG2];
++$self->{_workers_sig_count};
my $child = delete $heap->{worker_by_pid}{$pid};
unless ($child) {
POE::Kernel::_die("no child pid: $pid");
}
delete $heap->{busy_worker_pid}->{$pid};
if ($self->{stop_if_done}) {
($self->{_workers_sig_count} >= $self->{pool_size}) && return;
}
else {
(scalar(keys(%{ $heap->{busy_worker_pid} })))
|| $kernel->yield("_next");
}
} ## end sub _handle_sig_child
#=head2 _handle_done
#
#is not implemented yet
#
#=cut
sub _handle_done { }
#=head2 _handle_stderr
#
#provide STDERR to POE::Kernel::_warn
#
#=cut
sub _handle_stderr {
my ($self, $input, $wheel_id) = @_[OBJECT, ARG0, ARG1];
POE::Kernel::_warn("wheel $wheel_id STDERR: $input");
}
#=head2 _handle_stdout
#
#evaluate from child to stdout printed result.
#
#trigger _next event if child asks - by using busy_worker_pid printed to stdout - for a sibling
#
#=cut
sub _handle_stdout {
my ($self, $kernel, $heap, $result) = @_[OBJECT, KERNEL, HEAP, ARG0];
if (ref($result) eq 'HASH' && $result->{busy_worker_pid}) {
$heap->{busy_worker_pid}->{ $result->{busy_worker_pid} } = 1;
$kernel->yield("_next");
}
} ## end sub _handle_stdout
1; # End of POE::Wheel::Spawner
=head1 AUTHOR
Alexei Pastuchov E<lt>palik at cpan.orgE<gt>.
=head1 REPOSITORY
L<https://github.com/p-alik/POE-Wheel-Spawner.git>
=head1 LICENSE AND COPYRIGHT
Copyright 2014-2016 by Alexei Pastuchov E<lt>palik at cpan.orgE<gt>.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=cut
( run in 1.600 second using v1.01-cache-2.11-cpan-5a3173703d6 )