Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Loop.pm view on Meta::CPAN
defined $kid or croak "Cannot fork() - $!";
if( $kid == 0 ) {
unless( $params{keep_signals} ) {
foreach( keys %SIG ) {
next if m/^__(WARN|DIE)__$/;
$SIG{$_} = "DEFAULT" if ref $SIG{$_} eq "CODE";
}
}
my $exitvalue = eval { $code->() };
defined $exitvalue or $exitvalue = -1;
POSIX::_exit( $exitvalue );
}
if( defined $params{on_exit} ) {
$self->watch_child( $kid => $params{on_exit} );
}
return $kid;
}
=head2 create_thread
$tid = $loop->create_thread( %params )
This method creates a new (non-detached) thread to run the given code block,
returning its thread ID.
=over 8
=item code => CODE
A block of code to execute in the thread. It is called in the context given by
the C<context> argument, and its return value will be available to the
C<on_joined> callback. It is called inside an C<eval> block; if it fails the
exception will be caught.
=item context => "scalar" | "list" | "void"
Optional. Gives the calling context that C<code> is invoked in. Defaults to
C<scalar> if not supplied.
=item on_joined => CODE
Callback to invoke when the thread function returns or throws an exception.
If it returned, this callback will be invoked with its result
$on_joined->( return => @result )
If it threw an exception the callback is invoked with the value of C<$@>
$on_joined->( died => $! )
=back
=cut
# It is basically impossible to have any semblance of order on global
# destruction, and even harder again to rely on when threads are going to be
# terminated and joined. Instead of ensuring we join them all, just detach any
# we no longer care about at END time
my %threads_to_detach; # {$tid} = $thread_weakly
END {
$_ and $_->detach for values %threads_to_detach;
}
sub create_thread
{
my $self = shift;
my %params = @_;
HAVE_THREADS or croak "Threads are not available";
eval { require threads } or croak "This Perl does not support threads";
my $code = $params{code} or croak "Expected 'code' as a CODE reference";
my $on_joined = $params{on_joined} or croak "Expected 'on_joined' as a CODE reference";
my $threadwatches = $self->{threadwatches};
unless( $self->{thread_join_pipe} ) {
( my $rd, $self->{thread_join_pipe} ) = IO::Async::OS->pipepair or
croak "Cannot pipepair - $!";
$self->{thread_join_pipe}->autoflush(1);
$self->watch_io(
handle => $rd,
on_read_ready => sub {
sysread $rd, my $buffer, 8192 or return;
# There's a race condition here in that we might have read from
# the pipe after the returning thread has written to it but before
# it has returned. We'll grab the actual $thread object and
# forcibly ->join it here to ensure we wait for its result.
foreach my $tid ( unpack "N*", $buffer ) {
my ( $thread, $on_joined ) = @{ delete $threadwatches->{$tid} }
or die "ARGH: Can't find threadwatch for tid $tid\n";
$on_joined->( $thread->join );
delete $threads_to_detach{$tid};
}
}
);
}
my $wr = $self->{thread_join_pipe};
my $context = $params{context} || "scalar";
my ( $thread ) = threads->create(
sub {
my ( @ret, $died );
eval {
$context eq "list" ? ( @ret = $code->() ) :
$context eq "scalar" ? ( $ret[0] = $code->() ) :
$code->();
1;
} or $died = $@;
( run in 0.520 second using v1.01-cache-2.11-cpan-39bf76dae61 )