MojoX-HTTP-Async
view release on metacpan or search on metacpan
188189190191192193194195196197198199200201202203204205206207208
that GET HTTP method will be used.
$timeout
Time in seconds. Can be fractional
with
microseconds tolerance.
The request_timeout from the constructor will be used by
default
.
not_empty(
$self
)
Returns 1
if
at least one slot is busy or a slot contains an unprocessed
response. Otherwise the method returns 0.
wait_for_next_response(
$self
,
$timeout
= 0)
Waits
for
first received response or
time
-outed request in any slot.
Returns the Mojo::Transaction::HTTP instance
with
result.
$timeout
Period of
time
in seconds. Can be fractional
with
microsecond
176177178179180181182183184185186187188189190191192193194195
Using a string
with
URI or an instance of
"Mojo::URL"
class assumes
that GET HTTP method will be used.
###### $timeout
Time in seconds. Can be fractional
with
microseconds tolerance.
The
"request_timeout"
from the constructor will be used by
default
.
##### not_empty($self)
Returns 1
if
at least one slot is busy or a slot contains an unprocessed
response. Otherwise the method returns 0.
##### wait_for_next_response($self, $timeout = 0)
Waits
for
first received response or
time
-outed request in any slot.
Returns the
"Mojo::Transaction::HTTP"
instance
with
result.
###### $timeout
Period of
time
in seconds. Can be fractional
with
microsecond
tolerance. The response will be marked as
time
-outed
after
this
time
is out.
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
# Appends a connection slot to the pool stored in $self->{'_conns'}.
#
# $slot - the slot to register; a false value (for example undef) is
#         ignored and the pool is left untouched.
sub _add_slot ($self, $slot) {
    if ($slot) {
        push @{ $self->{'_conns'} }, $slot;
    }
}
# Builds a fresh slot structure with every field set to its initial value.
#
# Returns a new hash reference on each call. The fields holding handles or
# objects ('reader', 'writer', 'socket', 'request', 'tx', 'tmp_response')
# start as undef; the numeric fields and flags start as 0.
sub _make_slot ($self) {
    my @undef_fields = qw(reader writer socket request tx tmp_response);
    my @zero_fields  = qw(
        sock_no is_busy exp_ts reconnect_is_required
        last_response_ts connected_ts
    );

    my %slot;
    @slot{@undef_fields} = ();
    @slot{@zero_fields}  = (0) x @zero_fields;

    return \%slot;
}
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
329330331332333334335336337338339340341342343344345346347348349
my
$slot
=
$socks2slots
->{
$slot_no
};
$self
->_mark_response_as_broken(
$slot
, 520,
$message
);
}
}
}
sub
_get_free_slot (
$self
) {
my
$slot
;
my
%socks2slots
=
map
{
$_
->{
'sock_no'
} =>
$_
}
grep
{ !
$_
->{
'is_busy'
} &&
$_
->{
'socket'
} && !
$_
->{
'reconnect_is_required'
} }
$self
->{
'_conns'
}->@*;
if
(
%socks2slots
) {
local
$!;
my
$write_handles
=
''
;
vec
(
$write_handles
,
$_
, 1) = 1
for
keys
%socks2slots
;
my
$error_handles
=
$write_handles
;
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
if
(
$request
) {
$self
->_send_request(
$slot
,
$request
,
$timeout
);
$status
= 1;
}
}
return
$status
;
}
# Resets a slot so that it can be reused.
#
# $slot  - the slot (hash reference) to reset
# $force - when true, the connection state is dropped as well: the socket is
#          closed (if there is one) and all connection-related fields are
#          returned to their initial values. When false (the default) only
#          the per-request fields are reset and the socket is kept.
sub _clear_slot ($self, $slot, $force = 0) {

    # per-request state
    @{$slot}{ 'is_busy', 'exp_ts' }             = (0, 0);
    @{$slot}{ 'tx', 'request', 'tmp_response' } = (undef) x 3;

    if ($force) {

        # connection state
        close($slot->{'socket'}) if $slot->{'socket'};

        @{$slot}{ 'socket', 'reader', 'writer' } = (undef) x 3;
        @{$slot}{ 'sock_no', 'reconnect_is_required', 'last_response_ts' } = (0) x 3;
        $slot->{'connected_ts'} = 0;
    }
}
# Flags a slot as broken: it becomes busy and is marked as requiring a
# reconnect. If the slot has no request and/or no transaction yet, empty
# Mojo::Message::Request / Mojo::Transaction::HTTP objects are created so
# that there is something to attach an error to; existing ones are kept.
#
# $slot - the slot (hash reference) to flag
sub _mark_slot_as_broken($self, $slot) {
    @{$slot}{ 'reconnect_is_required', 'is_busy' } = (1, 1);

    my $request = $slot->{'request'} //= Mojo::Message::Request->new();

    $slot->{'tx'} //= Mojo::Transaction::HTTP->new(
        'req' => $request,
        'res' => Mojo::Message::Response->new(),
    );
}
sub
_mark_request_as_broken (
$self
,
$slot
,
$code
= 520,
$msg
=
'Unknown Error'
) {
$self
->_mark_slot_as_broken(
$slot
);
$slot
->{
'request'
}->error({
'message'
=>
$msg
,
'code'
=>
$code
});
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
# Marks the request of a slot as timed out by delegating to
# _mark_request_as_broken with code 524.
#
# $slot    - the slot whose request timed out
# $message - error message to record; defaults to 'Request timeout'
sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
    my @failure = (524, $message);
    return $self->_mark_request_as_broken($slot, @failure);
}
# Marks the response of a slot as timed out by delegating to
# _mark_response_as_broken with code 524.
#
# $slot    - the slot whose response did not arrive in time
# $message - error message to record; defaults to 'Request timeout'
sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
    my @failure = (524, $message);
    return $self->_mark_response_as_broken($slot, @failure);
}
sub
_send_request (
$self
,
$slot
,
$request
,
$timeout
=
undef
) {
croak(
"slot is busy"
)
if
(
$slot
->{
'is_busy'
});
croak(
"request object is obligatory"
)
if
(!
$request
);
croak(
'request must be a descendant of Mojo::Message::Request package'
)
if
(!
$request
->isa(
'Mojo::Message::Request'
));
my
$required_scheme
=
$self
->{
'ssl'
} ?
'https'
:
'http'
;
my
$url
=
$request
->url();
my
$uri
= URI->new(
$url
);
my
$scheme
=
$url
->scheme();
if
(
$scheme
&&
$required_scheme
ne
$scheme
) {
croak(
sprintf
(
"Wrong scheme in URI '%s'. It must correspond to the 'ssl' option"
,
$uri
->as_string()));
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
511512513514515516517518519520521522523524525526527528529530531$timeout
//=
$self
->{
'request_timeout'
};
my
$response
=
''
;
state
$default_ua_hdr
=
'perl/'
. __PACKAGE__;
my
$h
=
$request
->headers();
$h
->host(
$self
->{
'host'
})
if
(!
$h
->host() );
$h
->user_agent(
$default_ua_hdr
)
if
(!
$h
->user_agent() );
$slot
->{
'request'
} =
$request
;
$slot
->{
'is_busy'
} = 1;
$slot
->{
'exp_ts'
} = (
$timeout
> 0) ? (
time
() +
$timeout
) : 0;
my
$plain_request
=
$request
->to_string();
if
(
$self
->{
'ssl'
}) {
$slot
->{
'writer'
}->
(
$plain_request
);
}
else
{
my
$socket
=
$slot
->{
'socket'
};
my
$msg_len
= bytes::
length
(
$plain_request
);
my
$sent_bytes
= 0;
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
555556557558559560561562563564565566567568569570571572573574
if
(
$slot
->{
'exp_ts'
} &&
time
() >
$slot
->{
'exp_ts'
}) {
$self
->_mark_request_as_timeouted(
$slot
);
}
return
;
}
sub
_try_to_read (
$self
,
$slot
) {
return
if
$slot
->{
'tx'
} || !
$slot
->{
'is_busy'
};
my
$reader
=
$slot
->{
'reader'
};
my
$response
=
$slot
->{
'tmp_response'
} // Mojo::Message::Response->new();
$response
->parse(
$_
)
while
(<
$reader
>);
if
($! && !$!{
'EAGAIN'
} && !$!{
'EWOULDBLOCK'
}) {
# not a "Resource temporarily unavailable" error (which only means there is no data yet)
$self
->_mark_response_as_broken(
$slot
, 520, $!);
}
elsif
(
$response
&&
$response
->code()) {
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
$slot
->{
'reconnect_is_required'
} = 1
if
(
$content
->relaxed());
# responses that are terminated with a connection close
}
if
(!
$slot
->{
'tx'
} && (
$slot
->{
'exp_ts'
} &&
time
() >
$slot
->{
'exp_ts'
})) {
$self
->_mark_response_as_timeouted(
$slot
);
}
}
=head2 not_empty($self)
Returns 1 if at least one slot is busy or a slot contains an unprocessed response.
Otherwise the method returns 0.
=cut
# Tells whether the pool still has work in it.
#
# A slot counts as occupied when it is busy or still holds a transaction
# ('tx') that has not been collected. Returns 1 if at least one slot is
# occupied, otherwise 0 (also for an empty pool).
sub not_empty ($self) {
    my $occupied = grep { $_->{'is_busy'} || $_->{'tx'} } @{ $self->{'_conns'} };
    return $occupied ? 1 : 0;
}
=head2 wait_for_next_response($self, $timeout = 0)
Waits for first received response or time-outed request in any slot.
Returns the C<Mojo::Transaction::HTTP> instance with result.
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708
$self
->_clear_slot(
$slot
,
$slot
->{
'reconnect_is_required'
});
}
return
$tx
;
}
sub
_get_response_from_ready_slot (
$self
) {
my
$tx
;
my
%socks2slots
=
map
{
$_
->{
'sock_no'
} =>
$_
}
grep
{ !
$_
->{
'tx'
} && !
$_
->{
'reconnect_is_required'
} &&
$_
->{
'is_busy'
} }
$self
->{
'_conns'
}->@*;
if
(
%socks2slots
) {
local
$!;
my
$read_handles
=
''
;
vec
(
$read_handles
,
$_
, 1) = 1
for
keys
%socks2slots
;
my
$error_handles
=
$read_handles
;
my
(
$nfound
,
$timeleft
) =
select
(
$read_handles
,
undef
,
$error_handles
, 0);
$self
->_check_for_errors(\
%socks2slots
,
$error_handles
, $!);
for
my
$sock_no
(
keys
%socks2slots
) {
my
$slot
=
$socks2slots
{
$sock_no
};
if
(
$nfound
&&
vec
(
$read_handles
,
$sock_no
, 1) == 1 ) {
$self
->_try_to_read(
$slot
);
next
if
!
$slot
->{
'tx'
};
next
if
!
$slot
->{
'is_busy'
};
$tx
=
$slot
->{
'tx'
};
}
else
{
if
(!
$slot
->{
'tx'
} && (
$slot
->{
'exp_ts'
} &&
time
() >
$slot
->{
'exp_ts'
})) {
$self
->_mark_response_as_timeouted(
$slot
);
$tx
=
$slot
->{
'tx'
};
}
}
if
(
$tx
) {
$self
->_clear_slot(
$slot
, 0);
lib/MojoX/HTTP/Async.pm view on Meta::CPAN
778779780781782783784785786787788789790791792793794795The class destructor.
Closes all opened sockets.
=cut
# The class destructor: drains the connection pool, closes every socket
# that is still open and warns if any slot was busy at destruction time.
sub DESTROY ($self) {
    # A destructor can run implicitly at almost any point of the caller's
    # code, and closing sockets may overwrite the global status variables.
    # Localise them (as perlobj recommends) so the caller's $!, $?, $@ etc.
    # keep their values.
    local ($., $@, $!, $^E, $?);

    my $in_use = 0;
    while ( my $slot = shift($self->{'_conns'}->@*) ) {
        $in_use++ if ($slot->{'is_busy'});
        $slot->{'socket'}->close() if ($slot->{'socket'});
    }
    warn ref($self) ." object destroyed but still in use" if $in_use;
}
1;
__END__
t/inactivity_timeout.t view on Meta::CPAN
505152535455565758596061626364656667686970717273747576777879my
$server
= start_server(\
&on_start_cb
,
$host
);
my
$ua
= MojoX::HTTP::Async->new(
'host'
=>
$host
,
'port'
=>
$server
->port(),
'slots'
=>
$slots
,
'connect_timeout'
=>
$connect_timeout
,
'request_timeout'
=>
$request_timeout
,
'inactivity_conn_ts'
=>
$inactivity_timeout
,
);
# one slot is free and one slot is busy
ok(
$ua
->add(
"/page/01.html"
),
"Adding the first request"
);
sleep
(
$inactivity_timeout
+ 0.1);
my
$n
=
$ua
->refresh_connections();
is(
$n
, 1,
"Checking the amount of renewed slots"
);
# both slots are busy
ok(
$ua
->add(
"/page/02.html"
),
"Adding the second request"
);
ok(
$ua
->add(
"/page/03.html"
),
"Adding the third request"
);
sleep
(
$inactivity_timeout
+ 0.1);
$n
=
$ua
->refresh_connections();
is(
$n
, 2,
"Checking the amount of renewed slots"
);
# all connections are fresh
( run in 0.264 second using v1.01-cache-2.11-cpan-d6f9594c0a5 )