AnyEvent-Curl-Multi
view release on metacpan or search on metacpan
lib/AnyEvent/Curl/Multi.pm view on Meta::CPAN
Specified a timeout for the request. If your WWW::Curl is linked against
libcurl 7.16.2 or later, this value can be specified in fractional seconds (ms
resolution). Otherwise, the value must be specified in whole seconds.
=item proxy => HOST[:PORT]
Specifies a proxy host/port, separated by a colon. (The port number is optional.)
=item max_redirects => COUNT
Specifies the maximum number of HTTP redirects that will be followed. Set to
0 to disable following redirects.
=back
The request() method returns an object of class AnyEvent::Curl::Multi::Handle.
This object can be used later to cancel the request; see "Canceling requests",
below.
Calling $handle->cv() will return an AnyEvent condvar that you can use as usual
(e.g., recv() or cb()) to retrieve response results, or that will croak if an
error occurs. See L<AnyEvent> for details on condvars.
=head2 Callbacks
Instead of using condvars, you may register interest in the following events
using the client's reg_cb() method (see Object::Event for more details on
reg_cb()):
=over
=item response => $cb->($client, $request, $response, $stats);
Fired when a response is received. (This doesn't imply that the response is
HTTP OK, so you should examine the response to determine whether there was
an HTTP error of some sort.)
The arguments sent to your callback will be the client object, the original
request (untampered with), the response (as an HTTP::Response object), and a
hashref containing some interesting statistics.
=item error => $cb->($client, $request, $errmsg, $stats);
Fired when an error is received.
The arguments sent to your callback will be the client object, the original
request (untampered with), the error message, and a hashref containing some
interesting statistics. (If the error was other than a timeout, the statistics
may be invalid.)
=back
=cut
sub new {
my $class = shift;
my $self = $class->SUPER::new(
multi_h => WWW::Curl::Multi->new,
state => {},
timer_w => undef,
io_w => {},
queue => [],
max_concurrency => 0,
max_redirects => 0,
timeout => undef,
proxy => undef,
debug => undef,
ipresolve => undef,
@_
);
if (! $MS_TIMEOUT_SUPPORTED
&& $self->{timeout}
&& $self->{timeout} != int($self->{timeout})) {
croak "Subsecond timeout resolution is not supported by your " .
"libcurl version. Upgrade to 7.16.2 or later.";
}
return bless $self, $class;
}
sub request {
my $self = shift;
my ($req, %opts) = @_;
my $easy_h;
if ($req->isa("HTTP::Request")) {
# Convert to WWW::Curl::Easy
$easy_h = $self->_gen_easy_h($req, %opts);
} else {
croak "Unsupported request type";
}
# Initialize easy curl handle
my $id = refaddr $easy_h;
my ($response, $header);
$easy_h->setopt(CURLOPT_WRITEDATA, \$response);
$easy_h->setopt(CURLOPT_WRITEHEADER, \$header);
$easy_h->setopt(CURLOPT_PRIVATE, $id);
my $obj = {
easy_h => $easy_h,
req => $req,
response => \$response,
header => \$header,
cv => AE::cv,
};
push @{$self->{queue}}, $obj;
$self->_dequeue;
return bless $obj, 'AnyEvent::Curl::Multi::Handle';
}
sub _dequeue {
my $self = shift;
while ($self->{max_concurrency} == 0 ||
scalar keys %{$self->{state}} < $self->{max_concurrency}) {
if (my $dequeued = shift @{$self->{queue}}) {
$self->{state}->{refaddr($dequeued->{easy_h})} = $dequeued;
# Add it to our multi handle
$self->{multi_h}->add_handle($dequeued->{easy_h});
} else {
last;
}
}
# Start our timer
$self->{timer_w} = AE::timer(0, 0.5, sub { $self->_perform });
}
sub _perform {
my $self = shift;
$self->{multi_h}->perform;
while (my ($id, $rv) = $self->{multi_h}->info_read) {
if ($id) {
my $state = $self->{state}->{$id};
my $req = $state->{req};
my $easy_h = $state->{easy_h};
my $stats = {
total_time => $easy_h->getinfo(CURLINFO_TOTAL_TIME),
dns_time => $easy_h->getinfo(CURLINFO_NAMELOOKUP_TIME),
connect_time => $easy_h->getinfo(CURLINFO_CONNECT_TIME),
start_transfer_time =>
$easy_h->getinfo(CURLINFO_STARTTRANSFER_TIME),
download_bytes =>
$easy_h->getinfo(CURLINFO_SIZE_DOWNLOAD),
upload_bytes => $easy_h->getinfo(CURLINFO_SIZE_UPLOAD),
};
if ($rv) {
# Error
$state->{cv}->croak($easy_h->errbuf);
$req->event('error', $easy_h->errbuf, $stats)
if $req->can('event');
$self->event('error', $req, $easy_h->errbuf, $stats);
} else {
# libcurl appends subsequent response headers to the buffer
# when following redirects. We need to remove all but the
# most recent header before we parse the response.
my $last_header = (split(/\r?\n\r?\n/,
${$state->{header}}))[-1];
my $response = HTTP::Response->parse($last_header .
"\n\n" .
${$state->{response}});
$req->uri($easy_h->getinfo(CURLINFO_EFFECTIVE_URL));
$response->request($req);
$state->{cv}->send($response, $stats);
$req->event('response', $response, $stats)
if $req->can('event');
$self->event('response', $req, $response, $stats);
}
delete $self->{state}->{$id};
$self->_dequeue;
}
}
# We must recalculate the number of active handles here, because
# a user-provided callback may have added a new one.
my $active_handles = scalar keys %{$self->{state}};
if (! $active_handles) {
# Nothing left to do - no point keeping the watchers around anymore.
delete $self->{timer_w};
delete $self->{io_w};
return;
}
# Re-establish all I/O watchers
foreach my $fd (keys %{$self->{io_w}}) {
delete $self->{io_w}->{$fd};
}
my ($readfds, $writefds, $errfds) = $self->{multi_h}->fdset;
foreach my $fd (@$writefds) {
$self->{io_w}->{$fd} ||= AE::io($fd, 1, sub { $self->_perform });
}
foreach my $fd (@$readfds) {
$self->{io_w}->{$fd} ||= AE::io($fd, 0, sub { $self->_perform });
}
}
sub _gen_easy_h {
my $self = shift;
my $req = shift;
my %opts = @_;
my $easy_h = WWW::Curl::Easy->new;
$easy_h->setopt(CURLOPT_URL, $req->uri);
$easy_h->setopt(CURLOPT_SSL_VERIFYPEER, 0);
$easy_h->setopt(CURLOPT_DNS_CACHE_TIMEOUT, 0);
if (defined $self->{ipresolve}) {
if (int($self->{ipresolve}) == 4) {
$easy_h->setopt(CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
} elsif (int($self->{ipresolve}) == 6) {
$easy_h->setopt(CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V6);
} else {
die "Invalid ipresolve setting '$self->{ipresolve}' (must be 4 or 6)";
}
}
$easy_h->setopt(CURLOPT_CUSTOMREQUEST, $req->method);
$easy_h->setopt(CURLOPT_HTTPHEADER,
[ split "\n", $req->headers->as_string ]);
if (length $req->content) {
$easy_h->setopt(CURLOPT_POSTFIELDS, $req->content);
$easy_h->setopt(CURLOPT_POSTFIELDSIZE, length $req->content);
}
# Accept gzip or deflate-compressed responses
$easy_h->setopt(CURLOPT_ENCODING, "");
$easy_h->setopt(CURLOPT_VERBOSE, 1) if $self->{debug} || $opts{debug};
my $proxy = $self->{proxy} || $opts{proxy};
$easy_h->setopt(CURLOPT_PROXY, $proxy) if $proxy;
my $timeout = $self->{timeout} || $opts{timeout};
if ($timeout) {
if ($timeout == int($timeout)) {
( run in 1.917 second using v1.01-cache-2.11-cpan-39bf76dae61 )