IO-Async
view release on metacpan or search on metacpan
lib/IO/Async/Loop.pm view on Meta::CPAN
my $on_done;
# Legacy callbacks
if( my $on_connected = delete $params{on_connected} ) {
$on_done = $on_connected;
}
elsif( my $on_stream = delete $params{on_stream} ) {
defined $handle and croak "Cannot pass 'on_stream' with a handle object as well";
require IO::Async::Stream;
# TODO: It doesn't make sense to put a SOCK_DGRAM in an
# IO::Async::Stream but currently we don't detect this
$handle = IO::Async::Stream->new;
$on_done = $on_stream;
}
elsif( my $on_socket = delete $params{on_socket} ) {
defined $handle and croak "Cannot pass 'on_socket' with a handle object as well";
require IO::Async::Socket;
$handle = IO::Async::Socket->new;
$on_done = $on_socket;
}
elsif( !defined wantarray ) {
croak "Expected 'on_connected' or 'on_stream' callback or to return a Future";
}
my $on_connect_error;
if( $on_connect_error = $params{on_connect_error} ) {
# OK
}
elsif( !defined wantarray ) {
croak "Expected 'on_connect_error' callback";
}
my $on_resolve_error;
if( $on_resolve_error = $params{on_resolve_error} ) {
# OK
}
elsif( !defined wantarray and exists $params{host} || exists $params{local_host} ) {
croak "Expected 'on_resolve_error' callback or to return a Future";
}
my $connector = $self->{connector} ||= $self->__new_feature( "IO::Async::Internals::Connector" );
my $future = $connector->connect( %params );
$future = $future->then( sub {
$handle->set_handle( shift );
return Future->done( $handle )
}) if $handle;
$future->on_done( $on_done ) if $on_done;
$future->on_fail( sub {
$on_connect_error->( @_[2,3] ) if $on_connect_error and $_[1] eq "connect";
$on_resolve_error->( $_[2] ) if $on_resolve_error and $_[1] eq "resolve";
} );
return $future if defined wantarray;
# Caller is not going to keep hold of the Future, so we have to ensure it
# stays alive somehow
$future->on_ready( sub { undef $future } ); # intentional cycle
}
=head2 listen
$listener = await $loop->listen( %params );
This method sets up a listening socket and arranges for an acceptor callback
to be invoked each time a new connection is accepted on the socket. Internally
it creates an instance of L<IO::Async::Listener> and adds it to the Loop if
not given one in the arguments.
Addresses may be given directly, or they may be looked up using the system's
name resolver, or a socket handle may be given directly.
If multiple addresses are given, or resolved from the service and hostname,
then each will be attempted in turn until one succeeds.
In named resolver mode, the C<%params> hash takes the following keys:
=over 8
=item service => STRING
The service name to listen on.
=item host => STRING
The hostname to listen on. Optional. Will listen on all addresses if not
supplied.
=item family => INT
=item socktype => INT
=item protocol => INT
=item flags => INT
Optional. Other arguments to pass along with C<host> and C<service> to the
C<getaddrinfo> call.
=item socktype => STRING
The C<socktype> value may instead be given as one of the strings C<'stream'>,
C<'dgram'> or C<'raw'>, which stand for C<SOCK_STREAM>, C<SOCK_DGRAM> and
C<SOCK_RAW> respectively. This convenience saves the caller from needing a
separate C<use Socket> just to import these constants.
=back
It is necessary to pass the C<socktype> hint to the resolver when resolving
the host/service names into an address, as the C<getaddrinfo> function on some
operating systems requires this hint. A warning is emitted if neither the
C<socktype> nor the C<protocol> hint is defined when performing a
C<getaddrinfo> lookup. To avoid this warning while still specifying no
particular C<socktype> hint (perhaps to invoke some OS-specific behaviour),
pass C<0> as the C<socktype> value.
In plain address mode, the C<%params> hash takes the following keys:
lib/IO/Async/Loop.pm view on Meta::CPAN
}
my $method = "${ext}_listen";
# TODO: Try to 'require IO::Async::$ext'
$self->can( $method ) or croak "Extension method '$method' is not available";
my $f = $self->$method(
%params,
( @others ? ( extensions => \@others ) : () ),
);
$f->on_fail( sub { $self->remove( $listener ) } ) if $remove_on_error;
return $f;
}
my $on_notifier = delete $params{on_notifier}; # optional
my $on_listen_error = delete $params{on_listen_error};
my $on_resolve_error = delete $params{on_resolve_error};
# Shortcut
if( $params{addr} and not $params{addrs} ) {
$params{addrs} = [ delete $params{addr} ];
}
my $f;
if( my $handle = delete $params{handle} ) {
$f = $self->_listen_handle( $listener, $handle, %params );
}
elsif( my $addrs = delete $params{addrs} ) {
$on_listen_error or defined wantarray or
croak "Expected 'on_listen_error' or to return a Future";
$f = $self->_listen_addrs( $listener, $addrs, %params );
}
elsif( defined $params{service} ) {
$on_listen_error or defined wantarray or
croak "Expected 'on_listen_error' or to return a Future";
$on_resolve_error or defined wantarray or
croak "Expected 'on_resolve_error' or to return a Future";
$f = $self->_listen_hostservice( $listener, delete $params{host}, delete $params{service}, %params );
}
else {
croak "Expected either 'service' or 'addrs' or 'addr' arguments";
}
$f->on_done( $on_notifier ) if $on_notifier;
if( my $on_listen = $params{on_listen} ) {
$f->on_done( sub { $on_listen->( shift->read_handle ) } );
}
$f->on_fail( sub {
my ( $message, $how, @rest ) = @_;
$on_listen_error->( @rest ) if $on_listen_error and $how eq "listen";
$on_resolve_error->( @rest ) if $on_resolve_error and $how eq "resolve";
});
$f->on_fail( sub { $self->remove( $listener ) } ) if $remove_on_error;
return $f if defined wantarray;
# Caller is not going to keep hold of the Future, so we have to ensure it
# stays alive somehow
$f->on_ready( sub { undef $f } ); # intentional cycle
}
# Back end used when the caller supplies a ready-made 'handle': give the
# handle to the listener and report the listener back through a future.
#
#   $listener - object whose configure() method is given the handle
#   $handle   - passed to $listener->configure as its 'handle' argument
#   %params   - remaining named arguments; accepted but not consulted here
#
# Returns the result of calling done( $listener ) on a future freshly
# obtained from $self->new_future.
sub _listen_handle
{
   my ( $self, $listener, $handle, %params ) = @_;

   $listener->configure( handle => $handle );

   my $f = $self->new_future;
   return $f->done( $listener );
}
sub _listen_addrs
{
my $self = shift;
my ( $listener, $addrs, %params ) = @_;
my $queuesize = $params{queuesize} || 3;
my $on_fail = $params{on_fail};
!defined $on_fail or ref $on_fail or croak "Expected 'on_fail' to be a reference";
my $reuseaddr = 1;
$reuseaddr = 0 if defined $params{reuseaddr} and not $params{reuseaddr};
my $v6only = $params{v6only};
my ( $listenerr, $binderr, $sockopterr, $socketerr );
foreach my $addr ( @$addrs ) {
my ( $family, $socktype, $proto, $address ) = IO::Async::OS->extract_addrinfo( $addr );
my $sock;
unless( $sock = IO::Async::OS->socket( $family, $socktype, $proto ) ) {
$socketerr = $!;
$on_fail->( socket => $family, $socktype, $proto, $! ) if $on_fail;
next;
}
$sock->blocking( 0 );
if( $reuseaddr ) {
unless( $sock->sockopt( SO_REUSEADDR, 1 ) ) {
$sockopterr = $!;
$on_fail->( sockopt => $sock, SO_REUSEADDR, 1, $! ) if $on_fail;
next;
}
}
if( defined $v6only and $family == AF_INET6 ) {
unless( $sock->setsockopt( IPPROTO_IPV6, IPV6_V6ONLY, $v6only ) ) {
$sockopterr = $!;
$on_fail->( sockopt => $sock, IPV6_V6ONLY, $v6only, $! ) if $on_fail;
next;
}
}
unless( $sock->bind( $address ) ) {
$binderr = $!;
( run in 0.978 second using v1.01-cache-2.11-cpan-39bf76dae61 )