AnyEvent
view release on metacpan or search on metacpan
lib/AnyEvent/DNS.pm view on Meta::CPAN
$id->[1]->($res);
}
# Handle a packet $pkt that was received from the packed socket address $peer.
#
# The packet is handed to _feed only when it was sent from DOMAIN_PORT on
# one of the hosts listed in $self->{server}; anything else is dropped.
sub _recv {
   my ($self, $pkt, $peer) = @_;

   # errors are ignored here: one often gets "port unreachable", but there
   # is no good way to take advantage of that.

   my ($port, $host) = AnyEvent::Socket::unpack_sockaddr ($peer);

   # wrong source port
   return unless $port == DOMAIN_PORT;

   # sender is not one of our configured servers
   return unless grep { $_ eq $host } @{ $self->{server} };

   $self->_feed ($pkt);
}
# Give up request id $id after its request has finished.
#
# With a false $timeout the id is removed from $self->{id} immediately.
# With a true $timeout the id stays marked as taken and is appended to
# $self->{reuse_q} together with the time ($NOW + $self->{reuse}) from
# which it may be handed out again.
#
# Either way the count of outstanding requests drops by one and the
# scheduler is run.
sub _free_id {
   my ($self, $id, $timeout) = @_;

   if (!$timeout) {
      # nothing to wait for, the id can be recycled right away
      delete $self->{id}{$id};
   } else {
      # the id has to stay blocked for a while
      $self->{id}{$id} = 1;

      my $reusable_at = $NOW + $self->{reuse};
      push @{ $self->{reuse_q} }, [$reusable_at, $id];
   }

   $self->{outstanding}--;
   $self->_scheduler;
}
# execute a single request, involves sending it with timeouts to multiple servers
#
# $req is an array reference, used here as follows:
#   $req->[0] - the packet data that is sent to the server
#   $req->[1] - completion callback; called with the response on success
#               and without arguments when every retry entry has been used up
#   $req->[2] - the request id, used as key into $self->{id}
#
# $self->{retry} is walked entry by entry; each entry is a
# [$server, $timeout] pair. For each entry the packet is sent over UDP and
# a timer is armed; when the timer fires the next entry is tried.
# A truncated response ($res->{tc}) is re-requested from the same server
# over TCP.
sub _exec {
my ($self, $req) = @_;
my $retry; # of retries
my $do_retry;
# $do_retry refers to itself through the closure, so it is explicitly
# undef'ed once the request is finished. The callbacks below also test it
# to find out whether the request is still alive.
$do_retry = sub {
# fetch the next [$server, $timeout] entry, if there is one left
my $retry_cfg = $self->{retry}[$retry++]
or do {
# failure: no retry entries are left
# $retry > 1 asks _free_id to keep the id blocked for a while
$self->_free_id ($req->[2], $retry > 1);
undef $do_retry; return $req->[1]->();
};
my ($server, $timeout) = @$retry_cfg;
# register [timeout watcher, response callback] under the request id,
# replacing whatever entry a previous attempt stored there
$self->{id}{$req->[2]} = [(AE::timer $timeout, 0, sub {
# refresh the cached time before the next attempt
$NOW = time;
# timeout, try next
&$do_retry if $do_retry;
}), sub {
# response callback, receives the response $res for this id
my ($res) = @_;
if ($res->{tc}) {
# success, but truncated, so use tcp
AnyEvent::Socket::tcp_connect (AnyEvent::Socket::format_address ($server), DOMAIN_PORT, sub {
return unless $do_retry; # some other request could have invalidated us already
# no filehandle argument means the connect failed: try the next entry
my ($fh) = @_
or return &$do_retry;
require AnyEvent::Handle;
my $handle; $handle = new AnyEvent::Handle
fh => $fh,
timeout => $timeout,
on_error => sub {
# drop the handle, it is of no further use
undef $handle;
return unless $do_retry; # some other request could have invalidated us already
# failure, try next
&$do_retry;
};
# send the request prefixed with its length as a 16 bit
# network-order integer
$handle->push_write (pack "n/a*", $req->[0]);
# read the 16 bit length prefix of the reply first ...
$handle->push_read (chunk => 2, sub {
# ... then exactly that many bytes of reply data
$handle->unshift_read (chunk => (unpack "n", $_[1]), sub {
undef $handle;
# hand the reply data to _feed
$self->_feed ($_[1]);
});
});
# NOTE(review): the fourth tcp_connect argument is the prepare callback,
# whose return value is presumably used as the connect timeout - confirm
# against the AnyEvent::Socket documentation
}, sub { $timeout });
} else {
# success: release the id and pass the response to the requester
$self->_free_id ($req->[2], $retry > 1);
undef $do_retry; return $req->[1]->($res);
}
}];
my $sa = AnyEvent::Socket::pack_sockaddr (DOMAIN_PORT, $server);
# pick the socket matching the address family of the server; when that
# socket does not exist, move on to the next retry entry immediately
my $fh = AF_INET == AnyEvent::Socket::sockaddr_family ($sa)
? $self->{fh4} : $self->{fh6}
or return &$do_retry;
# send errors are not checked here, the timer above covers lost requests
send $fh, $req->[0], 0, $sa;
};
# start with the first retry entry
&$do_retry;
}
sub _scheduler {
my ($self) = @_;
return if $self->{inhibit};
#no strict 'refs';
$NOW = time;
# first clear id reuse queue
delete $self->{id}{ (shift @{ $self->{reuse_q} })->[1] }
while @{ $self->{reuse_q} } && $self->{reuse_q}[0][0] <= $NOW;
while ($self->{outstanding} < $self->{max_outstanding}) {
if (@{ $self->{reuse_q} } >= 30000) {
# we ran out of ID's, wait a bit
$self->{reuse_to} ||= AE::timer $self->{reuse_q}[0][0] - $NOW, 0, sub {
delete $self->{reuse_to};
$self->_scheduler;
};
last;
}
if (my $req = shift @{ $self->{queue} }) {
# found a request in the queue, execute it
while () {
( run in 0.819 second using v1.01-cache-2.11-cpan-140bd7fdf52 )