Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Routine.pm view on Meta::CPAN
}
push @setup, $rd => "keep";
push @channels_in, [ $ch, $wr, $rd ];
}
foreach my $ch ( @{ $self->{channels_out} || [] } ) {
my ( $rd, $wr );
unless( $wr = $ch->_extract_write_handle ) {
( $rd, $wr ) = IO::Async::OS->pipepair;
}
push @setup, $wr => "keep";
push @channels_out, [ $ch, $rd, $wr ];
}
my $code = $self->{code};
my $setup = $self->{setup};
push @setup, @$setup if $setup;
my $process = IO::Async::Process->new(
setup => \@setup,
code => sub {
foreach ( @channels_in ) {
my ( $ch, undef, $rd ) = @$_;
$ch->setup_sync_mode( $rd );
}
foreach ( @channels_out ) {
my ( $ch, undef, $wr ) = @$_;
$ch->setup_sync_mode( $wr );
}
my $ret = $code->();
foreach ( @channels_in, @channels_out ) {
my ( $ch ) = @$_;
$ch->close;
}
return $ret;
},
on_finish => $self->_replace_weakself( sub {
my $self = shift or return;
my ( $exitcode ) = @_;
$self->maybe_invoke_event( on_finish => $exitcode );
$self->maybe_invoke_event( on_return => ($exitcode >> 8) ) unless $exitcode & 0x7f;
}),
on_exception => $self->_replace_weakself( sub {
my $self = shift or return;
my ( $exception, $errno, $exitcode ) = @_;
$self->maybe_invoke_event( on_die => $exception );
}),
);
foreach ( @channels_in ) {
my ( $ch, $wr ) = @$_;
$ch->setup_async_mode( write_handle => $wr );
$self->add_child( $ch ) unless $ch->parent;
}
foreach ( @channels_out ) {
my ( $ch, $rd ) = @$_;
$ch->setup_async_mode( read_handle => $rd );
$self->add_child( $ch ) unless $ch->parent;
}
$self->add_child( $self->{process} = $process );
$self->{id} = "P" . $process->pid;
foreach ( @channels_in, @channels_out ) {
my ( undef, undef, $other ) = @$_;
$other->close;
}
}
sub _setup_thread
{
my $self = shift;
my @channels_in;
my @channels_out;
foreach my $ch ( @{ $self->{channels_in} || [] } ) {
my ( $rd, $wr );
unless( $rd = $ch->_extract_read_handle ) {
( $rd, $wr ) = IO::Async::OS->pipepair;
}
push @channels_in, [ $ch, $wr, $rd ];
}
foreach my $ch ( @{ $self->{channels_out} || [] } ) {
my ( $rd, $wr );
unless( $wr = $ch->_extract_write_handle ) {
( $rd, $wr ) = IO::Async::OS->pipepair;
}
push @channels_out, [ $ch, $rd, $wr ];
}
my $code = $self->{code};
my $tid = $self->loop->create_thread(
code => sub {
foreach ( @channels_in ) {
my ( $ch, $wr, $rd ) = @$_;
$ch->setup_sync_mode( $rd );
$wr->close if $wr;
}
foreach ( @channels_out ) {
my ( $ch, $rd, $wr ) = @$_;
$ch->setup_sync_mode( $wr );
$rd->close if $rd;
}
my $ret = $code->();
foreach ( @channels_in, @channels_out ) {
my ( $ch ) = @$_;
$ch->close;
}
return $ret;
},
on_joined => $self->_capture_weakself( sub {
my $self = shift or return;
my ( $ev, @result ) = @_;
$self->maybe_invoke_event( on_finish => @_ );
$self->maybe_invoke_event( on_return => @result ) if $ev eq "return";
$self->maybe_invoke_event( on_die => $result[0] ) if $ev eq "died";
delete $self->{tid};
}),
);
$self->{tid} = $tid;
$self->{id} = "T" . $tid;
foreach ( @channels_in ) {
my ( $ch, $wr, $rd ) = @$_;
$ch->setup_async_mode( write_handle => $wr );
$rd->close;
$self->add_child( $ch ) unless $ch->parent;
}
foreach ( @channels_out ) {
my ( $ch, $rd, $wr ) = @$_;
$ch->setup_async_mode( read_handle => $rd );
$wr->close;
$self->add_child( $ch ) unless $ch->parent;
}
}
=head1 METHODS
=cut
=head2 id
$id = $routine->id
Returns an ID string that uniquely identifies the Routine out of all the
currently-running ones. (The ID of already-exited Routines may be reused,
however.)
=cut
sub id
{
my $self = shift;
return $self->{id};
}
=head2 model
$model = $routine->model
Returns the detachment model in use by the Routine.
=cut
sub model
{
my $self = shift;
return $self->{model};
}
=head2 kill
$routine->kill( $signal )
Sends the specified signal to the routine code. This is either implemented by
C<CORE::kill()> or C<threads::kill> as required. Note that in the thread case
this has the usual limits of signal delivery to threads; namely, that it works
at the Perl interpreter level, and cannot actually interrupt blocking system
calls.
=cut
sub kill
{
my $self = shift;
my ( $signal ) = @_;
$self->{process}->kill( $signal ) if $self->{model} eq "fork";
threads->object( $self->{tid} )->kill( $signal ) if $self->{model} eq "thread";
}
=head1 AUTHOR
( run in 1.174 second using v1.01-cache-2.11-cpan-39bf76dae61 )