MojoX-HTTP-Async
view release on metacpan or search on metacpan
that GET HTTP method will be used.
$timeout
Time in seconds. Can be fractional with microsecond precision.
The request_timeout from the constructor will be used by default.
not_empty($self)
Returns 1 if at least one slot is busy or holds a response that has not
been processed yet. Otherwise the method returns 0.
wait_for_next_response($self, $timeout = 0)
Waits for the first received response or timed-out request in any slot.
Returns the Mojo::Transaction::HTTP instance with result.
$timeout
Period of time in seconds. Can be fractional with microsecond
Using of string with URI or an instance of "Mojo::URL" class assumes
that GET HTTP method will be used.
###### $timeout
Time in seconds. Can be fractional with microsecond precision.
The "request_timeout" from the constructor will be used by default.
##### not_empty($self)
Returns 1 if at least one slot is busy or holds a response that has not
been processed yet. Otherwise the method returns 0.
##### wait_for_next_response($self, $timeout = 0)
Waits for the first received response or timed-out request in any slot.
Returns the "Mojo::Transaction::HTTP" instance with result.
###### $timeout
Period of time in seconds. Can be fractional with microsecond
precision. The response will be marked as timed out once this period elapses.
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
# Registers a slot in the object's '_conns' list.
#
#   $slot - the slot to append; a false value (undef, 0, '') is ignored
#           and nothing is added.
sub _add_slot ($self, $slot) {
    # Nothing to register: hand the false value straight back.
    return $slot if !$slot;

    return push(@{ $self->{'_conns'} }, $slot);
}
# Builds a brand-new slot record with every field at its initial value.
#
# Returns a fresh hash reference: handle/object fields start as undef,
# flag/counter/timestamp fields start as 0.
sub _make_slot ($self) {
    my @undef_fields = qw(reader writer socket request tx tmp_response);
    my @zero_fields  = qw(
        sock_no is_busy exp_ts
        reconnect_is_required last_response_ts connected_ts
    );

    my %slot;
    @slot{@undef_fields} = ();                        # all undef
    @slot{@zero_fields}  = (0) x scalar(@zero_fields);

    return \%slot;
}
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
my $slot = $socks2slots->{ $slot_no };
$self->_mark_response_as_broken($slot, 520, $message);
}
}
}
sub _get_free_slot ($self) {
my $slot;
my %socks2slots = map { $_->{'sock_no'} => $_ }
grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
$self->{'_conns'}->@*;
if (%socks2slots) {
local $!;
my $write_handles = '';
vec($write_handles, $_, 1) = 1 for keys %socks2slots;
my $error_handles = $write_handles;
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
if ($request) {
$self->_send_request($slot, $request, $timeout);
$status = 1;
}
}
return $status;
}
# Resets a slot so that it can be reused.
#
#   $slot  - the slot hash to reset
#   $force - when true, the connection state is dropped as well: the socket
#            is closed (if there is one) and every connection-related field
#            is returned to its initial value
#
# The per-request fields (busy flag, expiration timestamp, transaction,
# request and partially parsed response) are always reset.
sub _clear_slot ($self, $slot, $force = 0) {
    $slot->{$_} = 0     for qw(is_busy exp_ts);
    $slot->{$_} = undef for qw(tx request tmp_response);

    if ($force) {
        close($slot->{'socket'}) if $slot->{'socket'};

        # The socket reference is released before the reader and the writer.
        $slot->{$_} = undef for qw(socket reader writer);
        $slot->{$_} = 0     for qw(sock_no reconnect_is_required last_response_ts);
        $slot->{'connected_ts'} = 0;
    }
}
# Flags a slot as broken: it is marked busy and scheduled for a reconnect.
#
# If the slot has no request and/or no transaction yet, empty placeholder
# objects are created so that both fields are always populated afterwards.
# An already present request or transaction is left untouched.
sub _mark_slot_as_broken($self, $slot) {
    @{$slot}{ 'reconnect_is_required', 'is_busy' } = (1, 1);

    if (!defined $slot->{'request'}) {
        $slot->{'request'} = Mojo::Message::Request->new();
    }

    if (!defined $slot->{'tx'}) {
        # The placeholder transaction wraps the slot's request and an empty response.
        $slot->{'tx'} = Mojo::Transaction::HTTP->new(
            'req' => $slot->{'request'},
            'res' => Mojo::Message::Response->new()
        );
    }

    return $slot->{'tx'};
}
sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
$self->_mark_slot_as_broken($slot);li
$slot->{'request'}->error({'message' => $msg, 'code' => $code});
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
# Marks the request held by $slot as timed out.
#
# Delegates to _mark_request_as_broken() with the code 524.
#   $message - error text to record (defaults to 'Request timeout')
sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
    my $timeout_code = 524;
    return $self->_mark_request_as_broken($slot, $timeout_code, $message);
}
# Marks the response expected in $slot as timed out.
#
# Delegates to _mark_response_as_broken() with the code 524.
#   $message - error text to record (defaults to 'Request timeout')
sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
    my $timeout_code = 524;
    return $self->_mark_response_as_broken($slot, $timeout_code, $message);
}
sub _send_request ($self, $slot, $request, $timeout = undef) {
croak("slot is busy") if ($slot->{'is_busy'});
croak("request object is obligatory") if (!$request);
croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));
my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
my $url = $request->url();
my $uri = URI->new( $url );
my $scheme = $url->scheme();
if ($scheme && $required_scheme ne $scheme) {
croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
$timeout //= $self->{'request_timeout'};
my $response = '';
state $default_ua_hdr = 'perl/' . __PACKAGE__;
my $h = $request->headers();
$h->host($self->{'host'}) if (! $h->host() );
$h->user_agent($default_ua_hdr) if (! $h->user_agent() );
$slot->{'request'} = $request;
$slot->{'is_busy'} = 1;
$slot->{'exp_ts'} = ($timeout > 0) ? ( time() + $timeout ) : 0;
my $plain_request = $request->to_string();
if ($self->{'ssl'}) {
$slot->{'writer'}->print($plain_request);
} else {
my $socket = $slot->{'socket'};
my $msg_len = bytes::length($plain_request);
my $sent_bytes = 0;
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
$self->_mark_request_as_timeouted($slot);
}
return;
}
sub _try_to_read ($self, $slot) {
return if $slot->{'tx'} || ! $slot->{'is_busy'};
my $reader = $slot->{'reader'};
my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();
$response->parse($_) while (<$reader>);
if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resourse temporary unavailable" (no data)
$self->_mark_response_as_broken($slot, 520, $!);
} elsif ($response && $response->code()) {
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
$slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
}
if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
$self->_mark_response_as_timeouted($slot);
}
}
=head2 not_empty($self)
Returns 1 if at least one slot is busy or holds a response that has not been processed yet.
Otherwise the method returns 0.
=cut
sub not_empty ($self) {
    my @slots = $self->{'_conns'}->@*;

    # A slot counts as occupied while it is busy or still holds a transaction.
    for my $slot (@slots) {
        return 1 if $slot->{'is_busy'} || $slot->{'tx'};
    }

    # No slots at all, or every slot is idle.
    return 0;
}
=head2 wait_for_next_response($self, $timeout = 0)
Waits for the first received response or timed-out request in any slot.
Returns the C<Mojo::Transaction::HTTP> instance with result.
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
$self->_clear_slot($slot, $slot->{'reconnect_is_required'});
}
return $tx;
}
sub _get_response_from_ready_slot ($self) {
my $tx;
my %socks2slots = map { $_->{'sock_no'} => $_ }
grep { ! $_->{'tx'} && ! $_->{'reconnect_is_required'} && $_->{'is_busy'} }
$self->{'_conns'}->@*;
if (%socks2slots) {
local $!;
my $read_handles = '';
vec($read_handles, $_, 1) = 1 for keys %socks2slots;
my $error_handles = $read_handles;
my ($nfound, $timeleft) = select($read_handles, undef, $error_handles, 0);
$self->_check_for_errors(\%socks2slots, $error_handles, $!);
for my $sock_no (keys %socks2slots) {
my $slot = $socks2slots{ $sock_no };
if ( $nfound && vec($read_handles, $sock_no, 1) == 1 ) {
$self->_try_to_read($slot);
next if ! $slot->{'tx'};
next if ! $slot->{'is_busy'};
$tx = $slot->{'tx'};
} else {
if (!$slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
$self->_mark_response_as_timeouted($slot);
$tx = $slot->{'tx'};
}
}
if ($tx) {
$self->_clear_slot($slot, 0);
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
The class destructor.
Closes all opened sockets.
=cut
# Destructor: drains the '_conns' list and closes the socket of every slot
# that has one. Emits a warning when at least one slot was still marked
# busy at destruction time.
sub DESTROY ($self) {
    # A destructor may run implicitly at any point (scope exit, exception
    # unwinding, process exit). Closing sockets can change the global status
    # variables, so keep them local to avoid clobbering the caller's
    # $@ / $! / $? state.
    local ($., $@, $!, $^E, $?);

    my $in_use = 0;
    while ( my $slot = shift($self->{'_conns'}->@*) ) {
        $in_use++ if ($slot->{'is_busy'});
        $slot->{'socket'}->close() if ($slot->{'socket'});
    }
    warn ref($self) ." object destroyed but still in use" if $in_use;
}
1;
__END__
t/inactivity_timeout.t view on Meta::CPAN
# Start the test server and build a client pointed at its host/port.
# The inactivity timeout under test is handed over via 'inactivity_conn_ts'.
my $server = start_server(\&on_start_cb, $host);
my $ua = MojoX::HTTP::Async->new(
'host' => $host,
'port' => $server->port(),
'slots' => $slots,
'connect_timeout' => $connect_timeout,
'request_timeout' => $request_timeout,
'inactivity_conn_ts' => $inactivity_timeout,
);
# one slot is free and one slot is busy
ok($ua->add("/page/01.html"), "Adding the first request");
# Sleep slightly longer than the inactivity timeout before refreshing
# (presumably so that connections count as stale -- confirm against refresh_connections()).
sleep($inactivity_timeout + 0.1);
# The test expects refresh_connections() to report exactly one renewed slot here.
my $n = $ua->refresh_connections();
is($n, 1, "Checking the amount of renewed slots");
# both slots are busy
ok($ua->add("/page/02.html"), "Adding the second request");
ok($ua->add("/page/03.html"), "Adding the third request");
# Same wait as above; this time two renewed slots are expected.
sleep($inactivity_timeout + 0.1);
$n = $ua->refresh_connections();
is($n, 2, "Checking the amount of renewed slots");
# all connections are fresh
( run in 0.426 second using v1.01-cache-2.11-cpan-87723dcf8b7 )