MojoX-HTTP-Async

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

      that GET HTTP method will be used.

    $timeout

      Time in seconds. Can be fractional with microseconds tolerance.

      The request_timeout from conmtrucor will be used by default.

 not_empty($self)

    Returns 1 if there even one slot is busy or slot contains a not
    processed response. Otherwise the method returns 0.

 wait_for_next_response($self, $timeout = 0)

    Waits for first received response or time-outed request in any slot.
    Returns the Mojo::Transaction::HTTP instance with result.

    $timeout

      Period of time in seconds. Can be fractial with microsecond

README.md  view on Meta::CPAN


        Using of string with URI or an instance of "Mojo::URL" class assumes
        that GET HTTP method will be used.

###### $timeout
        Time in seconds. Can be fractional with microseconds tolerance.

        The "request_timeout" from construcor will be used by default.

##### not_empty($self)
    Returns 1 if there even one slot is busy or slot contains a not
    processed response. Otherwise the method returns 0.

##### wait_for_next_response($self, $timeout = 0)
    Waits for first received response or time-outed request in any slot.
    Returns the "Mojo::Transaction::HTTP" instance with result.

###### $timeout
        Period of time in seconds. Can be fractional with microsecond
        tolerance. The response will be marked as time-outed after this time is out.

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

sub _add_slot ($self, $slot) {
    push($self->{'_conns'}->@*, $slot) if ($slot);
}

sub _make_slot ($self) {
    return {
        'reader' => undef,
        'writer' => undef,
        'socket' => undef,
        'sock_no' => 0,
        'is_busy' => 0,
        'request' => undef,
        'tx' => undef,
        'exp_ts' => 0,
        'tmp_response' => undef,
        'reconnect_is_required' => 0,
        'last_response_ts' => 0,
        'connected_ts' => 0,
    };
}

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

            my $slot = $socks2slots->{ $slot_no };
            $self->_mark_response_as_broken($slot, 520, $message);
        }
    }
}

sub _get_free_slot ($self) {

    my $slot;
    my %socks2slots = map { $_->{'sock_no'} => $_ }
                      grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
                      $self->{'_conns'}->@*;

    if (%socks2slots) {

        local $!;
        my $write_handles = '';

        vec($write_handles, $_, 1) = 1 for keys %socks2slots;

        my $error_handles = $write_handles;

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

        if ($request) {
            $self->_send_request($slot, $request, $timeout);
            $status = 1;
        }
    }

    return $status;
}

sub _clear_slot ($self, $slot, $force = 0) {
    $slot->{'is_busy'} = 0;
    $slot->{'exp_ts'} = 0;
    $slot->{'tx'} = undef;
    $slot->{'request'} = undef;
    $slot->{'tmp_response'} = undef;
    if ($force) {
        close($slot->{'socket'}) if $slot->{'socket'};
        $slot->{'socket'} = undef;
        $slot->{'reader'} = undef;
        $slot->{'writer'} = undef;
        $slot->{'sock_no'} = 0;
        $slot->{'reconnect_is_required'} = 0;
        $slot->{'last_response_ts'} = 0;
        $slot->{'connected_ts'} = 0;
    }
}

sub _mark_slot_as_broken($self, $slot) {
    $slot->{'reconnect_is_required'} = 1;
    $slot->{'is_busy'} = 1;
    $slot->{'request'} //= Mojo::Message::Request->new();
    $slot->{'tx'} //= Mojo::Transaction::HTTP->new(
        'req' => $slot->{'request'},
        'res' => Mojo::Message::Response->new()
    );
}

sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
    $self->_mark_slot_as_broken($slot);li
    $slot->{'request'}->error({'message' => $msg, 'code' => $code});

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
    $self->_mark_request_as_broken($slot, 524, $message);
}

sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
    $self->_mark_response_as_broken($slot, 524, $message);
}

sub _send_request ($self, $slot, $request, $timeout = undef) {

    croak("slot is busy") if ($slot->{'is_busy'});
    croak("request object is obligatory") if (!$request);
    croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));

    my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
    my $url = $request->url();
    my $uri = URI->new( $url );
    my $scheme = $url->scheme();

    if ($scheme && $required_scheme ne $scheme) {
        croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

    $timeout //= $self->{'request_timeout'};

    my $response = '';
    state $default_ua_hdr = 'perl/' . __PACKAGE__;

    my $h = $request->headers();
    $h->host($self->{'host'}) if (! $h->host() );
    $h->user_agent($default_ua_hdr) if (! $h->user_agent() );

    $slot->{'request'} = $request;
    $slot->{'is_busy'} = 1;
    $slot->{'exp_ts'} = ($timeout > 0) ? ( time() + $timeout ) : 0;

    my $plain_request = $request->to_string();

    if ($self->{'ssl'}) {
        $slot->{'writer'}->print($plain_request);
    } else {
        my $socket = $slot->{'socket'};
        my $msg_len = bytes::length($plain_request);
        my $sent_bytes = 0;

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN


    if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
        $self->_mark_request_as_timeouted($slot);
    }

    return;
}

sub _try_to_read ($self, $slot) {

    return if $slot->{'tx'} || ! $slot->{'is_busy'};

    my $reader = $slot->{'reader'};
    my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();

    $response->parse($_) while (<$reader>);

    if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resourse temporary unavailable" (no data)
        $self->_mark_response_as_broken($slot, 520, $!);
    } elsif ($response && $response->code()) {

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

        $slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
    }

    if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
        $self->_mark_response_as_timeouted($slot);
    }
}

=head2 not_empty($self)

Returns 1 if there even one slot is busy or slot contains a not processed response.
Otherwise the method returns 0.

=cut

sub not_empty ($self) {

    my $not_empty = scalar $self->{'_conns'}->@*;

    for my $slot ($self->{'_conns'}->@*) {
        $not_empty-- if !$slot->{'is_busy'} && !$slot->{'tx'};
    }

    return $not_empty ? 1 : 0;
}


=head2 wait_for_next_response($self, $timeout = 0)

Waits for first received response or time-outed request in any slot.
Returns the C<Mojo::Transaction::HTTP> instance with result.

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

        $self->_clear_slot($slot, $slot->{'reconnect_is_required'});
    }

    return $tx;
}

sub _get_response_from_ready_slot ($self) {

    my $tx;
    my %socks2slots = map { $_->{'sock_no'} => $_ }
                      grep { ! $_->{'tx'} && ! $_->{'reconnect_is_required'} && $_->{'is_busy'} }
                      $self->{'_conns'}->@*;

    if (%socks2slots) {

        local $!;
        my $read_handles = '';

        vec($read_handles, $_, 1) = 1 for keys %socks2slots;

        my $error_handles = $read_handles;
        my ($nfound, $timeleft) = select($read_handles, undef, $error_handles, 0);

        $self->_check_for_errors(\%socks2slots, $error_handles, $!);

        for my $sock_no (keys %socks2slots) {
            my $slot = $socks2slots{ $sock_no };
            if ( $nfound && vec($read_handles, $sock_no, 1) == 1 ) {
                $self->_try_to_read($slot);
                next if ! $slot->{'tx'};
                next if ! $slot->{'is_busy'};
                $tx = $slot->{'tx'};
            } else {
                if (!$slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
                    $self->_mark_response_as_timeouted($slot);
                    $tx = $slot->{'tx'};
                }
            }

            if ($tx) {
                $self->_clear_slot($slot, 0);

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN


The class destructor.

Closes all opened sockets.

=cut

sub DESTROY ($self) {
    my $in_use = 0;
    while ( my $slot = shift($self->{'_conns'}->@*) ) {
        $in_use++ if ($slot->{'is_busy'});
        $slot->{'socket'}->close() if ($slot->{'socket'});
    }
    warn ref($self) ." object destroyed but still in use" if $in_use;
}

1;
__END__

t/inactivity_timeout.t  view on Meta::CPAN

my $server = start_server(\&on_start_cb, $host);
my $ua = MojoX::HTTP::Async->new(
    'host' => $host,
    'port' => $server->port(),
    'slots' => $slots,
    'connect_timeout' => $connect_timeout,
    'request_timeout' => $request_timeout,
    'inactivity_conn_ts' => $inactivity_timeout,
);

# one slot is free and one slot is busy
ok($ua->add("/page/01.html"), "Adding the first request");

sleep($inactivity_timeout + 0.1);

my $n = $ua->refresh_connections();

is($n, 1, "Checking the amount of renewed slots");

# both slots are busy
ok($ua->add("/page/02.html"), "Adding the second request");
ok($ua->add("/page/03.html"), "Adding the third request");

sleep($inactivity_timeout + 0.1);

$n = $ua->refresh_connections();

is($n, 2, "Checking the amount of renewed slots");

# all connections are fresh



( run in 0.426 second using v1.01-cache-2.11-cpan-87723dcf8b7 )