AnyEvent-SNMP
view release on metacpan or search on metacpan
package AnyEvent::SNMP;
use common::sense;
# it is possible to do this without loading
# Net::SNMP::Dispatcher, but much more awkward.
use Net::SNMP::Dispatcher;
# We could inherit from Net::SNMP::Dispatcher, but since this is undocumented,
# I'd rather see it die (and be reported) than silently and subtly fail.
# Only msg_handle_alloc is borrowed: it is aliased into this package under
# the same name.
*msg_handle_alloc = \&Net::SNMP::Dispatcher::msg_handle_alloc;
# Replacement for Net::SNMP::Dispatcher::instance: it returns the package
# name 'AnyEvent::SNMP' (a plain string), so whatever stores the result ends
# up invoking the subs of this package as class methods.
sub Net::SNMP::Dispatcher::instance {
   return 'AnyEvent::SNMP';
}
use Net::SNMP ();
use AnyEvent ();
our $VERSION = '6.02';
# Install this package as the dispatcher used by Net::SNMP. The instance sub
# is overridden in this file to return the name of this package.
$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
# Share the message processing object of Net::SNMP::Dispatcher.
our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
# Number of requests currently in progress; send_pdu also counts a request
# here while its start delay is running.
our $BUSY;
# Callable invoked by kick_job once $BUSY has dropped to zero; loop stores
# an AE::cv here while it waits.
our $DONE; # finished all jobs
# Per-socket bookkeeping, indexed by file descriptor number.
our @TRANSPORT; # fileno => [count, watcher]
# PDUs waiting to be started by kick_job.
our @QUEUE;
# kick_job only starts new requests while $BUSY is below this value. The
# receive handler in _send_pdu raises and lowers it at runtime.
our $MAX_OUTSTANDING = 50;
# If a receive pass ends after fewer than this many iterations while work is
# still queued, $MAX_OUTSTANDING is increased by one.
our $MIN_RECVQUEUE = 8;
# Maximum number of replies read in one invocation of the receive handler;
# reaching it reduces $MAX_OUTSTANDING by 5%.
our $MAX_RECVQUEUE = 64;
# Forward declaration, so kick_job can be called without parentheses before
# its definition.
sub kick_job;
# Send one PDU and, if a response is expected, arrange for the reply or the
# timeout to be handled.
#
#    $pdu      the PDU to transmit
#    $retries  number of further attempts allowed after this one
#
# kick_job counts the PDU in $BUSY before calling this sub. Every path that
# ends the processing of the PDU decrements $BUSY again and calls kick_job,
# so that queued PDUs can be started. The return value is not meaningful.
sub _send_pdu {
   my ($pdu, $retries) = @_;

   # mostly copied from Net::SNMP::Dispatch

   # Pass the PDU to Message Processing so that it can
   # create the new outgoing message.
   my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu);

   if (!defined $msg) {
      --$BUSY;
      kick_job;
      # Inform the command generator about the Message Processing error.
      $pdu->status_information ($MESSAGE_PROCESSING->error);
      return;
   }

   # Actually send the message.
   if (!defined $msg->send) {
      # Classify the failure right away, before any other call gets a
      # chance to overwrite $!.
      my $temporary_failure = $!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC};

      $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
         if $pdu->expect_response;

      # A crude attempt to recover from temporary failures.
      if ($retries-- > 0 && $temporary_failure) {
         my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
            undef $retry_w;
            _send_pdu ($pdu, $retries);
         };

         # The PDU is still in flight and still counted in $BUSY, so no
         # error is reported here: the retry reports the final outcome.
         return;
      }

      --$BUSY;
      kick_job;

      # Inform the command generator about the send() error.
      $pdu->status_information ($msg->error);
      return;
   }

   # Schedule the timeout handler if the message expects a response.
   if ($pdu->expect_response) {
      my $transport = $msg->transport;
      my $fileno    = $transport->fileno;

      # register the transport: the first outstanding request on a socket
      # creates the read watcher, later ones only bump the counter.
      unless ($TRANSPORT[$fileno][0]++) {
         $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
            for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
               # Create a new Message object to receive the response
               my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);

               if (!defined $msg) {
                  die sprintf 'Failed to create Message object [%s]', $error;
               }

               # Read the message from the Transport Layer
               if (!defined $msg->recv) {
                  if ($transport->connectionless) {
                     # if we handled very few replies and we have queued work, try
                     # to increase the parallelity as we probably can handle more.
                     if ($count < $MIN_RECVQUEUE && @QUEUE) {
                        ++$MAX_OUTSTANDING;
                        kick_job;
                     }
                  } else {
                     # for some reason, connected-oriented transports seem to need this
                     delete $TRANSPORT[$fileno]
                        unless --$TRANSPORT[$fileno][0];
                  }

                  # the error text is fetched but deliberately not used
                  $msg->error;
                  return;
               }

               # For connection-oriented Transport Domains, it is possible to
               # "recv" an empty buffer if reassembly is required.
               if (!$msg->length) {
                  return;
               }

               # Hand the message over to Message Processing.
               if (!defined $MESSAGE_PROCESSING->prepare_data_elements ($msg)) {
                  # the error text is fetched but deliberately not used
                  $MESSAGE_PROCESSING->error;
                  return;
               }

               # Set the error if applicable.
               $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error;

               # Notify the command generator to process the response.
               # Net::SNMP calls process_response_pdu, which simply calls callback_execute,
               # but some errors cause $msg to be of type Net::SNMP::Message, not Net::SMMP::PDU,
               # so we call the underlying callback_execute method which exists on both and
               # seems to do the right thing.
               $msg->callback_execute;

               # Cancel the timeout.
               my $rtimeout_w = $msg->timeout_id;
               if ($$rtimeout_w) {
                  undef $$rtimeout_w;

                  --$BUSY;
                  kick_job;

                  # last outstanding request on this socket: drop the
                  # bookkeeping entry (and with it the watcher) and stop.
                  unless (--$TRANSPORT[$fileno][0]) {
                     delete $TRANSPORT[$fileno];
                     return;
                  }
               }
            }

            # when we end up here, we successfully handled $MAX_RECVQUEUE
            # replies in one iteration, so assume we are overloaded
            # and reduce the amount of parallelity.
            $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
         };
      }

      # The message stores a reference to the scalar holding the timer, so
      # both the receive handler and the timer callback can cancel it by
      # undefining that scalar.
      $msg->timeout_id (\(my $rtimeout_w =
         AE::timer $pdu->timeout, 0, sub {
            my $rtimeout_w = $msg->timeout_id;
            if ($$rtimeout_w) {
               undef $$rtimeout_w;
               # this request no longer waits on the socket
               delete $TRANSPORT[$fileno]
                  unless --$TRANSPORT[$fileno][0];
            }

            if ($retries--) {
               _send_pdu ($pdu, $retries);
            } else {
               $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id);
               $pdu->status_information ("No response from remote host '%s'", $pdu->hostname);

               --$BUSY;
               kick_job;
            }
         })
      );
   } else {
      # nothing to wait for: the request is complete once it has been sent
      --$BUSY;
      kick_job;
   }
}
# Start queued PDUs while fewer than $MAX_OUTSTANDING requests are in
# progress. Once nothing is in progress any more, invoke $DONE (if set).
sub kick_job {
   while ($BUSY < $MAX_OUTSTANDING) {
      my $next = shift @QUEUE;
      last unless $next;   # queue drained

      ++$BUSY;
      _send_pdu ($next, $next->retries);
   }

   unless ($BUSY) {
      $DONE->() if $DONE;
   }
}
# Queue a PDU for sending, optionally after a start delay.
#
#    (first argument)  ignored
#    $pdu              the PDU to queue
#    $delay            seconds to wait before queueing; values <= 0 queue at once
#
# Always returns 1.
sub send_pdu($$$) {
   my (undef, $pdu, $delay) = @_;

   # No delay requested: queue the PDU and try to start it immediately.
   unless ($delay > 0) {
      push @QUEUE, $pdu;
      kick_job;
      return 1;
   }

   # $delay is not very sensibly implemented by AnyEvent::SNMP,
   # but apparently it is not a very sensible feature.
   # The PDU counts as busy while the delay timer runs; when the timer
   # fires it is moved to the queue and the count is handed back.
   ++$BUSY;

   my $delay_w;
   $delay_w = AE::timer $delay, 0, sub {
      undef $delay_w;
      push @QUEUE, $pdu;
      --$BUSY;
      kick_job;
   };

   return 1;
}
# Block until no request is in progress any more. Each round installs a
# fresh condition variable in $DONE and waits on it; kick_job invokes $DONE
# when $BUSY reaches zero.
sub loop($) {
   while ($BUSY) {
      ($DONE = AE::cv)->recv;
      undef $DONE;
   }
}

# Additional names for loop.
*activate = \&loop; # 5.x compatibility?
*listen   = \&loop; # 5.x compatibility?
# Delegates to AnyEvent->one_event and passes its result through; the
# argument is ignored. Should not ever be used.
# NOTE(review): confirm that the installed AnyEvent provides one_event.
sub one_event($) {
   return AnyEvent->one_event; #d# todo
}
# Set the limit on requests in progress to the given value, then let
# kick_job start queued PDUs under the new limit.
sub set_max_outstanding($) {
   my ($limit) = @_;

   $MAX_OUTSTANDING = $limit;
   kick_job;
}
# not provided yet:
# schedule # apparently only used by Net::SNMP::Dispatcher itself
# register # apparently only used by Net::SNMP::Dispatcher itself
# deregister # apparently only used by Net::SNMP::Dispatcher itself
# cancel # apparently only used by Net::SNMP::Dispatcher itself
# return_response_pdu # apparently not used at all?
# error # only used by Net::SNMP::Dispatcher itself?
# debug # only used by Net::SNMP::Dispatcher itself?
=head1 SEE ALSO
L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>.
=head1 AUTHOR
Marc Lehmann <schmorp@schmorp.de>
http://home.schmorp.de/
=cut
1
( run in 0.755 second using v1.01-cache-2.11-cpan-39bf76dae61 )