IO-Async

 view release on metacpan or  search on metacpan

lib/IO/Async/Loop.pm  view on Meta::CPAN


   my $on_done;
   # Legacy callbacks
   if( my $on_connected = delete $params{on_connected} ) {
      $on_done = $on_connected;
   }
   elsif( my $on_stream = delete $params{on_stream} ) {
      defined $handle and croak "Cannot pass 'on_stream' with a handle object as well";

      require IO::Async::Stream;
      # TODO: It doesn't make sense to put a SOCK_DGRAM in an
      # IO::Async::Stream but currently we don't detect this
      $handle = IO::Async::Stream->new;
      $on_done = $on_stream;
   }
   elsif( my $on_socket = delete $params{on_socket} ) {
      defined $handle and croak "Cannot pass 'on_socket' with a handle object as well";

      require IO::Async::Socket;
      $handle = IO::Async::Socket->new;
      $on_done = $on_socket;
   }
   elsif( !defined wantarray ) {
      croak "Expected 'on_connected' or 'on_stream' callback or to return a Future";
   }

   my $on_connect_error;
   if( $on_connect_error = $params{on_connect_error} ) {
      # OK
   }
   elsif( !defined wantarray ) {
      croak "Expected 'on_connect_error' callback";
   }

   my $on_resolve_error;
   if( $on_resolve_error = $params{on_resolve_error} ) {
      # OK
   }
   elsif( !defined wantarray and exists $params{host} || exists $params{local_host} ) {
      croak "Expected 'on_resolve_error' callback or to return a Future";
   }

   my $connector = $self->{connector} ||= $self->__new_feature( "IO::Async::Internals::Connector" );

   my $future = $connector->connect( %params );

   $future = $future->then( sub {
      $handle->set_handle( shift );
      return Future->done( $handle )
   }) if $handle;

   $future->on_done( $on_done ) if $on_done;
   $future->on_fail( sub {
      $on_connect_error->( @_[2,3] ) if $on_connect_error and $_[1] eq "connect";
      $on_resolve_error->( $_[2] )   if $on_resolve_error and $_[1] eq "resolve";
   } );

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $future->on_ready( sub { undef $future } ); # intentional cycle
}

=head2 listen

   $listener = await $loop->listen( %params );

This method sets up a listening socket and arranges for an acceptor callback
to be invoked each time a new connection is accepted on the socket. Internally
it creates an instance of L<IO::Async::Listener> and adds it to the Loop if
not given one in the arguments.

Addresses may be given directly, or they may be looked up using the system's
name resolver, or a socket handle may be given directly.

If multiple addresses are given, or resolved from the service and hostname,
then each will be attempted in turn until one succeeds.

In named resolver mode, the C<%params> hash takes the following keys:

=over 8

=item service => STRING

The service name to listen on.

=item host => STRING

The hostname to listen on. Optional. Will listen on all addresses if not
supplied.

=item family => INT

=item socktype => INT

=item protocol => INT

=item flags => INT

Optional. Other arguments to pass along with C<host> and C<service> to the
C<getaddrinfo> call.

=item socktype => STRING

Optionally may instead be one of the values C<'stream'>, C<'dgram'> or
C<'raw'> to stand for C<SOCK_STREAM>, C<SOCK_DGRAM> or C<SOCK_RAW>. This
utility is provided to allow the caller to avoid a separate C<use Socket> only
for importing these constants.

=back

It is necessary to pass the C<socktype> hint to the resolver when resolving
the host/service names into an address, as the C<getaddrinfo> functions of
some operating systems require this hint. A warning is emitted if neither the
C<socktype> nor the C<protocol> hint is defined when performing a
C<getaddrinfo> lookup. To avoid this warning while still specifying no
particular C<socktype> hint (perhaps to invoke some OS-specific behaviour),
pass C<0> as the C<socktype> value.

In plain address mode, the C<%params> hash takes the following keys:

lib/IO/Async/Loop.pm  view on Meta::CPAN

      }

      my $method = "${ext}_listen";
      # TODO: Try to 'require IO::Async::$ext'

      $self->can( $method ) or croak "Extension method '$method' is not available";

      my $f = $self->$method(
         %params,
         ( @others ? ( extensions => \@others ) : () ),
      );
      $f->on_fail( sub { $self->remove( $listener ) } ) if $remove_on_error;

      return $f;
   }

   my $on_notifier = delete $params{on_notifier}; # optional

   my $on_listen_error  = delete $params{on_listen_error};
   my $on_resolve_error = delete $params{on_resolve_error};

   # Shortcut
   if( $params{addr} and not $params{addrs} ) {
      $params{addrs} = [ delete $params{addr} ];
   }

   my $f;
   if( my $handle = delete $params{handle} ) {
      $f = $self->_listen_handle( $listener, $handle, %params );
   }
   elsif( my $addrs = delete $params{addrs} ) {
      $on_listen_error or defined wantarray or
         croak "Expected 'on_listen_error' or to return a Future";
      $f = $self->_listen_addrs( $listener, $addrs, %params );
   }
   elsif( defined $params{service} ) {
      $on_listen_error or defined wantarray or
         croak "Expected 'on_listen_error' or to return a Future";
      $on_resolve_error or defined wantarray or
         croak "Expected 'on_resolve_error' or to return a Future";
      $f = $self->_listen_hostservice( $listener, delete $params{host}, delete $params{service}, %params );
   }
   else {
      croak "Expected either 'service' or 'addrs' or 'addr' arguments";
   }

   $f->on_done( $on_notifier ) if $on_notifier;
   if( my $on_listen = $params{on_listen} ) {
      $f->on_done( sub { $on_listen->( shift->read_handle ) } );
   }
   $f->on_fail( sub {
      my ( $message, $how, @rest ) = @_;
      $on_listen_error->( @rest )  if $on_listen_error  and $how eq "listen";
      $on_resolve_error->( @rest ) if $on_resolve_error and $how eq "resolve";
   });
   $f->on_fail( sub { $self->remove( $listener ) } ) if $remove_on_error;

   return $f if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $f->on_ready( sub { undef $f } ); # intentional cycle
}

# Internal helper for the 'handle' form of listen: configures the given
# listener with the supplied handle and returns a new Future that has already
# been marked done, yielding the listener.
#
#   $self     - invocant; must provide new_future
#   $listener - object to configure; receives ( handle => $handle )
#   $handle   - passed through to the listener unchanged
#   %unused   - any remaining named arguments; accepted so this helper can be
#               called with the same trailing argument list as _listen_addrs,
#               but nothing here consults them
sub _listen_handle
{
   my ( $self, $listener, $handle, %unused ) = @_;

   $listener->configure( handle => $handle );

   my $f = $self->new_future;
   return $f->done( $listener );
}

sub _listen_addrs
{
   my $self = shift;
   my ( $listener, $addrs, %params ) = @_;

   my $queuesize = $params{queuesize} || 3;

   my $on_fail = $params{on_fail};
   !defined $on_fail or ref $on_fail or croak "Expected 'on_fail' to be a reference";

   my $reuseaddr = 1;
   $reuseaddr = 0 if defined $params{reuseaddr} and not $params{reuseaddr};

   my $v6only = $params{v6only};

   my ( $listenerr, $binderr, $sockopterr, $socketerr );

   foreach my $addr ( @$addrs ) {
      my ( $family, $socktype, $proto, $address ) = IO::Async::OS->extract_addrinfo( $addr );

      my $sock;

      unless( $sock = IO::Async::OS->socket( $family, $socktype, $proto ) ) {
         $socketerr = $!;
         $on_fail->( socket => $family, $socktype, $proto, $! ) if $on_fail;
         next;
      }

      $sock->blocking( 0 );

      if( $reuseaddr ) {
         unless( $sock->sockopt( SO_REUSEADDR, 1 ) ) {
            $sockopterr = $!;
            $on_fail->( sockopt => $sock, SO_REUSEADDR, 1, $! ) if $on_fail;
            next;
         }
      }

      if( defined $v6only and $family == AF_INET6 ) {
         unless( $sock->setsockopt( IPPROTO_IPV6, IPV6_V6ONLY, $v6only ) ) {
            $sockopterr = $!;
            $on_fail->( sockopt => $sock, IPV6_V6ONLY, $v6only, $! ) if $on_fail;
            next;
         }
      }

      unless( $sock->bind( $address ) ) {
         $binderr = $!;



( run in 0.978 second using v1.01-cache-2.11-cpan-39bf76dae61 )