Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Routine.pm  view on Meta::CPAN


Invoked if the code block returns normally. Note that C<fork()>-based Routines
can only transport an integer result between 0 and 255, as this is the actual
C<exit()> value.

=head2 on_die $exception

Invoked if the code block fails with an exception.

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 model => "fork" | "thread"

Optional. Defines how the routine will detach itself from the main process.
C<fork> uses a child process detached using an L<IO::Async::Process>.
C<thread> uses a thread, and is only available on threaded Perls.

If the model is not specified, the environment variable
C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that is unset or
empty, C<fork> is preferred if it is available, otherwise C<thread>.

=head2 channels_in => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
in to the Routine.

=head2 channels_out => ARRAY of IO::Async::Channel

ARRAY reference of L<IO::Async::Channel> objects to set up for passing values
out of the Routine.

=head2 code => CODE

CODE reference to the body of the Routine, to execute once the channels are
set up.

=head2 setup => ARRAY

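Optional. For C<fork()>-based Routines, gives a reference to an array of extra
setup operations. These are appended to the ones generated for the channel
filehandles and passed as the C<setup> parameter of the underlying
L<IO::Async::Process>. Ignored for thread-based Routines.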

=cut

# Fallback model used by _init when neither the 'model' parameter nor the
# IO_ASYNC_ROUTINE_MODEL environment variable selects one. "fork" wins when
# IO::Async::OS reports POSIX fork support; otherwise "thread" when it reports
# thread support. Being a 'use constant', this is evaluated at compile time,
# so compilation dies if neither is available.
use constant PREFERRED_MODEL =>
   ( IO::Async::OS->HAVE_POSIX_FORK && "fork" )   ||
   ( IO::Async::OS->HAVE_THREADS    && "thread" ) ||
   die "No viable Routine models";

# Initialisation hook: fills in a default 'model' before handing over to the
# parent class.
#
#   $params - HASH reference of construction parameters; modified in place so
#             that the parent's _init sees the chosen model.
#
# An explicit (true) 'model' is left alone. Otherwise the
# IO_ASYNC_ROUTINE_MODEL environment variable is used, falling back to
# PREFERRED_MODEL when that is unset or empty.
sub _init
{
   my $self = shift;
   my $params = $_[0];

   unless( $params->{model} ) {
      $params->{model} = $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL;
   }

   # Forward the original argument list untouched
   return $self->SUPER::_init( @_ );
}

# Applies named parameters to the Routine.
#
# Keys handled here and stored directly on the object:
#   channels_in channels_out code setup on_finish on_return on_die
#
# 'model', when defined, must be "fork" or "thread" and must be supported on
# this platform according to IO::Async::OS; otherwise this croaks and the
# stored model is left unchanged. Any remaining keys are passed on to the
# parent class's configure.
sub configure
{
   my ( $self, %params ) = @_;

   # TODO: Can only reconfigure when not running
   my @own_keys = qw( channels_in channels_out code setup on_finish on_return on_die );
   foreach my $key ( @own_keys ) {
      next unless exists $params{$key};
      $self->{$key} = delete $params{$key};
   }

   # Always remove 'model' so the parent never sees it, even when undef
   my $model = delete $params{model};
   if( defined $model ) {
      croak "Expected 'model' to be either 'fork' or 'thread'"
         unless $model eq "fork" or $model eq "thread";

      if( $model eq "fork" ) {
         IO::Async::OS->HAVE_POSIX_FORK or
            croak "Cannot use 'fork' model as fork() is not available";
      }
      else {
         # Only "thread" can reach here, given the check above
         IO::Async::OS->HAVE_THREADS or
            croak "Cannot use 'thread' model as threads are not available";
      }

      $self->{model} = $model;
   }

   return $self->SUPER::configure( %params );
}

# Hook run when the Routine is added to a Loop.
#
#   $loop - the loop object, passed through to the parent class's hook.
#
# After the parent's hook has run, starts the routine using whichever model
# was configured, returning whatever that model's setup method returns. Dies
# if the stored model is neither "fork" nor "thread".
sub _add_to_loop
{
   my ( $self, $loop ) = @_;

   $self->SUPER::_add_to_loop( $loop );

   if( $self->{model} eq "fork" ) {
      return $self->_setup_fork;
   }
   elsif( $self->{model} eq "thread" ) {
      return $self->_setup_thread;
   }
   else {
      die "TODO: unrecognised Routine model $self->{model}";
   }
}

sub _setup_fork
{
   my $self = shift;

   my @setup;
   my @channels_in;
   my @channels_out;

   foreach my $ch ( @{ $self->{channels_in} || [] } ) {
      my ( $rd, $wr );
      unless( $rd = $ch->_extract_read_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @setup, $rd => "keep";
      push @channels_in, [ $ch, $wr, $rd ];
   }

   foreach my $ch ( @{ $self->{channels_out} || [] } ) {
      my ( $rd, $wr );
      unless( $wr = $ch->_extract_write_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @setup, $wr => "keep";
      push @channels_out, [ $ch, $rd, $wr ];
   }

   my $code  = $self->{code};

   my $setup = $self->{setup};
   push @setup, @$setup if $setup;

   my $process = IO::Async::Process->new(
      setup => \@setup,
      code => sub {
         foreach ( @channels_in ) {
            my ( $ch, undef, $rd ) = @$_;
            $ch->setup_sync_mode( $rd );
         }
         foreach ( @channels_out ) {
            my ( $ch, undef, $wr ) = @$_;
            $ch->setup_sync_mode( $wr );
         }

         my $ret = $code->();

         foreach ( @channels_in, @channels_out ) {
            my ( $ch ) = @$_;
            $ch->close;
         }

         return $ret;
      },
      on_finish => $self->_replace_weakself( sub {



( run in 1.165 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )