AnyEvent-SNMP

 view release on metacpan or  search on metacpan

SNMP.pm  view on Meta::CPAN


package AnyEvent::SNMP;

use common::sense;

# it is possible to do this without loading
# Net::SNMP::Dispatcher, but much more awkward.
use Net::SNMP::Dispatcher;

# we could inherit fro Net:SNMP::Dispatcher, but since this is undocumented,
# I'd rather see it die (and reported) than silenty and subtly fail.
*msg_handle_alloc = \&Net::SNMP::Dispatcher::msg_handle_alloc;

sub Net::SNMP::Dispatcher::instance {
   AnyEvent::SNMP::
}

use Net::SNMP ();
use AnyEvent ();

our $VERSION = '6.02';

$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;

our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;

our $BUSY;
our $DONE; # finished all jobs
our @TRANSPORT; # fileno => [count, watcher]
our @QUEUE;
our $MAX_OUTSTANDING = 50;
our $MIN_RECVQUEUE   =  8;
our $MAX_RECVQUEUE   = 64;

sub kick_job;

sub _send_pdu {
   my ($pdu, $retries) = @_;

   # mostly copied from Net::SNMP::Dispatch

   # Pass the PDU to Message Processing so that it can
   # create the new outgoing message.
   my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu);

   if (!defined $msg) {
      --$BUSY;
      kick_job;
      # Inform the command generator about the Message Processing error.
      $pdu->status_information ($MESSAGE_PROCESSING->error);
      return; 
   }

   # Actually send the message.
   if (!defined $msg->send) {
      $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
         if $pdu->expect_response;

      # A crude attempt to recover from temporary failures.
      if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
         my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
            undef $retry_w;
            _send_pdu ($pdu, $retries);
         };
      } else {
         --$BUSY;
         kick_job;
      }

      # Inform the command generator about the send() error.
      $pdu->status_information ($msg->error);
      return;
   }

   # Schedule the timeout handler if the message expects a response.
   if ($pdu->expect_response) {
      my $transport = $msg->transport;
      my $fileno    = $transport->fileno;

      # register the transport
      unless ($TRANSPORT[$fileno][0]++) {
         $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
            for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
               # Create a new Message object to receive the response
               my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);

               if (!defined $msg) {
                  die sprintf 'Failed to create Message object [%s]', $error;
               }

               # Read the message from the Transport Layer
               if (!defined $msg->recv) {
                  if ($transport->connectionless) {
                     # if we handled very few replies and we have queued work, try
                     # to increase the parallelity as we probably can handle more.
                     if ($count < $MIN_RECVQUEUE && @QUEUE) {
                        ++$MAX_OUTSTANDING;
                        kick_job;
                     }
                  } else {
                     # for some reason, connected-oriented transports seem to need this
                     delete $TRANSPORT[$fileno]
                        unless --$TRANSPORT[$fileno][0];
                  }

                  $msg->error;
                  return;
               }

               # For connection-oriented Transport Domains, it is possible to
               # "recv" an empty buffer if reassembly is required.
               if (!$msg->length) {
                  return;
               }

               # Hand the message over to Message Processing.
               if (!defined $MESSAGE_PROCESSING->prepare_data_elements ($msg)) {
                  $MESSAGE_PROCESSING->error;
                  return;
               }

               # Set the error if applicable.
               $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error;

               # Notify the command generator to process the response.
               # Net::SNMP calls process_response_pdu, which simply calls callback_execute,
               # but some errors cause $msg to be of type Net::SNMP::Message, not Net::SMMP::PDU,
               # so we call the underlying callback_execute method which exists on both and
               # seems to do the right thing.
               $msg->callback_execute;

               # Cancel the timeout.
               my $rtimeout_w = $msg->timeout_id;
               if ($$rtimeout_w) {
                  undef $$rtimeout_w;

                  --$BUSY;
                  kick_job;

                  unless (--$TRANSPORT[$fileno][0]) {
                     delete $TRANSPORT[$fileno];
                     return;
                  }
               }
            }

            # when we end up here, we successfully handled $MAX_RECVQUEUE
            # replies in one iteration, so assume we are overloaded
            # and reduce the amount of parallelity.
            $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
         };
      }

      $msg->timeout_id (\(my $rtimeout_w =
         AE::timer $pdu->timeout, 0, sub {
            my $rtimeout_w = $msg->timeout_id;
            if ($$rtimeout_w) {
               undef $$rtimeout_w;
               delete $TRANSPORT[$fileno]
                  unless --$TRANSPORT[$fileno][0];
            }

            if ($retries--) {
               _send_pdu ($pdu, $retries);
            } else {
               $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id);
               $pdu->status_information ("No response from remote host '%s'", $pdu->hostname);

               --$BUSY;
               kick_job;
            }
         })
      ); 
   } else {
     --$BUSY;
     kick_job;
   }
}

sub kick_job {
   while ($BUSY < $MAX_OUTSTANDING) {
      my $pdu = shift @QUEUE
         or last;

      ++$BUSY;
      _send_pdu $pdu, $pdu->retries;
   }

   $DONE and $DONE->() unless $BUSY;
}

sub send_pdu($$$) {
   my (undef, $pdu, $delay) = @_;

   # $delay is not very sensibly implemented by AnyEvent::SNMP,
   # but apparently it is not a very sensible feature.
   if ($delay > 0) {
      ++$BUSY;
      my $delay_w; $delay_w = AE::timer $delay, 0, sub {
         undef $delay_w;
         push @QUEUE, $pdu;
         --$BUSY;
         kick_job;
      };
      return 1;
   }

   push @QUEUE, $pdu;
   kick_job;

   1
}

sub loop($) {
   while ($BUSY) {
      $DONE = AE::cv;
      $DONE->recv;
      undef $DONE;
   }
}

*activate = \&loop; # 5.x compatibility?
*listen   = \&loop; # 5.x compatibility?

sub one_event($) {
   # should not ever be used
   AnyEvent->one_event; #d# todo
}

sub set_max_outstanding($) {
   $MAX_OUTSTANDING = $_[0];
   kick_job;
}

# not provided yet:
# schedule            # apparently only used by Net::SNMP::Dispatcher itself
# register            # apparently only used by Net::SNMP::Dispatcher itself
# deregister          # apparently only used by Net::SNMP::Dispatcher itself
# cancel              # apparently only used by Net::SNMP::Dispatcher itself
# return_response_pdu # apparently not used at all?
# error               # only used by Net::SNMP::Dispatcher itself?
# debug               # only used by Net::SNMP::Dispatcher itself?

=head1 SEE ALSO

L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1



( run in 0.755 second using v1.01-cache-2.11-cpan-39bf76dae61 )