Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Routine.pm  view on Meta::CPAN

      }
      push @setup, $rd => "keep";
      push @channels_in, [ $ch, $wr, $rd ];
   }

   foreach my $ch ( @{ $self->{channels_out} || [] } ) {
      my ( $rd, $wr );
      unless( $wr = $ch->_extract_write_handle ) {
         ( $rd, $wr ) = IO::Async::OS->pipepair;
      }
      push @setup, $wr => "keep";
      push @channels_out, [ $ch, $rd, $wr ];
   }

   my $code  = $self->{code};

   my $setup = $self->{setup};
   push @setup, @$setup if $setup;

   my $process = IO::Async::Process->new(
      setup => \@setup,
      code => sub {
         foreach ( @channels_in ) {
            my ( $ch, undef, $rd ) = @$_;
            $ch->setup_sync_mode( $rd );
         }
         foreach ( @channels_out ) {
            my ( $ch, undef, $wr ) = @$_;
            $ch->setup_sync_mode( $wr );
         }

         my $ret = $code->();

         foreach ( @channels_in, @channels_out ) {
            my ( $ch ) = @$_;
            $ch->close;
         }

         return $ret;
      },
      on_finish => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exitcode ) = @_;
         $self->maybe_invoke_event( on_finish => $exitcode );

         $self->maybe_invoke_event( on_return => ($exitcode >> 8) ) unless $exitcode & 0x7f;
      }),
      on_exception => $self->_replace_weakself( sub {
         my $self = shift or return;
         my ( $exception, $errno, $exitcode ) = @_;

         $self->maybe_invoke_event( on_die => $exception );
      }),
   );

   foreach ( @channels_in ) {
      my ( $ch, $wr ) = @$_;

      $ch->setup_async_mode( write_handle => $wr );

      $self->add_child( $ch ) unless $ch->parent;
   }

   foreach ( @channels_out ) {
      my ( $ch, $rd ) = @$_;

      $ch->setup_async_mode( read_handle => $rd );

      $self->add_child( $ch ) unless $ch->parent;
   }

   $self->add_child( $self->{process} = $process );
   $self->{id} = "P" . $process->pid;

   foreach ( @channels_in, @channels_out ) {
      my ( undef, undef, $other ) = @$_;
      $other->close;
   }
}

# Runs the routine's code in a new thread obtained from the containing loop.
# Every channel gets a pair of handles: the end used inside the thread (sync
# mode) and the end kept by the calling thread (async mode). Records the new
# thread ID in {tid} and the routine ID "T<tid>" in {id}.
sub _setup_thread
{
   my ( $self ) = @_;

   # Entries in both lists share one layout:
   #    [ $channel, $parent_end, $thread_end ]
   # $parent_end stays undef when the channel itself supplied the thread-side
   # handle, because no pipe pair is created in that case.
   my @inbound;
   my @outbound;

   for my $channel ( @{ $self->{channels_in} || [] } ) {
      my $read_end = $channel->_extract_read_handle;
      my $write_end;
      ( $read_end, $write_end ) = IO::Async::OS->pipepair if !$read_end;
      push @inbound, [ $channel, $write_end, $read_end ];
   }

   for my $channel ( @{ $self->{channels_out} || [] } ) {
      my $write_end = $channel->_extract_write_handle;
      my $read_end;
      ( $read_end, $write_end ) = IO::Async::OS->pipepair if !$write_end;
      push @outbound, [ $channel, $read_end, $write_end ];
   }

   my $body = $self->{code};

   # Executed inside the new thread: attach each channel to its thread-side
   # handle, drop the thread's copy of the parent-side handle, run the user
   # code, then close every channel before handing back the result.
   my $run_in_thread = sub {
      for my $entry ( @inbound, @outbound ) {
         my ( $channel, $parent_end, $thread_end ) = @$entry;
         $channel->setup_sync_mode( $thread_end );
         $parent_end->close if $parent_end;
      }

      my $result = $body->();

      $_->[0]->close for @inbound, @outbound;

      return $result;
   };

   # Executed in the calling thread once the routine's thread has been
   # joined; does nothing if the routine object has already gone away.
   my $when_joined = $self->_capture_weakself( sub {
      my $routine = shift;
      return unless $routine;

      my ( $outcome, @values ) = @_;
      $routine->maybe_invoke_event( on_finish => @_ );

      if( $outcome eq "return" ) {
         $routine->maybe_invoke_event( on_return => @values );
      }
      elsif( $outcome eq "died" ) {
         $routine->maybe_invoke_event( on_die => $values[0] );
      }

      delete $routine->{tid};
   });

   my $tid = $self->loop->create_thread(
      code      => $run_in_thread,
      on_joined => $when_joined,
   );

   $self->{tid} = $tid;
   $self->{id}  = "T" . $tid;

   # Calling-thread side: inbound channels are written to and outbound ones
   # are read from. The thread-side handle is closed here in either case.
   for my $side ( [ write_handle => \@inbound ], [ read_handle => \@outbound ] ) {
      my ( $handle_arg, $entries ) = @$side;

      for my $entry ( @$entries ) {
         my ( $channel, $parent_end, $thread_end ) = @$entry;

         $channel->setup_async_mode( $handle_arg => $parent_end );
         $thread_end->close;

         $self->add_child( $channel ) if !$channel->parent;
      }
   }
}

=head1 METHODS

=cut

=head2 id

   $id = $routine->id

Returns an ID string that uniquely identifies the Routine out of all the
currently-running ones. (The ID of already-exited Routines may be reused,
however.)

=cut

# Accessor for the routine's ID string, as stored in the {id} slot by the
# setup code ("P" followed by the process PID, or "T" followed by the
# thread ID). Returns undef if no ID has been stored yet.
sub id
{
   my ( $self ) = @_;

   return $self->{id};
}

=head2 model

   $model = $routine->model

Returns the detachment model in use by the Routine.

=cut

# Accessor for the detachment model name held in the {model} slot. The kill
# method compares this value against "fork" and "thread".
sub model
{
   my ( $self ) = @_;

   return $self->{model};
}

=head2 kill

   $routine->kill( $signal )

Sends the specified signal to the routine code. This is either implemented by
C<CORE::kill()> or C<threads::kill> as required. Note that in the thread case
this has the usual limits of signal delivery to threads; namely, that it works
at the Perl interpreter level, and cannot actually interrupt blocking system
calls.

=cut

# Sends $signal to whatever is running the routine's code.
#
#   fork model:   delegates to the kill method of the object in {process}.
#   thread model: looks up the thread object for {tid} and signals it.
#
# In the thread model the on_joined handler deletes {tid} once the thread has
# finished, and threads->object returns undef for a thread that is unknown or
# already joined. Signalling a finished thread is therefore treated as a
# no-op rather than dying with "Can't call method "kill" on an undefined
# value".
sub kill
{
   my $self = shift;
   my ( $signal ) = @_;

   $self->{process}->kill( $signal ) if $self->{model} eq "fork";

   if( $self->{model} eq "thread" ) {
      # Nothing left to signal if the thread has finished or been joined.
      my $thread = defined $self->{tid} ? threads->object( $self->{tid} ) : undef;
      $thread->kill( $signal ) if $thread;
   }
}

=head1 AUTHOR



( run in 1.174 second using v1.01-cache-2.11-cpan-39bf76dae61 )