Coro

 view release on metacpan or  search on metacpan

Coro.pm  view on Meta::CPAN

The Coro object representing the current coro (the last
coro that the Coro scheduler switched to). The initial value is
C<$Coro::main> (of course).

This variable is B<strictly> I<read-only>. You can take copies of the
value stored in it and use it as any other Coro object, but you must
not otherwise modify the variable itself.

=cut

# Function form of the "current coro" lookup: takes no arguments (empty
# prototype) and returns the value of $current.
sub current() { $current } # [DEPRECATED]

=item $Coro::idle

This variable is mainly useful to integrate Coro into event loops. It is
usually better to rely on L<Coro::AnyEvent> or L<Coro::EV>, as this is
pretty low-level functionality.

This variable stores a Coro object that is put into the ready queue when
there are no other ready threads (without invoking any ready hooks).

Coro.pm  view on Meta::CPAN

         # for short-lived callbacks, this reduces pressure on the coro pool
         # as the chance is very high that the async_poll coro will be back
         # in the idle state when cede returns
         cede;
      }
      schedule; # sleep well
   }
};
$unblock_scheduler->{desc} = "[unblock_sub scheduler]";

# Wraps a code block in a closure. Calling the closure does not run the
# block itself: it puts [block, arguments...] at the front of
# @unblock_queue and marks $unblock_scheduler as ready.
sub unblock_sub(&) {
   my ($code) = @_;

   return sub {
      my $job = [$code, @_];

      unshift @unblock_queue, $job;
      $unblock_scheduler->ready
   };
}

=item $cb = rouse_cb

Coro/AnyEvent.pm  view on Meta::CPAN


Example: wait until STDIN becomes readable, then quit the program.

   use Coro::AnyEvent;
   print "press enter to quit...\n";
   Coro::AnyEvent::readable *STDIN;
   exit 0;

=cut

# Creates a timer with after = 0 and interval = 0 whose callback is a
# rouse callback, then waits for that callback to be invoked.
sub poll() {
   # the lexical holds the watcher until the wait below has returned
   my $timer = AE::timer(0, 0, Coro::rouse_cb());

   return Coro::rouse_wait();
}

# Waits until a timer set to the given delay (first argument, interval 0)
# has invoked the rouse callback.
sub sleep($) {
   my ($delay) = @_;

   # the lexical holds the watcher until the wait below has returned
   my $timer = AE::timer($delay, 0, Coro::rouse_cb());

   return Coro::rouse_wait();
}

# Registers an AE::idle watcher with a rouse callback and waits until
# that callback has been invoked.
sub idle() {
   # the lexical holds the watcher until the wait below has returned
   my $idler = AE::idle(Coro::rouse_cb());

   return Coro::rouse_wait();
}

# Like idle, but additionally arms a timer with the given delay (first
# argument). Both watchers share one rouse callback, so the wait ends as
# soon as either of them invokes it.
sub idle_upto($) {
   my ($max_wait) = @_;

   my $wake  = Coro::rouse_cb();
   my $timer = AE::timer($max_wait, 0, $wake);
   my $idler = AE::idle($wake);

   return Coro::rouse_wait();
}

# Waits on the handle given as first argument using an AE::io watcher with
# direction flag 0 (writable uses 1). The io watcher invokes the rouse
# callback with 1; if a second argument is defined, a timer with that
# delay is armed as well and invokes the callback with 0.
# Presumably rouse_wait hands back the value passed to the callback, so
# callers see 1 for "ready" and 0 for "timed out" -- confirm in Coro docs.
sub readable($;$) {
   my $wake = Coro::rouse_cb();

   my $io = AE::io($_[0], 0, sub { $wake->(1) });

   # optional timeout watcher, only created when a timeout was passed
   my $timer;
   $timer = AE::timer($_[1], 0, sub { $wake->(0) })
      if defined $_[1];

   return Coro::rouse_wait();
}

# Waits on the handle given as first argument using an AE::io watcher with
# direction flag 1 (readable uses 0). The io watcher invokes the rouse
# callback with 1; if a second argument is defined, a timer with that
# delay is armed as well and invokes the callback with 0.
# Presumably rouse_wait hands back the value passed to the callback, so
# callers see 1 for "ready" and 0 for "timed out" -- confirm in Coro docs.
sub writable($;$) {
   my $wake = Coro::rouse_cb();

   my $io = AE::io($_[0], 1, sub { $wake->(1) });

   # optional timeout watcher, only created when a timeout was passed
   my $timer;
   $timer = AE::timer($_[1], 0, sub { $wake->(0) })
      if defined $_[1];

   return Coro::rouse_wait();
}

sub Coro::AnyEvent::CondVar::send {
   (delete $_[0]{_ae_coro})->ready if $_[0]{_ae_coro};

   &AnyEvent::CondVar::Base::send;

Coro/Debug.pm  view on Meta::CPAN

   my ($pid) = @_;

   if (my ($coro) = grep $_ == $pid, Coro::State::list) {
      $coro
   } else {
      print "$pid: no such coroutine\n";
      undef
   }
}

# Builds one log line from a level (first argument, printed with %d) and
# a message (second argument): a UTC timestamp taken from
# Coro::Util::gettimeofday, then "(level)", then the message.
sub format_msg($$) {
   my ($level, $text) = @_;

   my ($secs, $usecs) = Coro::Util::gettimeofday();

   # gmtime list: (sec, min, hour, mday, mon, year, ...)
   my @utc = gmtime $secs;

   # the last field is the microsecond part divided by 100, zero-padded
   # to four digits
   my $stamp = sprintf "%04d-%02d-%02dZ%02d:%02d:%02d.%04d",
                       $utc[5] + 1900, $utc[4] + 1, $utc[3],
                       $utc[2], $utc[1], $utc[0],
                       $usecs / 100;

   return sprintf "%s (%d) %s", $stamp, $level, $text;
}

sub format_num4($) {
   my ($v) = @_;

   return sprintf "%4d"   , $v                     if $v <  1e4;
   # 1e5 redundant
   return sprintf "%3.0fk", $v /             1_000 if $v <  1e6;
   return sprintf "%1.1fM", $v /         1_000_000 if $v <  1e7 * .995;
   return sprintf "%3.0fM", $v /         1_000_000 if $v <  1e9;
   return sprintf "%1.1fG", $v /     1_000_000_000 if $v < 1e10 * .995;
   return sprintf "%3.0fG", $v /     1_000_000_000 if $v < 1e12;
   return sprintf "%1.1fT", $v / 1_000_000_000_000 if $v < 1e13 * .995;

Coro/Debug.pm  view on Meta::CPAN

environment variable PERL_CORO_STDERR_LOGLEVEL, or -1 if missing).

=item session_loglevel $level

Set the default loglevel for new coro debug sessions (defaults to the
value of the environment variable PERL_CORO_DEFAULT_LOGLEVEL, or -1 if
missing).

=cut

# Logs a message at the given level.
#
#   $level - numeric log level of the message
#   $msg   - message text; trailing whitespace is replaced by one newline
#
# Every code reference stored in %log is called with ($level, $msg). In
# addition the formatted message is written to STDERR when $level is not
# greater than $ERRLOGLEVEL.
sub log($$) {
   my ($level, $msg) = @_;

   # normalise the end of the message to exactly one newline
   $msg =~ s/\s*$/\n/;

   $_->($level, $msg) for values %log;

   # the formatted line is data, not a format string: route it through
   # "%s" so that "%" characters in the message are printed literally
   # instead of being interpreted as printf conversions
   printf STDERR "%s", format_msg ($level, $msg) if $level <= $ERRLOGLEVEL;
}

# Stores the given level in $SESLOGLEVEL and returns it.
sub session_loglevel($) {
   my ($level) = @_;

   return $SESLOGLEVEL = $level;
}

# Stores the given level in $ERRLOGLEVEL and returns it.
sub stderr_loglevel($) {
   my ($level) = @_;

   return $ERRLOGLEVEL = $level;
}

=item trace $coro, $loglevel

Enables tracing the given coroutine at the given loglevel. If loglevel is
omitted, use 5. If coro is omitted, trace the current coroutine. Tracing
incurs a very high runtime overhead.

It is not uncommon to enable tracing on oneself by simply calling

Coro/Debug.pm  view on Meta::CPAN

   $buf
}

=item command $string

Execute a debugger command, sending any output to STDOUT. Used by
C<session>, below.

=cut

sub command($) {
   my ($cmd) = @_;

   $cmd =~ s/\s+$//;

   if ($cmd =~ /^ps (?:\s* (\S+))? $/x) {
      print ps_listing;

   } elsif ($cmd =~ /^bt\s+(\d+)$/) {
      if (my $coro = find_coro $1) {
         my $bt;

Coro/Debug.pm  view on Meta::CPAN

   local $| = 1;
}

=item session $fh

Run an interactive debugger session on the given filehandle. Each line entered
is simply passed to C<command> (with a few exceptions).

=cut

sub session($) {
   my ($fh) = @_;

   $fh = Coro::Handle::unblock $fh;
   my $old_fh = select $fh;
   my $guard = guard { select $old_fh };

   my $loglevel = $SESLOGLEVEL;
   local $log{$Coro::current} = sub {
      return unless $_[0] <= $loglevel;
      print $fh "\015", (format_msg $_[0], $_[1]), "> ";

Coro/Handle.pm  view on Meta::CPAN

}

=item $fh = unblock $fh

This is a convenience function that just calls C<new_from_fh> on the
given filehandle. Use it to replace a normal perl filehandle by a
non-(coroutine-)blocking equivalent.

=cut

# Convenience wrapper: calls the new_from_fh class method of Coro::Handle
# with the given filehandle and returns its result.
#
# Written as an explicit arrow method call; the former indirect object
# form ("new_from_fh Coro::Handle ...") depends on parser heuristics to
# be recognised as a method call.
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}

=item $fh->writable, $fh->readable

Wait until the filehandle is readable or writable (and return true) or
until an error condition happens (and return false).

=cut

Coro/Makefile.PL  view on Meta::CPAN

use ExtUtils::MakeMaker;
use Config;

$|=1;

my $DEFINE;
my @LIBS = [];

my $threads = $Config{usethreads};

# Counts in how many of the configured include directories
# ($Config{usrinc} plus the space-separated entries of $Config{incpth})
# the given relative path is readable. Returns that count.
sub have_inc($) {
   my ($header) = @_;

   my @dirs = ($Config{usrinc}, split / /, $Config{incpth});

   my $found = 0;
   for my $dir (@dirs) {
      ++$found if -r "$dir/$header";
   }

   return $found;
}

use Config;

print <<EOF;

*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***

Coro has a number of configuration options. Due to its maturity, the

Coro/Select.pm  view on Meta::CPAN


# With an import list, the listed names are exported into the package that
# called import. Without one, "select" is exported into CORE::GLOBAL.
sub import {
   my ($class, @symbols) = @_;

   if (@symbols) {
      my $target = caller 0;
      return $class->export ($target, @symbols);
   }

   return $class->export ("CORE::GLOBAL", "select");
}

sub select(;*$$$) { # not the correct prototype, but well... :()
   if (@_ == 0) {
      return CORE::select
   } elsif (@_ == 1) {
      return CORE::select $_[0]
   } elsif (defined $_[3] && !$_[3]) {
      return CORE::select $_[0], $_[1], $_[2], $_[3]
   } else {
      my $nfound = 0;
      my @w;
      my $wakeup = Coro::rouse_cb;

Coro/Socket.pm  view on Meta::CPAN

use IO::Socket::INET ();

use Coro::Util ();

use base qw(Coro::Handle IO::Socket::INET);

our $VERSION = 6.514;

our (%_proto, %_port);

# Maps a protocol given by name or number to its protocol number, caching
# true results in %_proto. Croaks when neither lookup yields a true value.
sub _proto($) {
   my $proto = $_[0];

   $_proto{$proto} ||= do {
      # try the name lookup first, the number lookup only if that fails
      my $number = (getprotobyname $proto)[2]
                || (getprotobynumber $proto)[2];

      croak "unsupported protocol: $proto" unless $number;

      $number
   };
}

# Resolves a port specification (first argument) for a protocol name
# (second argument) to a port number.
#
# All-digit specifications are returned unchanged. Otherwise the
# specification is parsed as "name" or "name(number)": the name is tried
# with getservbyname, then getservbyport, and finally the number in
# parentheses is used. True results are cached in %_port; croaks when
# the specification cannot be parsed or nothing true is found.
sub _port($$) {
   $_port{$_[0],$_[1]} ||= do {
      # numeric ports leave the sub right here, before anything is stored
      return $_[0] if $_[0] =~ /^\d+$/;

      my ($service, $fallback) = $_[0] =~ /([^(]+)\s*(?:\((\d+)\))?/x
         or croak "unparsable port number: $_[0]";

      my $resolved = (getservbyname $service, $_[1])[2]
                  || (getservbyport $service, $_[1])[2]
                  || $fallback;

      croak "unknown port: $_[0]" unless $resolved;

      $resolved
   };
}

sub _sa($$$) {
   my ($host, $port, $proto) = @_;

   $port or $host =~ s/:([^:]+)$// and $port = $1;

   my $_proto = _proto($proto);
   my $_port = _port($port, $proto);

   my $_host = Coro::Util::inet_aton $host
      or croak "$host: unable to resolve";

Coro/Storable.pm  view on Meta::CPAN

# wrap xs functions
#
# For each listed Storable function: take a reference to the current
# implementation and install, under the same name in the Storable::
# namespace, a wrapper that obtains $lock->guard before calling it.
for (qw(net_pstore pstore net_mstore mstore pretrieve mretrieve dclone)) {
   my $orig = \&{"Storable::$_"};
   # The wrapper is compiled from a string so that the text of the
   # original's prototype can be spliced into the new sub's declaration.
   # Inside the wrapper, "&$orig" (no parentheses) calls the original with
   # the wrapper's own @_. Presumably the lock is released when $guard
   # goes out of scope -- confirm against the type of $lock.
   *{"Storable::$_"} = eval 'sub (' . (prototype $orig) . ') {
      my $guard = $lock->guard;
      &$orig
   }';
   # a wrapper that failed to compile is fatal
   die if $@;
}

# Deserialises the Storable image held in the first argument by reading
# it through an in-memory filehandle opened with the :cede layer
# (parameterised by $GRANULARITY). Dies if the handle cannot be opened.
sub thaw($) {
   my $mode = "<:cede($GRANULARITY)";

   open my $in, $mode, \$_[0]
      or die "cannot open pst via PerlIO::cede: $!";

   return Storable::fd_retrieve($in);
}

# Serialises the first argument with Storable::store_fd into an in-memory
# buffer, writing through the :cede layer (parameterised by
# $GRANULARITY), and returns the buffer. Dies if the handle cannot be
# opened.
sub freeze($) {
   my $image;

   open my $out, ">:cede($GRANULARITY)", \$image
      or die "cannot open pst via PerlIO::cede: $!";

   Storable::store_fd($_[0], $out);
   close $out;

   return $image;
}

# Same as freeze, but serialises with Storable::nstore_fd instead of
# Storable::store_fd. Returns the in-memory buffer; dies if the handle
# cannot be opened.
sub nfreeze($) {
   my $image;

   open my $out, ">:cede($GRANULARITY)", \$image
      or die "cannot open pst via PerlIO::cede: $!";

   Storable::nstore_fd($_[0], $out);
   close $out;

   return $image;
}

# Deserialises the Storable image held in the first argument by reading
# it through a plain in-memory filehandle (no :cede layer). Dies if the
# handle cannot be opened.
sub blocking_thaw($) {
   open my $in, "<", \$_[0]
      or die "cannot open pst: $!";

   return Storable::fd_retrieve($in);
}

# Serialises the first argument with Storable::store_fd into a plain
# in-memory buffer (no :cede layer) and returns the buffer. Dies if the
# handle cannot be opened.
sub blocking_freeze($) {
   my $image;

   open my $out, ">", \$image
      or die "cannot open pst: $!";

   Storable::store_fd($_[0], $out);
   close $out;

   return $image;
}

# Same as blocking_freeze, but serialises with Storable::nstore_fd instead
# of Storable::store_fd. Returns the in-memory buffer; dies if the handle
# cannot be opened.
sub blocking_nfreeze($) {
   my $image;

   open my $out, ">", \$image
      or die "cannot open pst: $!";

   Storable::nstore_fd($_[0], $out);
   close $out;

   return $image;
}

1;

Coro/Timer.pm  view on Meta::CPAN

      while (condition false) {
         Coro::schedule; # wait until woken up or timeout
         return 0 if $timeout; # timed out
      }

      return 1; # condition satisfied
   }

=cut

sub timeout($) {
   my $current = $Coro::current;
   my $timeout;

   bless [
      \$timeout,
      (AE::timer $_[0], 0, sub {
         $timeout = 1;
         $current->ready;
      }),
   ], "Coro::Timer::Timeout";

Coro/Util.pm  view on Meta::CPAN


our @EXPORT = qw(gethostbyname gethostbyaddr);
our @EXPORT_OK = qw(inet_aton fork_eval);

our $VERSION = 6.514;

our $MAXPARALLEL = 16; # max. number of parallel jobs

my $jobs = new Coro::Semaphore $MAXPARALLEL;

sub _do_asy(&;@) {
   my $sub = shift;
   $jobs->down;
   my $fh;

   my $pid = open $fh, "-|";

   if (!defined $pid) {
      die "fork: $!";
   } elsif (!$pid) {
      syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;

Coro/Util.pm  view on Meta::CPAN


Work similarly to their Perl counterparts, but do not block. Uses
C<AnyEvent::Util::inet_aton> internally.

Does not handle multihomed hosts or IPv6 - consider using
C<AnyEvent::Socket::resolve_sockaddr> or C<AnyEvent::DNS::reverse_lookup>
with the L<Coro> rouse functions instead.

=cut

# Looks up the given host via AnyEvent::Socket::inet_aton, waiting for the
# result with the rouse callback/wait pair. Returns a list laid out as
# (name, name, AF_INET, 4, formatted addresses...), where only results
# that are exactly 4 bytes long are formatted and included.
sub gethostbyname($) {
   AnyEvent::Socket::inet_aton($_[0], Coro::rouse_cb());

   my @addrs;
   for my $packed (Coro::rouse_wait()) {
      next unless length($packed) == 4;
      push @addrs, AnyEvent::Socket::format_address($packed);
   }

   ($_[0], $_[0], &Socket::AF_INET, 4, @addrs)
}

# Hands a block plus this sub's arguments to _do_asy and returns whatever
# _do_asy returns. The block calls gethostbyaddr on its own first two
# arguments.
# NOTE(review): inside the block the name presumably resolves to Perl's
# builtin gethostbyaddr rather than to this sub -- confirm nothing imports
# an override into this package.
sub gethostbyaddr($$) {
   _do_asy { gethostbyaddr $_[0], $_[1] } @_
}

=item @result = Coro::Util::fork_eval { ... }, @args

Executes the given code block or code reference with the given arguments
in a separate process, returning the results. The return values must be
serialisable with Coro::Storable. It may, of course, block.

Note that using event handling in the sub is not usually a good idea as

Coro/Util.pm  view on Meta::CPAN

         local $/;
         # make my eyes hurt
         pack "C*", unpack "(xxxC)*", <$fh>
      }
   }

   my $alphachannel = do_it "/tmp/img.png";

=cut

sub fork_eval(&@) {
   my ($cb, @args) = @_;

   pipe my $fh1, my $fh2
      or die "pipe: $!";

   my $pid = fork;

   if ($pid) {
      undef $fh2;

Event/Event.pm  view on Meta::CPAN

      unshift @_, Coro::Event::;
      @_ = &$coronew;
      &Coro::schedule while &_next;
      $_[0]->cancel;
      &_event
   };
}

# do schedule in perl to avoid forcing a stack allocation.
# this is about 10% slower, though.
#
# Calls Coro::schedule repeatedly for as long as _next returns true, then
# returns the result of _event. All three calls use the "&name" form
# without parentheses, so each callee is invoked with this sub's current
# @_ rather than a fresh argument list.
sub next($) {
   &Coro::schedule while &_next;
   &_event
}

# accessor: returns the element at index 3 of the array-based object
sub Coro::Event::Event::hits {
   return $_[0]->[3];
}
# accessor: returns the element at index 4 of the array-based object
sub Coro::Event::Event::got {
   return $_[0]->[4];
}

=item sweep

Similar to Event::one_event and Event::sweep: The idle task is called once

eg/bench  view on Meta::CPAN

#!/usr/bin/perl

# ->resume is not exactly cheap (it saves/restores a LOT
# of global variables), but shouldn't be slow. Just to show
# how fast it is, this little proggie compares a normal subroutine
# call with two calls of transfer in a loop.

use Coro;
use Benchmark;

# Deliberately empty one-argument sub. Presumably the "normal subroutine
# call" baseline mentioned in the header comment -- confirm against the
# benchmark body.
sub a($) { }

$a = bless {}, main::;

# Combined getter/setter for the "b" slot of a hash-based object: when
# called with a value after the object, stores it first; always returns
# the current value of the slot.
sub b {
   my ($obj, @rest) = @_;

   $obj->{b} = $rest[0] if @rest;

   return $obj->{b};
}

$b = async {



( run in 1.434 second using v1.01-cache-2.11-cpan-65fba6d93b7 )