Acme-Sort-Sleep
local/lib/perl5/IO/Async/Loop.pm view on Meta::CPAN
example, the resolver may have returned some IPv6 addresses, but only IPv4
routes are valid on the system. In this case, the first C<connect(2)> syscall
will fail. This isn't yet a fatal error if there are more addresses to try,
perhaps some IPv4 ones.
For this reason, it is possible that the operation eventually succeeds even
though some system calls initially fail. To be aware of individual failures,
the optional C<on_fail> callback can be used. This will be invoked on each
individual C<socket(2)> or C<connect(2)> failure, which may be useful for
debugging or logging.
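The "try every address, report each failure, stop at the first success"
behaviour described above can be sketched in plain Perl. This is only an
illustration under stated assumptions: C<try_each> and the simulated
C<$attempt> callback are hypothetical names, not part of L<IO::Async>, and no
real sockets are opened.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of IO::Async): walk a candidate list, report
# each failure through $on_fail, and stop at the first success.
sub try_each
{
   my ( $candidates, $attempt, $on_fail ) = @_;

   my $last_error;
   foreach my $candidate ( @$candidates ) {
      my ( $result, $error ) = $attempt->( $candidate );
      return ( $result, undef ) if defined $result;

      $last_error = $error;
      $on_fail->( $candidate, $error ) if $on_fail;
   }

   # Every candidate failed; hand back the last error seen
   return ( undef, $last_error );
}

# Simulated attempt: pretend there is no IPv6 route, so bracketed addresses fail
my $attempt = sub {
   my ( $addr ) = @_;
   return ( undef, "Network is unreachable" ) if $addr =~ m/^\[/;
   return ( "socket for $addr", undef );
};

my @failures;
my ( $connected, $error ) = try_each(
   [ "[2001:db8::1]:8001", "10.0.0.1:8001" ],
   $attempt,
   sub { push @failures, [ @_ ] },
);
```

Here the overall operation succeeds even though the first candidate failed,
and the failure is still visible to the caller through the callback.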
Because this module simply uses the C<getaddrinfo> resolver, it will be fully
IPv6-aware if the underlying platform's resolver is. This allows programs to
be fully IPv6-capable.
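For a feel of what a C<getaddrinfo> lookup yields, the following sketch calls
the synchronous C<getaddrinfo> function from the core C<Socket> module. It is
an assumption-laden stand-in for illustration: the loop's own resolver runs
asynchronously, and the literal address and port are chosen only so that no
DNS lookup is needed.

```perl
use strict;
use warnings;
use Socket qw( :addrinfo AF_INET SOCK_STREAM unpack_sockaddr_in inet_ntoa );

# AI_NUMERICHOST keeps this example off the network: no DNS lookup is made
my ( $err, @addrs ) = getaddrinfo( "127.0.0.1", "8001", {
   socktype => SOCK_STREAM,
   flags    => AI_NUMERICHOST,
} );
die "getaddrinfo failed: $err\n" if $err;

# Each result is a HASH with family, socktype, protocol and a packed addr
foreach my $ai ( @addrs ) {
   next unless $ai->{family} == AF_INET;

   my ( $port, $packed_ip ) = unpack_sockaddr_in( $ai->{addr} );
   printf "family=%d socktype=%d protocol=%d -> %s:%d\n",
      $ai->{family}, $ai->{socktype}, $ai->{protocol},
      inet_ntoa( $packed_ip ), $port;
}
```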
In plain address mode, the C<%params> hash takes the following keys:
=over 8
=item addrs => ARRAY
Reference to an array of (possibly-multiple) address structures to attempt to
connect to. Each should be in the layout described for C<addr>. Such a layout
is returned by the C<getaddrinfo> named resolver.
=item addr => HASH or ARRAY
Shortcut for passing a single address to connect to; it may be passed directly
with this key, instead of in another array on its own. This should be in a
format recognised by L<IO::Async::OS>'s C<extract_addrinfo> method.
This example shows how to construct one, in HASH form, for TCP port 8001 on
address 10.0.0.1:
   $loop->connect(
      addr => {
         family   => "inet",
         socktype => "stream",
         port     => 8001,
         ip       => "10.0.0.1",
      },
      ...
   );
This example shows another way to connect to a UNIX socket at F<echo.sock>.
   $loop->connect(
      addr => {
         family   => "unix",
         socktype => "stream",
         path     => "echo.sock",
      },
      ...
   );
=item local_addrs => ARRAY
=item local_addr => HASH or ARRAY
Optional. Similar to the C<addrs> or C<addr> parameters, these specify a local
address or set of addresses to C<bind(2)> the socket to before
C<connect(2)>ing it.
=back
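The HASH-form addresses in the list above name their endpoints symbolically.
As a rough sketch of what the same two endpoints look like when packed by hand
with the core C<Socket> functions (this shows only the packing step, and is
not a claim about how C<extract_addrinfo> is implemented):

```perl
use strict;
use warnings;
use Socket qw(
   pack_sockaddr_in unpack_sockaddr_in inet_aton inet_ntoa
   pack_sockaddr_un unpack_sockaddr_un
);

# TCP port 8001 on 10.0.0.1, as a packed sockaddr_in structure
my $inet_addr = pack_sockaddr_in( 8001, inet_aton( "10.0.0.1" ) );

# Unpacking recovers the port and the dotted-quad address
my ( $port, $packed_ip ) = unpack_sockaddr_in( $inet_addr );
my $ip = inet_ntoa( $packed_ip );

# The UNIX socket path echo.sock, as a packed sockaddr_un structure
my $unix_addr = pack_sockaddr_un( "echo.sock" );
my ( $path ) = unpack_sockaddr_un( $unix_addr );

print "inet: $ip:$port\n";
print "unix: $path\n";
```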
When performing the resolution step too, the C<addrs> or C<addr> keys are
ignored, and instead the following keys are taken:
=over 8
=item host => STRING
=item service => STRING
The hostname and service name to connect to.
=item local_host => STRING
=item local_service => STRING
Optional. The hostname and/or service name to C<bind(2)> the socket to locally
before connecting to the peer.
=item family => INT
=item socktype => INT
=item protocol => INT
=item flags => INT
Optional. Other arguments to pass along with C<host> and C<service> to the
C<getaddrinfo> call.
=item socktype => STRING
Optionally may instead be one of the values C<'stream'>, C<'dgram'> or
C<'raw'> to stand for C<SOCK_STREAM>, C<SOCK_DGRAM> or C<SOCK_RAW>. This
utility is provided to allow the caller to avoid a separate C<use Socket> only
for importing these constants.
=back
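The string form of C<socktype> amounts to a small name-to-constant lookup. A
minimal sketch, assuming a hypothetical helper called C<socktype_hint> (the
module's real conversion code is not shown in this excerpt):

```perl
use strict;
use warnings;
use Socket qw( SOCK_STREAM SOCK_DGRAM SOCK_RAW );

# The three names accepted in place of the numeric constants
my %SOCKTYPE_BY_NAME = (
   stream => SOCK_STREAM,
   dgram  => SOCK_DGRAM,
   raw    => SOCK_RAW,
);

# Hypothetical helper: translate a known name, pass anything else through
sub socktype_hint
{
   my ( $socktype ) = @_;
   return $socktype unless defined $socktype;
   return $SOCKTYPE_BY_NAME{$socktype} // $socktype;
}

printf "stream -> %d\n", socktype_hint( "stream" );
```

Numeric values, including C<0>, pass through unchanged, so callers who already
import the constants are unaffected.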
It is necessary to pass the C<socktype> hint to the resolver when resolving
the host/service names into an address, as some OS's C<getaddrinfo> functions
require this hint. A warning is emitted if neither C<socktype> nor C<protocol>
hint is defined when performing a C<getaddrinfo> lookup. To avoid this warning
while still specifying no particular C<socktype> hint (perhaps to invoke some
OS-specific behaviour), pass C<0> as the C<socktype> value.
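The reason C<0> silences the warning is that the check is on definedness, not
truth. A small sketch of that test, using a hypothetical function name
C<hints_are_portable>:

```perl
use strict;
use warnings;

# Hypothetical helper: true when at least one of the two hints is defined.
# A value of 0 is defined, so it counts as "supplied".
sub hints_are_portable
{
   my ( %hints ) = @_;
   return defined( $hints{socktype} ) || defined( $hints{protocol} );
}

print "no hints:      ", ( hints_are_portable()                ? "ok" : "warn" ), "\n";
print "socktype => 0: ", ( hints_are_portable( socktype => 0 ) ? "ok" : "warn" ), "\n";
```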
In either case, it also accepts the following arguments:
=over 8
=item handle => IO::Async::Handle
Optional. If given an L<IO::Async::Handle> object or a subclass (such as
L<IO::Async::Stream> or L<IO::Async::Socket>), its handle will be set to the
newly-connected socket on success, and that handle used as the result of the
future instead.
=item on_fail => CODE
Optional. After an individual C<socket(2)> or C<connect(2)> syscall has failed,
this callback is invoked to inform of the error. It is passed the name of the
syscall that failed, the arguments that were passed to it, and the error it
generated. I.e.
   $on_fail->( "socket", $family, $socktype, $protocol, $! );
   $on_fail->( "bind", $sock, $address, $! );
   $on_fail->( "connect", $sock, $address, $! );
Because of the "try all" nature when given a list of multiple addresses, this
callback may be invoked multiple times, even before an eventual success.
=back
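Because the C<on_fail> argument list varies by syscall, a logging callback is
easiest to write by taking the name from the front and the error from the
end. The following is a sketch only; the C<@log> array and the plain values
passed in place of real sockets and C<$!> are stand-ins for illustration.

```perl
use strict;
use warnings;

my @log;

# The syscall name is always first and the error is always last; whatever
# sits between them is the syscall's own argument list.
my $on_fail = sub {
   my ( $syscall, @args ) = @_;
   my $error = pop @args;
   push @log, sprintf "%s(%d args) failed: %s", $syscall, scalar @args, $error;
};

# Simulated invocations, using strings in place of real handles and $!
$on_fail->( "socket", 10, 1, 6, "Address family not supported by protocol" );
$on_fail->( "connect", "SOCK", "ADDR", "Connection refused" );

print "$_\n" for @log;
```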
This method accepts an C<extensions> parameter; see the C<EXTENSIONS> section
below.
=head2 connect (void)
   $loop->connect( %params )
When not returning a future, additional parameters can be given containing the
continuations to invoke on success or failure.
=over 8
=item on_connected => CODE
A continuation that is invoked on a successful C<connect(2)> call to a valid
socket. It will be passed the connected socket handle, as an C<IO::Socket>
object.
   $on_connected->( $handle )
=item on_stream => CODE
An alternative to C<on_connected>, a continuation that is passed an instance
of L<IO::Async::Stream> when the socket is connected. This is provided as a
convenience for the common case that a Stream object is required as the
transport for a Protocol object.
   $on_stream->( $stream )
=item on_socket => CODE
Similar to C<on_stream>, but constructs an instance of L<IO::Async::Socket>.
This is most useful for C<SOCK_DGRAM> or C<SOCK_RAW> sockets.
   $on_socket->( $socket )
=item on_connect_error => CODE
A continuation that is invoked after all of the addresses have been tried, and
none of them succeeded. It will be passed the most significant error that
occurred, and the name of the operation it occurred in. Errors from the
C<connect(2)> syscall are considered most significant, then C<bind(2)>, then
finally C<socket(2)>.
   $on_connect_error->( $syscall, $! )
=item on_resolve_error => CODE
A continuation that is invoked when the name resolution attempt fails. This is
invoked in the same way as the C<on_error> continuation for the C<resolve>
method.
=back
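The "most significant error" rule described for C<on_connect_error> is a
fixed priority order: C<connect(2)>, then C<bind(2)>, then C<socket(2)>. A
minimal sketch of that selection, using a hypothetical function name and
plain strings in place of real C<$!> values:

```perl
use strict;
use warnings;

# Hypothetical helper: given the last error seen per syscall, return the
# ( $syscall, $error ) pair that ranks highest in the documented order.
sub most_significant_error
{
   my ( %error_for ) = @_;

   foreach my $syscall (qw( connect bind socket )) {
      return ( $syscall, $error_for{$syscall} ) if defined $error_for{$syscall};
   }

   return;
}

my ( $syscall, $error ) = most_significant_error(
   socket  => "Address family not supported by protocol",
   connect => "Connection refused",
);

print "report: $syscall failed with '$error'\n";
```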
=cut
sub connect
{
   my $self = shift;
   my ( %params ) = @_;

   my $extensions;
   if( $extensions = delete $params{extensions} and @$extensions ) {
      my ( $ext, @others ) = @$extensions;
      my $method = "${ext}_connect";
      # TODO: Try to 'require IO::Async::$ext'
      $self->can( $method ) or croak "Extension method '$method' is not available";

      return $self->$method(
         %params,
         ( @others ? ( extensions => \@others ) : () ),
      );
   }

   my $handle = $params{handle};
   my $on_done;

   # Legacy callbacks
   if( my $on_connected = delete $params{on_connected} ) {
      $on_done = $on_connected;
   }
   elsif( my $on_stream = delete $params{on_stream} ) {
      defined $handle and croak "Cannot pass 'on_stream' with a handle object as well";

      require IO::Async::Stream;
      # TODO: It doesn't make sense to put a SOCK_DGRAM in an
      # IO::Async::Stream but currently we don't detect this
      $handle = IO::Async::Stream->new;
      $on_done = $on_stream;
   }
   elsif( my $on_socket = delete $params{on_socket} ) {
      defined $handle and croak "Cannot pass 'on_socket' with a handle object as well";

      require IO::Async::Socket;
      $handle = IO::Async::Socket->new;
      $on_done = $on_socket;
   }
   elsif( !defined wantarray ) {
      croak "Expected 'on_connected' or 'on_stream' callback or to return a Future";
   }
=item socktype => STRING
Optionally may instead be one of the values C<'stream'>, C<'dgram'> or
C<'raw'> to stand for C<SOCK_STREAM>, C<SOCK_DGRAM> or C<SOCK_RAW>. This
utility is provided to allow the caller to avoid a separate C<use Socket> only
for importing these constants.
=back
It is necessary to pass the C<socktype> hint to the resolver when resolving
the host/service names into an address, as some OS's C<getaddrinfo> functions
require this hint. A warning is emitted if neither C<socktype> nor C<protocol>
hint is defined when performing a C<getaddrinfo> lookup. To avoid this warning
while still specifying no particular C<socktype> hint (perhaps to invoke some
OS-specific behaviour), pass C<0> as the C<socktype> value.
In plain address mode, the C<%params> hash takes the following keys:
=over 8
=item addrs => ARRAY
Reference to an array of (possibly-multiple) address structures to attempt to
listen on. Each should be in the layout described for C<addr>. Such a layout
is returned by the C<getaddrinfo> named resolver.
=item addr => ARRAY
Shortcut for passing a single address to listen on; it may be passed directly
with this key, instead of in another array of its own. This should be in a
format recognised by L<IO::Async::OS>'s C<extract_addrinfo> method. See also
the C<EXAMPLES> section.
=back
In direct socket handle mode, the following keys are taken:
=over 8
=item handle => IO
The listening socket handle.
=back
In either case, the following keys are also taken:
=over 8
=item on_fail => CODE
Optional. A callback that is invoked if a syscall fails while attempting to
create a listening socket. It is passed the name of the syscall that failed,
the arguments that were passed to it, and the error generated. I.e.
   $on_fail->( "socket", $family, $socktype, $protocol, $! );
   $on_fail->( "sockopt", $sock, $optname, $optval, $! );
   $on_fail->( "bind", $sock, $address, $! );
   $on_fail->( "listen", $sock, $queuesize, $! );
=item queuesize => INT
Optional. The queue size to pass to the C<listen(2)> calls. If not supplied,
then 3 will be given instead.
=item reuseaddr => BOOL
Optional. If true or not supplied then the C<SO_REUSEADDR> socket option will
be set. To prevent this, pass a false value such as 0.
=item v6only => BOOL
Optional. If defined, sets or clears the C<IPV6_V6ONLY> socket option on
C<PF_INET6> sockets. This option disables the ability of C<PF_INET6> socket to
accept connections from C<AF_INET> addresses. Not all operating systems allow
this option to be disabled.
=back
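The defaults described for C<queuesize>, C<reuseaddr> and C<v6only> can be
checked with a small stand-alone sketch. The function name
C<listen_defaults> is hypothetical; the expressions follow the option
handling visible in C<_listen_addrs> later in this file.

```perl
use strict;
use warnings;

# Hypothetical helper: resolve the effective listen options from %params
sub listen_defaults
{
   my ( %params ) = @_;

   # A missing (or zero) queuesize falls back to 3
   my $queuesize = $params{queuesize} || 3;

   # SO_REUSEADDR is on unless a defined false value is passed
   my $reuseaddr = 1;
   $reuseaddr = 0 if defined $params{reuseaddr} and not $params{reuseaddr};

   # v6only stays undef unless given, meaning "leave the OS default alone"
   return ( queuesize => $queuesize, reuseaddr => $reuseaddr, v6only => $params{v6only} );
}

my %plain   = listen_defaults();
my %noreuse = listen_defaults( reuseaddr => 0, queuesize => 10 );

printf "plain:   queuesize=%d reuseaddr=%d\n", @plain{qw( queuesize reuseaddr )};
printf "noreuse: queuesize=%d reuseaddr=%d\n", @noreuse{qw( queuesize reuseaddr )};
```

Note that an explicit C<< queuesize => 0 >> is indistinguishable from an
omitted one under this logic, since C<||> tests truth rather than
definedness.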
An alternative, which gives more control over the listener, is to create the
L<IO::Async::Listener> object directly and add it explicitly to the Loop.
This method accepts an C<extensions> parameter; see the C<EXTENSIONS> section
below.
=head2 listen (void)
   $loop->listen( %params )
When not returning a future, additional parameters can be given containing the
continuations to invoke on success or failure.
=over 8
=item on_notifier => CODE
Optional. A callback that is invoked when the Listener object is ready to
receive connections. The callback is passed the Listener object itself.
   $on_notifier->( $listener )
If this callback is required, it may instead be better to construct the
Listener object directly.
=item on_listen => CODE
Optional. A callback that is invoked when the listening socket is ready.
Typically this would be used in the name resolver case, in order to inspect
the socket's sockname address, or otherwise inspect the filehandle.
   $on_listen->( $socket )
=item on_listen_error => CODE
A continuation that is invoked after all of the addresses have been tried, and
none of them succeeded. It will be passed the most significant error that
occurred, and the name of the operation it occurred in. Errors from the
C<listen(2)> syscall are considered most significant, then C<bind(2)>, then
C<setsockopt(2)>, then finally C<socket(2)>.
=item on_resolve_error => CODE
A continuation that is invoked when the name resolution attempt fails. This is
invoked in the same way as the C<on_error> continuation for the C<resolve>
method.
=back
=cut
sub listen
{
   my $self = shift;
   my ( %params ) = @_;

   my $remove_on_error;
   my $listener = $params{listener} ||= do {
      $remove_on_error++;

      require IO::Async::Listener;

      # Our wrappings of these don't want $listener
      my %listenerparams;
      for (qw( on_accept on_stream on_socket )) {
         next unless exists $params{$_};
         croak "Cannot ->listen with '$_' and 'listener'" if $params{listener};

         my $code = delete $params{$_};
         $listenerparams{$_} = sub {
            shift;
            goto &$code;
         };
      }

      my $listener = IO::Async::Listener->new( %listenerparams );
      $self->add( $listener );
      $listener
   };

   my $extensions;
   if( $extensions = delete $params{extensions} and @$extensions ) {
      my ( $ext, @others ) = @$extensions;

      # We happen to know we break older IO::Async::SSL
      if( $ext eq "SSL" and $IO::Async::SSL::VERSION < '0.12001' ) {
         croak "IO::Async::SSL version too old; need at least 0.12_001; found $IO::Async::SSL::VERSION";
      }

      my $method = "${ext}_listen";
      # TODO: Try to 'require IO::Async::$ext'
      $self->can( $method ) or croak "Extension method '$method' is not available";

      my $f = $self->$method(
         %params,
         ( @others ? ( extensions => \@others ) : () ),
      );
      $f->on_fail( sub { $self->remove( $listener ) } ) if $remove_on_error;
      $f = $self->_listen_handle( $listener, $handle, %params );
   }
   elsif( my $addrs = delete $params{addrs} ) {
      $on_listen_error or defined wantarray or
         croak "Expected 'on_listen_error' or to return a Future";
      $f = $self->_listen_addrs( $listener, $addrs, %params );
   }
   elsif( defined $params{service} ) {
      $on_listen_error or defined wantarray or
         croak "Expected 'on_listen_error' or to return a Future";
      $on_resolve_error or defined wantarray or
         croak "Expected 'on_resolve_error' or to return a Future";
      $f = $self->_listen_hostservice( $listener, delete $params{host}, delete $params{service}, %params );
   }
   else {
      croak "Expected either 'service' or 'addrs' or 'addr' arguments";
   }

   $f->on_done( $on_notifier ) if $on_notifier;
   if( my $on_listen = $params{on_listen} ) {
      $f->on_done( sub { $on_listen->( shift->read_handle ) } );
   }
   $f->on_fail( sub {
      my ( $message, $how, @rest ) = @_;
      $on_listen_error->( @rest )  if $on_listen_error  and $how eq "listen";
      $on_resolve_error->( @rest ) if $on_resolve_error and $how eq "resolve";
   });
   $f->on_fail( sub { $self->remove( $listener ) } ) if $remove_on_error;

   return $f if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $f->on_ready( sub { undef $f } ); # intentional cycle
}
sub _listen_handle
{
   my $self = shift;
   my ( $listener, $handle, %params ) = @_;

   $listener->configure( handle => $handle );
   return $self->new_future->done( $listener );
}
sub _listen_addrs
{
   my $self = shift;
   my ( $listener, $addrs, %params ) = @_;

   my $queuesize = $params{queuesize} || 3;

   my $on_fail = $params{on_fail};
   !defined $on_fail or ref $on_fail or croak "Expected 'on_fail' to be a reference";

   my $reuseaddr = 1;
   $reuseaddr = 0 if defined $params{reuseaddr} and not $params{reuseaddr};

   my $v6only = $params{v6only};

   # Remember the last error seen at each stage, for reporting by priority
   my ( $listenerr, $binderr, $sockopterr, $socketerr );

   foreach my $addr ( @$addrs ) {
      my ( $family, $socktype, $proto, $address ) = IO::Async::OS->extract_addrinfo( $addr );

      my $sock;

      unless( $sock = IO::Async::OS->socket( $family, $socktype, $proto ) ) {
         $socketerr = $!;
         $on_fail->( socket => $family, $socktype, $proto, $! ) if $on_fail;
         next;
      }

      if( $reuseaddr ) {
         unless( $sock->sockopt( SO_REUSEADDR, 1 ) ) {
            $sockopterr = $!;
            $on_fail->( sockopt => $sock, SO_REUSEADDR, 1, $! ) if $on_fail;
            next;
         }
      }

      if( defined $v6only and $family == AF_INET6 ) {
         unless( $sock->setsockopt( IPPROTO_IPV6, IPV6_V6ONLY, $v6only ) ) {
            $sockopterr = $!;
            $on_fail->( sockopt => $sock, IPV6_V6ONLY, $v6only, $! ) if $on_fail;
            next;
         }
      }

      unless( $sock->bind( $address ) ) {
         $binderr = $!;
         $on_fail->( bind => $sock, $address, $! ) if $on_fail;
         next;
      }

      unless( $sock->listen( $queuesize ) ) {
         $listenerr = $!;
         $on_fail->( listen => $sock, $queuesize, $! ) if $on_fail;
         next;
      }

      # First address that gets all the way to listen() wins
      return $self->_listen_handle( $listener, $sock, %params );
   }

   # Nothing worked; report the most significant error first
   my $f = $self->new_future;
   return $f->fail( "Cannot listen() - $listenerr",      listen => listen  => $listenerr  ) if $listenerr;
   return $f->fail( "Cannot bind() - $binderr",          listen => bind    => $binderr    ) if $binderr;
   return $f->fail( "Cannot setsockopt() - $sockopterr", listen => sockopt => $sockopterr ) if $sockopterr;
   return $f->fail( "Cannot socket() - $socketerr",      listen => socket  => $socketerr  ) if $socketerr;
   die 'Oops; $loop->listen failed but no error cause was found';
}
sub _listen_hostservice
{
   my $self = shift;
   my ( $listener, $host, $service, %params ) = @_;

   $host ||= "";
   defined $service or $service = ""; # might be 0

   my %gai_hints;
   exists $params{$_} and $gai_hints{$_} = $params{$_} for qw( family socktype protocol flags );

   defined $gai_hints{socktype} or defined $gai_hints{protocol} or
      carp "Attempting to ->listen without either 'socktype' or 'protocol' hint is not portable";

   $self->resolver->getaddrinfo(
      host    => $host,
      service => $service,
      passive => 1,
      %gai_hints,
   )->then( sub {
      my @addrs = @_;
      $self->_listen_addrs( $listener, \@addrs, %params );
   });
}
=head1 OS ABSTRACTIONS
Because the Magic Constructor searches for OS-specific subclasses of the Loop,
several abstractions of OS services are provided, so that a subclass can
supply a different implementation where a specific OS needs one.
=cut
=head2 signame2num
   $signum = $loop->signame2num( $signame )
Legacy wrappers around L<IO::Async::OS> functions.
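For readers unfamiliar with the name-to-number mapping, one common way to
build it in Perl is from the core C<Config> module. The sketch below uses a
hypothetical function name, C<signame2num_sketch>, and is not a claim about
how L<IO::Async::OS> implements its own method.

```perl
use strict;
use warnings;
use Config;

# %Config carries two parallel space-separated lists: signal names and numbers
my @names = split ' ', $Config{sig_name};
my @nums  = split ' ', $Config{sig_num};

my %num_by_name;
@num_by_name{@names} = @nums;

# Hypothetical helper: look up a signal number by its name (without "SIG")
sub signame2num_sketch
{
   my ( $signame ) = @_;
   return $num_by_name{$signame};
}

printf "INT is signal %d on this platform\n", signame2num_sketch( "INT" );
```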
=cut
sub signame2num { shift; IO::Async::OS->signame2num( @_ ) }
=head2 time
   $time = $loop->time
Returns the current UNIX time in fractional seconds. This is currently
equivalent to C<Time::HiRes::time> but provided here as a utility for
programs to obtain the time currently used by L<IO::Async> for its own timing
purposes.
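Since the method is documented as currently equivalent to
C<Time::HiRes::time>, the kind of value it returns can be seen by calling
that function directly. This sketch deliberately avoids constructing a loop
and only demonstrates the fractional-seconds format.

```perl
use strict;
use warnings;
use Time::HiRes ();

# Fractional UNIX time, e.g. seconds since the epoch with a sub-second part
my $now = Time::HiRes::time();

# The built-in time() gives whole seconds only
my $whole = time();

printf "hi-res: %.6f\n", $now;
printf "whole:  %d\n",   $whole;
```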
=cut
sub time