Acme-Sort-Sleep

local/lib/perl5/IO/Async/Loop.pm  view on Meta::CPAN

       }

       return 0;
    },
 ) );

 $loop->run;

=head1 DESCRIPTION

This module provides an abstract class which implements the core loop of the
L<IO::Async> framework. Its primary purpose is to store a set of
L<IO::Async::Notifier> objects or subclasses of them. It handles all of the
lower-level set manipulation actions, and leaves the actual IO readiness 
testing/notification to the concrete class that implements it. It also
provides other functionality such as signal handling, child process management,
and timers.

See also the two bundled Loop subclasses:

=over 4

=item L<IO::Async::Loop::Select>

=item L<IO::Async::Loop::Poll>

=back

Or other subclasses that may appear on CPAN which are not part of the core
L<IO::Async> distribution.

=head2 Ignoring SIGPIPE

Since version I<0.66> loading this module automatically ignores C<SIGPIPE>, as
it is highly unlikely that the default-terminate action is the best course of
action for an L<IO::Async>-based program to take. If at load time the handler
disposition is still set as C<DEFAULT>, it is set to ignore. If another
handler has already been placed there by the program code, it will be left
undisturbed.
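
The load-time check described above can be sketched roughly as follows. This
is an illustration only, not the module's actual code: the helper name
C<ignore_sigpipe_if_default> is hypothetical, and treating an unset or empty
C<$SIG{PIPE}> as the default disposition is an assumption of the sketch.

```perl
use strict;
use warnings;

# Hypothetical helper: ignore SIGPIPE only if the program has not installed
# its own handler. An unset or empty slot is treated as "DEFAULT" here.
sub ignore_sigpipe_if_default
{
   my $current = $SIG{PIPE};

   if( !defined $current or $current eq "" or $current eq "DEFAULT" ) {
      $SIG{PIPE} = "IGNORE";
      return 1;
   }

   return 0; # an existing handler is left undisturbed
}

{
   local $SIG{PIPE} = "DEFAULT";
   ignore_sigpipe_if_default();   # disposition is now "IGNORE"
}

{
   local $SIG{PIPE} = sub { warn "custom SIGPIPE handler\n" };
   ignore_sigpipe_if_default();   # the custom handler stays in place
}
```

A program that wants its own C<SIGPIPE> behaviour therefore only needs to
install its handler before the module is loaded.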

=cut

# Internal constructor used by subclasses
sub __new
{
   my $class = shift;

   # Detect if the API version provided by the subclass is sufficient
   $class->can( "API_VERSION" ) or
      die "$class is too old for IO::Async $VERSION; it does not provide \->API_VERSION\n";

   $class->API_VERSION >= NEED_API_VERSION or
      die "$class is too old for IO::Async $VERSION; we need API version >= ".NEED_API_VERSION.", it provides ".$class->API_VERSION."\n";

   WATCHDOG_ENABLE and !$class->_CAN_WATCHDOG and
      warn "$class cannot implement IO_ASYNC_WATCHDOG\n";

   my $self = bless {
      notifiers     => {}, # {nkey} = notifier
      iowatches     => {}, # {fd} = [ $handle, $on_read_ready, $on_write_ready, $on_hangup ]
      sigattaches   => {}, # {sig} => \@callbacks
      childmanager  => undef,
      childwatches  => {}, # {pid} => $code
      threadwatches => {}, # {tid} => $code
      timequeue     => undef,
      deferrals     => [],
      os            => {}, # A generic scratchpad for IO::Async::OS to store whatever it wants
   }, $class;

   # It's possible this is a specific subclass constructor. We still want the
   # magic IO::Async::Loop->new constructor to yield this if it's the first
   # one
   our $ONE_TRUE_LOOP ||= $self;

   # Legacy support - temporary until all CPAN classes are updated; bump NEED_API_VERSION at that point
   my $old_timer = $self->can( "enqueue_timer" ) != \&enqueue_timer;
   if( $old_timer != ( $self->can( "cancel_timer" ) != \&cancel_timer ) ) {
      die "$class should overload both ->enqueue_timer and ->cancel_timer, or neither";
   }

   if( $old_timer ) {
      warnings::warnif( deprecated => "Enabling old_timer workaround for old loop class " . $class );
   }

   $self->{old_timer} = $old_timer;

   return $self;
}

=head1 MAGIC CONSTRUCTOR

=head2 new

   $loop = IO::Async::Loop->new

This function attempts to find a good subclass to use, then calls its
constructor. It works by making a list of likely candidate classes, then
trying each one in turn, C<require>ing the module then calling its C<new>
method. If either of these operations fails, the next subclass is tried. If
no class was successful, then an exception is thrown.
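
The candidate search can be pictured with the following minimal sketch. It
is not the module's own code: the helper name C<construct_first_usable> and
the candidate names are hypothetical, and the core class L<IO::Handle> merely
stands in for a Loop subclass so that the sketch runs without extra modules.

```perl
use strict;
use warnings;

# Hypothetical helper: return an instance of the first class in @candidates
# that can be both loaded and constructed.
sub construct_first_usable
{
   my @candidates = @_;

   foreach my $class ( @candidates ) {
      ( my $file = "$class.pm" ) =~ s{::}{/}g;

      # A failure in either require or ->new moves on to the next candidate
      my $object = eval { require $file; $class->new } or next;
      return $object;
   }

   die "Cannot find a usable class among: @candidates\n";
}

# The first name cannot be loaded, so the second candidate is used instead
my $object = construct_first_usable( "No::Such::Class::Here", "IO::Handle" );
print ref( $object ), "\n";
```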

The constructed object is cached, and will be returned again by a subsequent
call. The cache will also be set by a constructor on a specific subclass. This
behaviour makes it possible to simply use the normal constructor in a module
that wishes to interact with the main program's Loop, such as an integration
module for another event system.

For example, the following two C<$loop> variables will refer to the same
object:

 use IO::Async::Loop;
 use IO::Async::Loop::Poll;

 my $loop_poll = IO::Async::Loop::Poll->new;

 my $loop = IO::Async::Loop->new;

While it is not advised to do so under normal circumstances, if the program
really wishes to construct more than one Loop object, it can call the
constructor C<really_new>, or invoke one of the subclass-specific constructors
directly.

   my ( $notifier ) = @_;

   if( defined $notifier->parent ) {
      croak "Cannot remove a child notifier directly - remove its parent";
   }

   $self->_remove_noparentcheck( $notifier );
}

sub _remove_noparentcheck
{
   my $self = shift;
   my ( $notifier ) = @_;

   my $nkey = refaddr $notifier;

   exists $self->{notifiers}->{$nkey} or croak "Notifier does not exist in collection";

   delete $self->{notifiers}->{$nkey};

   $notifier->__set_loop( undef );

   $self->_remove_noparentcheck( $_ ) for $notifier->children;

   return;
}

=head2 notifiers

   @notifiers = $loop->notifiers

Returns a list of all the notifier objects currently stored in the Loop.

=cut

sub notifiers
{
   my $self = shift;
   # Sort so the order remains stable under additions/removals
   return map { $self->{notifiers}->{$_} } sort keys %{ $self->{notifiers} };
}

###################
# Looping support #
###################

=head1 LOOPING CONTROL

The following methods control the actual run cycle of the loop, and hence the
program.

=cut

=head2 loop_once

   $count = $loop->loop_once( $timeout )

This method performs a single wait loop using the specific subclass's
underlying mechanism. If C<$timeout> is undef, then no timeout is applied, and
it will wait until an event occurs. The intention of the return value is to
indicate the number of callbacks that this loop executed, though different
subclasses vary in how accurately they can report this. See the documentation
for this method in the specific subclass for more information.

=cut

sub loop_once
{
   my $self = shift;
   my ( $timeout ) = @_;

   croak "Expected that $self overrides ->loop_once";
}

=head2 run

   @result = $loop->run

   $result = $loop->run

Runs the actual IO event loop. This method blocks until the C<stop> method is
called, and returns the result that was passed to C<stop>. In scalar context
only the first result is returned; the others will be discarded if more than
one value was provided. This method may be called recursively.

This method is a recent addition and may not be supported by all the
C<IO::Async::Loop> subclasses currently available on CPAN.

=cut

sub run
{
   my $self = shift;

   local $self->{running} = 1;
   local $self->{result} = [];

   while( $self->{running} ) {
      $self->loop_once( undef );
   }

   return wantarray ? @{ $self->{result} } : $self->{result}[0];
}

=head2 stop

   $loop->stop( @result )

Stops the inner-most C<run> method currently in progress, causing it to return
the given C<@result>.

This method is a recent addition and may not be supported by all the
C<IO::Async::Loop> subclasses currently available on CPAN.
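
As a brief illustration of how C<run> and C<stop> pass results between them,
the following sketch stops the loop from a deferred callback. It assumes the
C<later> method is available to queue that callback; the result values are
arbitrary.

```perl
use strict;
use warnings;
use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

# Queue a deferred callback that stops the loop with two result values
$loop->later( sub { $loop->stop( "finished", 42 ) } );

my @all = $loop->run;      # list context: every value passed to ->stop

$loop->later( sub { $loop->stop( "finished", 42 ) } );

my $first = $loop->run;    # scalar context: only the first value

print "list: @all\n";
print "scalar: $first\n";
```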

=cut

sub stop
{
   my $self = shift;

   @{ $self->{result} } = @_;


=item on_stream => CODE

An alternative to C<on_connected>, a continuation that is passed an instance
of L<IO::Async::Stream> when the socket is connected. This is provided as a
convenience for the common case that a Stream object is required as the
transport for a Protocol object.

 $on_stream->( $stream )

=item on_socket => CODE

Similar to C<on_stream>, but constructs an instance of L<IO::Async::Socket>.
This is most useful for C<SOCK_DGRAM> or C<SOCK_RAW> sockets.

 $on_socket->( $socket )

=item on_connect_error => CODE

A continuation that is invoked after all of the addresses have been tried, and
none of them succeeded. It will be passed the name of the operation in which
the most significant error occurred, followed by the error itself. Errors from
the C<connect(2)> syscall are considered most significant, then C<bind(2)>,
then finally C<socket(2)>.

 $on_connect_error->( $syscall, $! )

=item on_resolve_error => CODE

A continuation that is invoked when the name resolution attempt fails. This is
invoked in the same way as the C<on_error> continuation for the C<resolve>
method.

=back
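
The callbacks above might be combined as in the following sketch. The local
listener exists only so that the example has something to connect to; the
C<host>, C<service> and C<socktype> arguments are assumed to behave as
documented elsewhere for C<connect>, and the messages are arbitrary.

```perl
use strict;
use warnings;
use IO::Async::Loop;
use IO::Socket::INET;

# A throwaway local listener so the example has something to connect to
my $listener = IO::Socket::INET->new(
   LocalAddr => "127.0.0.1",
   LocalPort => 0,
   Listen    => 1,
) or die "Cannot listen - $@\n";

my $loop = IO::Async::Loop->new;

$loop->connect(
   host     => "127.0.0.1",
   service  => $listener->sockport,
   socktype => "stream",

   on_stream => sub {
      my ( $stream ) = @_;
      $loop->stop( "connected, got a " . ref $stream );
   },

   on_resolve_error => sub {
      my ( $message ) = @_;
      $loop->stop( "resolve failed: $message" );
   },

   on_connect_error => sub {
      my ( $syscall, $errno ) = @_;
      $loop->stop( "$syscall failed: $errno" );
   },
);

my $outcome = $loop->run;
print "$outcome\n";
```

Because the call is made in void context here, the error callbacks are
required; a caller that keeps the returned Future can omit them.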

=cut

sub connect
{
   my $self = shift;
   my ( %params ) = @_;

   my $extensions;
   if( $extensions = delete $params{extensions} and @$extensions ) {
      my ( $ext, @others ) = @$extensions;

      my $method = "${ext}_connect";
      # TODO: Try to 'require IO::Async::$ext'

      $self->can( $method ) or croak "Extension method '$method' is not available";

      return $self->$method(
         %params,
         ( @others ? ( extensions => \@others ) : () ),
      );
   }

   my $handle = $params{handle};

   my $on_done;
   # Legacy callbacks
   if( my $on_connected = delete $params{on_connected} ) {
      $on_done = $on_connected;
   }
   elsif( my $on_stream = delete $params{on_stream} ) {
      defined $handle and croak "Cannot pass 'on_stream' with a handle object as well";

      require IO::Async::Stream;
      # TODO: It doesn't make sense to put a SOCK_DGRAM in an
      # IO::Async::Stream but currently we don't detect this
      $handle = IO::Async::Stream->new;
      $on_done = $on_stream;
   }
   elsif( my $on_socket = delete $params{on_socket} ) {
      defined $handle and croak "Cannot pass 'on_socket' with a handle object as well";

      require IO::Async::Socket;
      $handle = IO::Async::Socket->new;
      $on_done = $on_socket;
   }
   elsif( !defined wantarray ) {
      croak "Expected 'on_connected', 'on_stream' or 'on_socket' callback or to return a Future";
   }

   my $on_connect_error;
   if( $on_connect_error = $params{on_connect_error} ) {
      # OK
   }
   elsif( !defined wantarray ) {
      croak "Expected 'on_connect_error' callback or to return a Future";
   }

   my $on_resolve_error;
   if( $on_resolve_error = $params{on_resolve_error} ) {
      # OK
   }
   elsif( !defined wantarray and exists $params{host} || exists $params{local_host} ) {
      croak "Expected 'on_resolve_error' callback or to return a Future";
   }

   my $connector = $self->{connector} ||= $self->__new_feature( "IO::Async::Internals::Connector" );

   my $future = $connector->connect( %params );

   $future = $future->then( sub {
      $handle->set_handle( shift );
      return Future->done( $handle )
   }) if $handle;

   $future->on_done( $on_done ) if $on_done;
   $future->on_fail( sub {
      $on_connect_error->( @_[2,3] ) if $on_connect_error and $_[1] eq "connect";
      $on_resolve_error->( $_[2] )   if $on_resolve_error and $_[1] eq "resolve";
   } );

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $future->on_ready( sub { undef $future } ); # intentional cycle
}

=cut

# This class specifically does NOT implement this method, so that subclasses
# are forced to. The constructor will be checking....
sub __watch_io
{
   my $self = shift;
   my %params = @_;

   my $handle = delete $params{handle} or croak "Expected 'handle'";
   defined eval { $handle->fileno } or croak "Expected that 'handle' has defined ->fileno";

   # Silent "upgrade" to O_NONBLOCK
   $handle->blocking and $handle->blocking(0);

   my $watch = ( $self->{iowatches}->{$handle->fileno} ||= [] );

   $watch->[0] = $handle;

   if( exists $params{on_read_ready} ) {
      $watch->[1] = delete $params{on_read_ready};
   }

   if( exists $params{on_write_ready} ) {
      $watch->[2] = delete $params{on_write_ready};
   }

   if( exists $params{on_hangup} ) {
      $self->_CAN_ON_HANGUP or croak "Cannot watch_io for 'on_hangup' in ".ref($self);
      $watch->[3] = delete $params{on_hangup};
   }

   keys %params and croak "Unrecognised keys for ->watch_io - " . join( ", ", keys %params );
}

=head2 unwatch_io

   $loop->unwatch_io( %params )

This method removes a watch on an IO handle which was previously installed by
C<watch_io>.

The C<%params> hash takes the following keys:

=over 8

=item handle => IO

The IO handle to remove the watch for.

=item on_read_ready => BOOL

If true, remove the watch for read-readiness.

=item on_write_ready => BOOL

If true, remove the watch for write-readiness.

=back

Either or both callbacks may be removed at once. It is not an error to attempt
to remove a callback that is not present. If both callbacks were provided to
the C<watch_io> method and only one is removed by this method, the other shall
remain.
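
A minimal sketch of installing and then removing a read watch, using a pipe
so that it is self-contained. The buffer size and the message are arbitrary
choices for the example.

```perl
use strict;
use warnings;
use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

pipe( my $reader, my $writer ) or die "Cannot pipe - $!\n";

$loop->watch_io(
   handle        => $reader,
   on_read_ready => sub {
      sysread( $reader, my $buffer, 8192 );

      # Only the read watch was installed, so only that one needs removing
      $loop->unwatch_io( handle => $reader, on_read_ready => 1 );

      $loop->stop( $buffer );
   },
);

syswrite( $writer, "hello" );

my $received = $loop->run;
print "received: $received\n";
```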

=cut

sub __unwatch_io
{
   my $self = shift;
   my %params = @_;

   my $handle = delete $params{handle} or croak "Expected 'handle'";

   my $watch = $self->{iowatches}->{$handle->fileno} or return;

   if( delete $params{on_read_ready} ) {
      undef $watch->[1];
   }

   if( delete $params{on_write_ready} ) {
      undef $watch->[2];
   }

   if( delete $params{on_hangup} ) {
      $self->_CAN_ON_HANGUP or croak "Cannot unwatch_io for 'on_hangup' in ".ref($self);
      undef $watch->[3];
   }

   if( not $watch->[1] and not $watch->[2] and not $watch->[3] ) {
      delete $self->{iowatches}->{$handle->fileno};
   }

   keys %params and croak "Unrecognised keys for ->unwatch_io - " . join( ", ", keys %params );
}

=head2 watch_signal

   $loop->watch_signal( $signal, $code )

This method adds a new signal handler to watch the given signal.

=over 8

=item $signal

The name of the signal to watch. This should be a bare name like C<TERM>.

=item $code

A CODE reference to the handling callback.

=back

There can only be one callback per signal name. Registering a new one will
remove an existing one.

Applications should use an L<IO::Async::Signal> object, or call
C<attach_signal> instead of using this method.
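
For comparison, application code using C<attach_signal> might look like the
following sketch. It assumes a platform with C<SIGUSR1>, and that
C<attach_signal> returns an ID which is later passed to C<detach_signal>;
the process signals itself purely for demonstration.

```perl
use strict;
use warnings;
use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

# The returned ID identifies this particular callback
my $id = $loop->attach_signal( USR1 => sub { $loop->stop( "got SIGUSR1" ) } );

# Send the signal to ourselves once the loop is running
$loop->later( sub { kill USR1 => $$ } );

my $outcome = $loop->run;

$loop->detach_signal( USR1 => $id );
print "$outcome\n";
```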

This and C<unwatch_signal> are optional; a subclass may implement neither, or
both. If it implements neither then signal handling will be performed by the

   delete $childwatches->{$pid};

   if( HAVE_SIGNALS and !keys %$childwatches ) {
      $self->detach_signal( CHLD => delete $self->{childwatch_sigid} );
   }
}

=head1 METHODS FOR SUBCLASSES

The following methods are provided to access internal features which are
required by specific subclasses to implement the loop functionality. The use
cases of each are documented in the sections above.

=cut

=head2 _adjust_timeout

   $loop->_adjust_timeout( \$timeout )

Shortens the timeout value passed in the scalar reference if it is longer in
seconds than the time until the next queued event on the timer queue. If there
are pending idle handlers, the timeout is reduced to zero.
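
The clamping rules can be restated as a standalone function. This is only a
sketch of the timer and idle-handler rules: the function name and its
C<pending_idle>, C<next_time> and C<now> arguments are hypothetical stand-ins
for state that the real method reads from the Loop object.

```perl
use strict;
use warnings;

# Standalone sketch of the clamping rules; all argument names are hypothetical.
#   pending_idle - true if idle handlers are waiting
#   next_time    - epoch time of the next queued timer, or undef if none
#   now          - current epoch time
sub adjust_timeout
{
   my ( $timeref, %args ) = @_;

   # Idle handlers must run straight away, so do not block at all
   $$timeref = 0, return if $args{pending_idle};

   return unless defined $args{next_time};
   my $timer_delay = $args{next_time} - $args{now};

   if( $timer_delay < 0 ) {
      $$timeref = 0;               # the timer is already overdue
   }
   elsif( !defined $$timeref or $timer_delay < $$timeref ) {
      $$timeref = $timer_delay;    # wake in time for the next timer
   }
}

my $timeout = 30;
adjust_timeout( \$timeout, now => 100, next_time => 105 );
print "timeout: $timeout\n";   # prints "timeout: 5"
```

A subclass would typically call the real method at the start of its
C<loop_once>, before blocking in its underlying wait mechanism.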

=cut

sub _adjust_timeout
{
   my $self = shift;
   my ( $timeref, %params ) = @_;

   $$timeref = 0, return if @{ $self->{deferrals} };

   if( defined $self->{sigproxy} and !$params{no_sigwait} ) {
      $$timeref = $MAX_SIGWAIT_TIME if !defined $$timeref or $$timeref > $MAX_SIGWAIT_TIME;
   }
   if( !HAVE_SIGNALS and keys %{ $self->{childwatches} } ) {
      $$timeref = $MAX_CHILDWAIT_TIME if !defined $$timeref or $$timeref > $MAX_CHILDWAIT_TIME;
   }

   my $timequeue = $self->{timequeue};
   return unless defined $timequeue;

   my $nexttime = $timequeue->next_time;
   return unless defined $nexttime;

   my $now = exists $params{now} ? $params{now} : $self->time;
   my $timer_delay = $nexttime - $now;

   if( $timer_delay < 0 ) {
      $$timeref = 0;
   }
   elsif( !defined $$timeref or $timer_delay < $$timeref ) {
      $$timeref = $timer_delay;
   }
}

=head2 _manage_queues

   $loop->_manage_queues

Checks the timer queue for callbacks that should have been invoked by now, and
runs them all, removing them from the queue. It also invokes all of the
pending idle handlers. Any new idle handlers installed by these are not
invoked yet; they will wait for the next time this method is called.
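
The rule that newly installed idle handlers wait for the next call follows
from taking the current batch out of the queue before running it. A minimal
sketch of that technique, using hypothetical names rather than the Loop's
internal fields:

```perl
use strict;
use warnings;

my @deferrals;   # pending idle handlers
my @log;

# Hypothetical stand-in for the idle-handler part of _manage_queues
sub run_deferrals
{
   # Take the current batch and start a fresh queue, so that handlers queued
   # while the batch is running wait for the next call
   my @batch = @deferrals;
   @deferrals = ();

   $_->() for @batch;

   return scalar @batch;
}

push @deferrals, sub {
   push @log, "first";
   push @deferrals, sub { push @log, "second" };
};

my $first_pass  = run_deferrals();   # runs only "first"
my $second_pass = run_deferrals();   # runs "second"

print "pass 1 ran $first_pass, pass 2 ran $second_pass: @log\n";
```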

=cut

sub _manage_queues
{
   my $self = shift;

   my $count = 0;

   my $timequeue = $self->{timequeue};
   $count += $timequeue->fire if $timequeue;

   my $deferrals = $self->{deferrals};
   $self->{deferrals} = [];

   foreach my $code ( @$deferrals ) {
      $code->();
      $count++;
   }

   my $childwatches = $self->{childwatches};
   if( !HAVE_SIGNALS and keys %$childwatches ) {
      _reap_children( $childwatches );
   }

   return $count;
}

=head1 EXTENSIONS

An Extension is a Perl module that provides extra methods in the
C<IO::Async::Loop> or other packages. Extensions are intended to provide extra
functionality that integrates easily with the rest of the code.

Certain base methods take an C<extensions> parameter; an ARRAY reference
containing a list of extension names. If such a list is passed to a method, it
will immediately call a method whose name is that of the base method, prefixed
by the first extension name in the list, separated by C<_>. If the
C<extensions> list contains more extension names, it will be passed the
remaining ones in another C<extensions> parameter.

For example,

 $loop->connect(
    extensions => [qw( FOO BAR )],
    %args
 )

will become

 $loop->FOO_connect(
    extensions => [qw( BAR )],
    %args
 )

This is provided so that extension modules, such as L<IO::Async::SSL>, can
easily be invoked indirectly, by passing extra arguments to C<connect> methods
or similar, without needing every module to be aware of the C<SSL> extension.
This functionality is generic and not limited to C<SSL>; other extensions may
also use it.

The following methods take an C<extensions> parameter:

 $loop->connect
 $loop->listen

If an extension C<listen> method is invoked, it will be passed a C<listener>
parameter even if one was not provided to the original C<< $loop->listen >>
call, and it will not receive any of the C<on_*> event callbacks. It should
use the C<acceptor> parameter on the C<listener> object.
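
The dispatch rule can be demonstrated without any real extension module. In
the following sketch the C<Sketch::Loop> package, its simplified C<connect>
and the C<FOO_connect> method are all hypothetical; only the handling of the
C<extensions> parameter mirrors the behaviour described above.

```perl
use strict;
use warnings;

package Sketch::Loop;

sub new { return bless {}, shift }

# Base method: hand over to the first extension, if any are requested
sub connect
{
   my $self = shift;
   my %params = @_;

   my $extensions = delete $params{extensions};
   if( $extensions and @$extensions ) {
      my ( $ext, @others ) = @$extensions;
      my $method = "${ext}_connect";

      $self->can( $method ) or die "Extension method '$method' is not available\n";

      return $self->$method(
         %params,
         ( @others ? ( extensions => \@others ) : () ),
      );
   }

   return "plain connect to $params{host}";
}

# Hypothetical extension method, as an extension module might install
sub FOO_connect
{
   my $self = shift;
   my %params = @_;

   # Do extension-specific work, then continue down the chain
   return "FOO( " . $self->connect( %params ) . " )";
}

package main;

my $result = Sketch::Loop->new->connect(
   extensions => [qw( FOO )],
   host       => "example.org",
);
print "$result\n";
```

Because each extension method calls back into the base method with the
remaining C<extensions> list, several extensions can be chained in order.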

=cut

=head1 STALL WATCHDOG

A well-behaved L<IO::Async> program should spend almost all of its time
blocked on input using the underlying C<IO::Async::Loop> instance. The stall
watchdog is an optional debugging feature to help detect CPU spinlocks and
other bugs, where control is not returned to the loop every so often.

If the watchdog is enabled and an event handler consumes more than a given
amount of real time before returning to the event loop, it will be interrupted
by printing a stack trace and terminating the program. The watchdog is only in
effect while the loop itself is not blocking; it won't fail simply because the
loop instance is waiting for input or timers.

It is implemented using C<SIGALRM>, so if enabled, this signal will no longer
be available to user code. (Though in any case, most uses of C<alarm()> and
C<SIGALRM> are better served by one of the L<IO::Async::Timer> subclasses).

The following environment variables control its behaviour.

=over 4

=item IO_ASYNC_WATCHDOG => BOOL

Enables the stall watchdog if set to a non-zero value.

=item IO_ASYNC_WATCHDOG_INTERVAL => INT

Watchdog interval, in seconds, to pass to the C<alarm(2)> call. Defaults to 10
seconds.

=item IO_ASYNC_WATCHDOG_SIGABRT => BOOL

If enabled, the watchdog signal handler will raise a C<SIGABRT>, which usually
has the effect of breaking out of a running program in debuggers such as
F<gdb>. If not set then the process is terminated by throwing an exception with
C<die>.

=back
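
These variables are normally set in the environment that starts the program.
A program can also set them itself, as in this minimal sketch. It assumes the
variables are read when the module is loaded, which is why the assignments
sit in a C<BEGIN> block ahead of the C<use> line; the interval of 5 seconds
is an arbitrary choice.

```perl
use strict;
use warnings;

# Set the variables before the module is loaded; this assumes they are read at
# load time, so assigning them later in the program may have no effect
BEGIN {
   $ENV{IO_ASYNC_WATCHDOG}          = 1;
   $ENV{IO_ASYNC_WATCHDOG_INTERVAL} = 5;   # seconds
}

use IO::Async::Loop;

my $loop = IO::Async::Loop->new;
```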

=cut

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;


