MojoX-HTTP-Async

 view release on metacpan or  search on metacpan

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN


    # let's fill slots
    $ua->add( '/page1.html?lang=en');
    $ua->add( 'http://my-site.com/page2.html');
    $ua->add( Mojo::URL->new("/page/03.html") );
    $ua->add( Mojo::Message::Request->new() );

    # non-blocking requests processing
    while ( $ua->not_empty() ) {
        if (my $tx = $ua->next_response) { # returns an instance of Mojo::Transaction::HTTP class
            print $tx->res->headers->to_string;
        } else {
            # do something else
        }
    }

    # blocking requests processing
    while (my $tx = $ua->wait_for_next_response($timeout)) {
        # do something here
    }

    # how to process connect timeouts
    if (my $error = $tx->req()->error()) {
        say $error->{code};
        say $error->{message};
    }

    # how to process request timeouts and other errors such as broken pipes, etc.
    if (my $error = $tx->res()->error()) {
        say $error->{code};
        say $error->{message};
    }

    # reconnects the slots that have timed out or have been inactive for too long
    $ua->refresh_connections();

    # closes everything
    $ua->close_all();

=head1 DESCRIPTION

This library allows you to make multiple HTTP/HTTPS requests to a particular host in non-blocking mode.

In comparison with C<HTTP::Async>, this library doesn't make a new connection on each request.

And in comparison with C<Mojo::AsyncAwait>, it is more intuitive to use, and there are no Singleton restrictions.

The instance of this class can work only with one domain and scheme: either HTTP or HTTPS.

=head1 LICENSE

This module is distributed under terms of Artistic Perl 5 license.

=cut

use 5.020;
use warnings;
use bytes ();
use Socket qw/ inet_aton pack_sockaddr_in AF_INET SOCK_STREAM SOL_SOCKET SO_KEEPALIVE SO_OOBINLINE IPPROTO_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT /;
#use IO::Socket::IP ();
use IO::Socket::SSL ();
use Fcntl qw/ F_SETFL O_NONBLOCK FD_CLOEXEC O_NOINHERIT /;
use experimental qw/ signatures /;
use Carp qw/ croak /;
use List::Util qw/ first /;
use Time::HiRes qw/ time /;
use Mojo::Message::Request ();
use Mojo::Message::Response ();
use Mojo::Transaction::HTTP ();
use URI ();
use Scalar::Util qw/ blessed /;
use Errno qw / :POSIX /;

our $VERSION = 0.14;

# Platform flags: exactly one of them is 1, depending on whether Perl reports
# the operating system as 'MSWin32'.
use constant IS_WIN     => ($^O eq 'MSWin32' ? 1 : 0);
use constant IS_NOT_WIN => (IS_WIN() ? 0 : 1);

=head2 new($class, %opts)

The class constructor.

=over

=item host

It's the obligatory option.
Sets the name/address of remote host to be requested.

=item port

By default it's equal to 80.
Sets the port number of the remote endpoint.

=item slots

By default it's equal to 5.
Sets the maximum amount of slots.
These slots will be filled one by one as required.

=item ssl

By default it's equal to 0 (means HTTP).
Sets the scheme of requests: HTTP or HTTPS.

=item ssl_opts

It's a HashRef with options to control SSL Layer.
See C<IO::Socket::SSL> constructor arguments for details.

=item connect_timeout

By default it's equal to 1.
Sets connection timeout in seconds.

If it's equal to 0, then there will be no timeout restrictions.

=item request_timeout

By default it's equal to 1.
Sets the time in seconds, with microsecond granularity.
The time spent waiting for a response is limited to this value.

In case of 0 value there will be no time restrictions.

=item sol_socket

It's a HashRef with socket options.
The possible keys are:

B<so_keepalive> - enables TCP KeepAlive on socket.
The default value is 1 (means that option is enabled).

=item B<sol_tcp>

WARNING: These options can be unsupported on some OS platforms.

It's a HashRef with socket TCP-options.

If some key is absent in HashRef then system settings will be used.

The supported keys are shown below:

B<tcp_keepidle> - the time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes

B<tcp_keepintvl> - the time (in seconds) between individual keepalive probes

B<tcp_keepcnt> - the maximum number of keepalive probes TCP should send before dropping the connection.

=item inactivity_conn_ts

If last response was received C<inactivity_conn_ts> seconds or more ago,
then such slots will be destroyed.

By default the value is 0 (disabled).

=item debug

Enables debug mode. Debug messages are printed to STDERR.
By default the value is 0 (disabled).

=back

=cut

sub new ($class, %opts) {
    croak("host is mandatory") if (! $opts{'host'});
    my $self = bless({
        'slots' => 5,

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

        'request_timeout' => 1, # 1 sec
        'connect_timeout' => 1, # 1 sec
        'sol_socket' => {
            'so_keepalive' => 1,
        },
        'sol_tcp' => {},
        'inactivity_conn_ts' => 0,
        %opts,
        '_conns' => [],
    }, $class);
    return $self;
}

# Opens a TCP connection to $peer_addr and stores the resulting handles in $slot.
#
# Arguments:
#   $slot      - HashRef describing the slot; its 'connected_ts', 'socket', 'reader',
#                'writer' and 'sock_no' keys are filled in here
#   $proto     - protocol value handed to socket()
#   $peer_addr - socket address handed to connect()
#
# connect() runs while the socket is still blocking; the socket is switched to
# non-blocking mode right after it. With the 'ssl' option on, 'reader' and 'writer'
# are replaced with an IO::Socket::SSL handle created on top of the same socket.
#
# Croaks if socket(), connect(), fcntl(), setsockopt() or the SSL upgrade fails.
sub _connect ($self, $slot, $proto, $peer_addr) {

    warn("Connecting\n") if $self->{'debug'};

    socket(my $socket, AF_INET, SOCK_STREAM, $proto) || croak("socket error: $!");
    connect($socket, $peer_addr)                     || croak("connect error: $!"); # blocking connect: O_NONBLOCK is set only below

    # The constants are called without the '&' sigil on purpose: '&NAME' calls are never
    # inlined, whereas 'NAME()' lets Perl replace the constant with its value at compile
    # time and optimize away the branch which does not apply to the current platform.
    if (IS_NOT_WIN()) {
        # O_NONBLOCK is a file status flag, so it is set with F_SETFL
        fcntl($socket, F_SETFL, O_NONBLOCK) || croak("fcntl error has occurred: $!");
        # FD_CLOEXEC is a file descriptor flag: it has to be set with F_SETFD, not with F_SETFL
        fcntl($socket, Fcntl::F_SETFD(), FD_CLOEXEC) || croak("fcntl error has occurred: $!");
    }

    if (IS_WIN()) {
        require IO::Socket::IP; # loaded on demand: its 'use' line is commented out at the top of the file
        $socket = IO::Socket::IP->new_from_fd(fileno($socket), '+<');
        defined($socket->blocking(0)) or croak("can't set non-blocking state on socket: $!");
        #$socket->sockopt(O_NOINHERIT, 1) or croak("fcntl error has occurred: $!"); # the same as SOCK_CLOEXEC
    }

    my $sol_socket_opts = $self->{'sol_socket'} // {};

    if (exists($sol_socket_opts->{'so_keepalive'})) {
        # Pass the requested state to the socket, so that 'so_keepalive' => 0 really turns the option off
        my $keepalive = $sol_socket_opts->{'so_keepalive'} ? 1 : 0;

        setsockopt($socket, SOL_SOCKET, SO_KEEPALIVE, $keepalive) || croak("setsockopt error has occurred while setting SO_KEEPALIVE: $!");

        # The TCP keepalive tunables are applied only when keepalive is enabled;
        # an absent key leaves the corresponding system setting untouched
        if ($keepalive) {
            my $sol_tcp_opts = $self->{'sol_tcp'} // {};
            state $SOL_TCP = IPPROTO_TCP();

            if (exists($sol_tcp_opts->{'tcp_keepidle'})) {
                setsockopt($socket, $SOL_TCP, TCP_KEEPIDLE, $sol_tcp_opts->{'tcp_keepidle'}) || croak("setsockopt error has occurred while setting TCP_KEEPIDLE: $!");
            }

            if (exists($sol_tcp_opts->{'tcp_keepintvl'})) {
                setsockopt($socket, $SOL_TCP, TCP_KEEPINTVL, $sol_tcp_opts->{'tcp_keepintvl'}) || croak("setsockopt error has occurred while setting TCP_KEEPINTVL: $!");
            }

            if (exists($sol_tcp_opts->{'tcp_keepcnt'})) {
                setsockopt($socket, $SOL_TCP, TCP_KEEPCNT, $sol_tcp_opts->{'tcp_keepcnt'}) || croak("setsockopt error has occurred while setting TCP_KEEPCNT: $!");
            }
        }
    }

    $slot->{'connected_ts'} = time();
    # For plain HTTP the same handle is used for reading and for writing
    $slot->{'reader'} = $slot->{'writer'} = $slot->{'socket'} = $socket;
    $slot->{'sock_no'} = fileno($socket);
    if ($self->{'ssl'}) {
        # For HTTPS the I/O goes through the SSL handle, while 'socket' keeps the raw one
        my $ssl_socket = IO::Socket::SSL->new_from_fd($socket, ($self->{'ssl_opts'} // {})->%*);
        croak("error=$!, ssl_error=" . $IO::Socket::SSL::SSL_ERROR) if (!$ssl_socket);
        $ssl_socket->blocking(0); # just to be sure
        $slot->{'reader'} = $slot->{'writer'} = $ssl_socket;
    }
}

# Connects the given slot, limiting the connect time with the 'connect_timeout' option.
#
# With a positive timeout, SIGALRM is armed around _connect(). If the alarm fires, the
# slot is marked as timed out ('Connect timeout') instead of croaking; any other error
# raised by _connect() is re-thrown with croak(). A zero or undefined timeout means
# the connect is not time-limited.
sub _connect_slot ($self, $slot) {
    my $timeout = $self->{'connect_timeout'} // 0;

    if ($timeout > 0) {
        # alarm() counts whole seconds and alarm(0) cancels the timer, so a timeout
        # below one second must not be truncated to 0: that would silently remove the limit
        my $alarm_secs = int($timeout) || 1;

        eval {
            local $SIG{'ALRM'} = sub { die "alarm\n" };
            alarm($alarm_secs);
            $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
            alarm(0);
        };

        # $@ is copied before anything else gets a chance to clobber it
        my $error = $@;

        # Makes sure the timer is disarmed even if _connect() died before its alarm(0)
        alarm(0);

        if ($error) {
            croak($error) if ($error ne "alarm\n");
            $self->_mark_request_as_timeouted($slot, 'Connect timeout');
        }
    } else {
        $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
    }
}

# Creates $amount new slots, connects each of them and adds them to the instance.
#
# The peer address and the protocol are computed on the first call and cached in the
# 'peer_addr' and 'proto' keys of the instance.
#
# Croaks if the host can't be converted to an address with inet_aton().
sub _make_connections ($self, $amount) {

    # The host is resolved only while there is no cached address: inet_aton() may block,
    # and its result would be thrown away anyway once 'peer_addr' is set
    if (! defined($self->{'peer_addr'})) {
        my $host_addr = inet_aton($self->{'host'});
        croak("can't call inet_aton") if (! $host_addr);

        $self->{'peer_addr'} = pack_sockaddr_in($self->{'port'}, $host_addr);
    }

    $self->{'proto'} //= getprotobyname("tcp");

    for (1 .. $amount) {
        my $slot = $self->_make_slot();
        $self->_connect_slot($slot);
        $self->_add_slot($slot);
    }
}

# Appends $slot to the list of connections kept under the '_conns' key.
# A false $slot is ignored and handed back as is.
sub _add_slot ($self, $slot) {
    return $slot && push(@{ $self->{'_conns'} }, $slot);
}

sub _make_slot ($self) {
    return {
        'reader' => undef,
        'writer' => undef,
        'socket' => undef,
        'sock_no' => 0,
        'is_busy' => 0,
        'request' => undef,
        'tx' => undef,
        'exp_ts' => 0,
        'tmp_response' => undef,
        'reconnect_is_required' => 0,



( run in 2.231 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )