Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Routine.pm view on Meta::CPAN
Invoked if the code block returns normally. Note that C<fork()>-based Routines
can only transport an integer result between 0 and 255, as this is the actual
C<exit()> value.
=head2 on_die $exception
Invoked if the code block fails with an exception.
=cut
=head1 PARAMETERS
The following named parameters may be passed to C<new> or C<configure>:
=head2 model => "fork" | "thread"
Optional. Defines how the routine will detach itself from the main process.
C<fork> uses a child process detached using an L<IO::Async::Process>.
C<thread> uses a thread, and is only available on threaded Perls.
If the model is not specified, the environment variable
C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that is unset or
empty, C<fork> is preferred if it is available, otherwise C<thread>.
=head2 channels_in => ARRAY of IO::Async::Channel
ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
in to the Routine.
=head2 channels_out => ARRAY of IO::Async::Channel
ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
out of the Routine.
=head2 code => CODE
CODE reference to the body of the Routine, to execute once the channels are
set up.
=head2 setup => ARRAY
Optional. For C<fork()>-based Routines, gives a reference to an array to pass
to the underlying C<Loop> C<fork_child> method. Ignored for thread-based
Routines.
=cut
# Platform default for the detachment model, resolved once at compile time:
# fork() wins when available, threads are the fallback, and loading fails
# outright when neither is usable.
use constant PREFERRED_MODEL =>
   ( IO::Async::OS->HAVE_POSIX_FORK && "fork" )
      || ( IO::Async::OS->HAVE_THREADS && "thread" )
      || die "No viable Routine models";
# Supplies a default 'model' in the constructor parameters before handing
# them on to the parent class initialiser.
#
# $params is the HASH reference of constructor arguments; it is modified in
# place so the chosen model is visible to whatever consumes it afterwards.
sub _init
{
   my $self = shift;
   my $params = $_[0];

   # Precedence: explicit parameter, then environment, then platform default.
   unless( $params->{model} ) {
      $params->{model} = $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL;
   }

   $self->SUPER::_init( @_ );
}
# Applies named parameters to the Routine.
#
# The keys handled here are stored directly on the object; 'model' is
# validated first. Anything left over is passed to the parent class's
# configure method.
#
# Croaks if 'model' is neither "fork" nor "thread", or if it names a
# model that IO::Async::OS reports as unavailable.
sub configure
{
   my ( $self, %params ) = @_;

   # TODO: Can only reconfigure when not running
   for my $key (qw( channels_in channels_out code setup on_finish on_return on_die )) {
      next unless exists $params{$key};
      $self->{$key} = delete $params{$key};
   }

   # The key is always removed, even when its value is undef, so that it is
   # never forwarded to the parent class.
   my $model = delete $params{model};
   if( defined $model ) {
      if( $model eq "fork" ) {
         IO::Async::OS->HAVE_POSIX_FORK or
            croak "Cannot use 'fork' model as fork() is not available";
      }
      elsif( $model eq "thread" ) {
         IO::Async::OS->HAVE_THREADS or
            croak "Cannot use 'thread' model as threads are not available";
      }
      else {
         croak "Expected 'model' to be either 'fork' or 'thread'";
      }

      $self->{model} = $model;
   }

   $self->SUPER::configure( %params );
}
# Called when the Routine is added to a loop: runs the parent class's hook
# first, then starts the routine using whichever model was configured.
#
# Dies if the stored model is not one this method knows how to set up.
sub _add_to_loop
{
   my ( $self, $loop ) = @_;

   $self->SUPER::_add_to_loop( $loop );

   if( $self->{model} eq "fork" ) {
      return $self->_setup_fork;
   }

   if( $self->{model} eq "thread" ) {
      return $self->_setup_thread;
   }

   die "TODO: unrecognised Routine model $self->{model}";
}
sub _setup_fork
{
my $self = shift;
my @setup;
my @channels_in;
my @channels_out;
foreach my $ch ( @{ $self->{channels_in} || [] } ) {
my ( $rd, $wr );
unless( $rd = $ch->_extract_read_handle ) {
( $rd, $wr ) = IO::Async::OS->pipepair;
}
push @setup, $rd => "keep";
push @channels_in, [ $ch, $wr, $rd ];
}
foreach my $ch ( @{ $self->{channels_out} || [] } ) {
my ( $rd, $wr );
unless( $wr = $ch->_extract_write_handle ) {
( $rd, $wr ) = IO::Async::OS->pipepair;
}
push @setup, $wr => "keep";
push @channels_out, [ $ch, $rd, $wr ];
}
my $code = $self->{code};
my $setup = $self->{setup};
push @setup, @$setup if $setup;
my $process = IO::Async::Process->new(
setup => \@setup,
code => sub {
foreach ( @channels_in ) {
my ( $ch, undef, $rd ) = @$_;
$ch->setup_sync_mode( $rd );
}
foreach ( @channels_out ) {
my ( $ch, undef, $wr ) = @$_;
$ch->setup_sync_mode( $wr );
}
my $ret = $code->();
foreach ( @channels_in, @channels_out ) {
my ( $ch ) = @$_;
$ch->close;
}
return $ret;
},
on_finish => $self->_replace_weakself( sub {
( run in 1.165 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )