Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Loop.pm  view on Meta::CPAN

   defined $kid or croak "Cannot fork() - $!";

   if( $kid == 0 ) {
      unless( $params{keep_signals} ) {
         foreach( keys %SIG ) {
            next if m/^__(WARN|DIE)__$/;
            $SIG{$_} = "DEFAULT" if ref $SIG{$_} eq "CODE";
         }
      }

      my $exitvalue = eval { $code->() };

      defined $exitvalue or $exitvalue = -1;

      POSIX::_exit( $exitvalue );
   }

   if( defined $params{on_exit} ) {
      $self->watch_child( $kid => $params{on_exit} );
   }

   return $kid;
}

=head2 create_thread

   $tid = $loop->create_thread( %params )

This method creates a new (non-detached) thread to run the given code block,
returning its thread ID.

=over 8

=item code => CODE

A block of code to execute in the thread. It is called in the context given by
the C<context> argument, and its return value will be available to the
C<on_joined> callback. It is called inside an C<eval> block; if it fails the
exception will be caught.

=item context => "scalar" | "list" | "void"

Optional. Gives the calling context that C<code> is invoked in. Defaults to
C<scalar> if not supplied.

=item on_joined => CODE

Callback to invoke when the thread function returns or throws an exception.
If it returned, this callback will be invoked with its result

 $on_joined->( return => @result )

If it threw an exception the callback is invoked with the value of C<$@>

 $on_joined->( died => $! )

=back

=cut

# It is basically impossible to have any semblance of order on global
# destruction, and even harder again to rely on when threads are going to be
# terminated and joined. Instead of ensuring we join them all, just detach any
# we no longer care about at END time
my %threads_to_detach; # {$tid} = $thread_weakly
END {
   $_ and $_->detach for values %threads_to_detach;
}

sub create_thread
{
   my $self = shift;
   my %params = @_;

   HAVE_THREADS or croak "Threads are not available";

   eval { require threads } or croak "This Perl does not support threads";

   my $code = $params{code} or croak "Expected 'code' as a CODE reference";
   my $on_joined = $params{on_joined} or croak "Expected 'on_joined' as a CODE reference";

   my $threadwatches = $self->{threadwatches};

   unless( $self->{thread_join_pipe} ) {
      ( my $rd, $self->{thread_join_pipe} ) = IO::Async::OS->pipepair or
         croak "Cannot pipepair - $!";
      $self->{thread_join_pipe}->autoflush(1);

      $self->watch_io(
         handle => $rd,
         on_read_ready => sub {
            sysread $rd, my $buffer, 8192 or return;

            # There's a race condition here in that we might have read from
            # the pipe after the returning thread has written to it but before
            # it has returned. We'll grab the actual $thread object and
            # forcibly ->join it here to ensure we wait for its result.

            foreach my $tid ( unpack "N*", $buffer ) {
               my ( $thread, $on_joined ) = @{ delete $threadwatches->{$tid} }
                  or die "ARGH: Can't find threadwatch for tid $tid\n";
               $on_joined->( $thread->join );
               delete $threads_to_detach{$tid};
            }
         }
      );
   }

   my $wr = $self->{thread_join_pipe};

   my $context = $params{context} || "scalar";

   my ( $thread ) = threads->create(
      sub {
         my ( @ret, $died );
         eval {
            $context eq "list"   ? ( @ret    = $code->() ) :
            $context eq "scalar" ? ( $ret[0] = $code->() ) :
                                               $code->();
            1;
         } or $died = $@;



( run in 0.520 second using v1.01-cache-2.11-cpan-39bf76dae61 )