AnyEvent-ZabbixSender

 view release on metacpan or  search on metacpan

ZabbixSender.pm  view on Meta::CPAN

zabbix server.

=back

=cut

our $NOP = sub { };

my $json = eval { require JSON::XS; JSON::XS->new } || do { require JSON::PP; JSON::PP->new };

$json->utf8;

sub new {
   my $class = shift;
   my $self  = bless {
      server      => "localhost:10051",
      delay       => 0,
      retry_min   => 30,
      retry_max   => 300,
      queue_time  => 3600,
      on_response => $NOP,
      on_error    => sub {
         AE::log 4 => "$_[0]{zhost}:$_[0]{zport}: $_[2]"; # error
      },
      on_loss     => sub {
         my $nitems = @{ $_[1] };
         AE::log 5 => "$_[0]{zhost}:$_[0]{zport}: $nitems items lost"; # warn
      },

      @_,

      on_clear    => $NOP,
   }, $class;

   ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;

   $self->{host} //= do {
      require POSIX;
      (POSIX::uname())[1]
   };

   $self->{linger_time} //= $self->{queue_time};

   $self
}

sub DESTROY {
   my ($self) = @_;

   $self->_wait;

   %$self = ();
}

sub _wait {
   my ($self) = @_;

   while (@{ $self->{queue} } || $self->{sending}) {
      my $cv = AE::cv;

      my $to = AE::timer $self->{linger_time}, 0, $cv;
      local $self->{on_clear} = $cv;

      $cv->recv;
   }
}

=item $zbx->submit ($k, $v[, $clock[, $host]])

Submits a new key-value pair to the zabbix server. If C<$clock> is missing
or C<undef>, then C<AE::now> is used for the event timestamp. If C<$host>
is missing, then the default set during object creation is used.

=item $zbx->submit_multiple ([ [$k, $v, $clock, $host]... ])

Like C<submit>, but submits many key-value pairs at once.

=cut

sub submit_multiple {
   my ($self, $kvcs) = @_;

   push @{ $self->{queue} }, [AE::now, $kvcs];

   $self->_send
      unless $self->{sending};
}

sub submit {
   my ($self, $k, $v, $clock, $host) = @_;

   push @{ $self->{queue} }, [AE::now, [[$k, $v, $clock, $host]]];

   $self->_send;
}

# start sending
sub _send {
   my ($self) = @_;

   if ($self->{delay}) {
      Scalar::Util::weaken $self;
      $self->{delay_w} ||= AE::timer $self->{delay}, 0, sub {
         delete $self->{delay_w};
         $self->{send_immediate} = 1;
         $self->_send2 unless $self->{sending}++;
      };
   } else {
      $self->{send_immediate} = 1;
      $self->_send2 unless $self->{sending}++;
   }
}

# actually do send
sub _send2 {
   my ($self) = @_;

   Scalar::Util::weaken $self;
   $self->{connect_w} = AnyEvent::Socket::tcp_connect $self->{zhost}, $self->{zport}, sub {
      my ($fh) = @_;

      $fh
         or return $self->_retry;
         
      delete $self->{retry};

      delete $self->{send_immediate};
      my $data = delete $self->{queue};
      my $items = [map @{ $_->[1] }, @$data];

      my $fail = sub {
         $self->{on_error}($self, $items, $_[0]);
         $self->_retry;
      };

      $self->{hdl} = new AnyEvent::Handle
         fh => $fh,
         on_error  => sub {
            $fail->($_[2]);
         },
         on_read   => sub {
            if (13 <= length $_[0]{rbuf}) {
               my ($zbxd, $version, $length) = unpack "a4 C Q<", $_[0]{rbuf};

               $zbxd eq "ZBXD"
                  or return $fail->("protocol mismatch");
               $version == 1
                  or return $fail->("protocol version mismatch");

               if (13 + $length <= length $_[0]{rbuf}) {
                  delete $self->{hdl};

                  my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) }
                     or return $fail->("protocol error");

                  $self->{on_response}($self, $items, $res);

                  delete $self->{sending};

                  $self->_send2 if delete $self->{send_immediate} && $self->{queue};

                  $self->{on_clear}();
               }
            }
         },
      ;

      my $json = $json->encode ({
         request => "sender data",
         clock => int AE::now,
         data => [
            map {
               my $slot = $_;

               map {
                  key   => $_->[0],
                  value => $_->[1],
                  clock => int ($_->[2] // $slot->[0]),
                  host  => $_->[3] // $self->{host},
               }, @{ $slot->[1] }
            } @$data
         ],
      });

      $self->{hdl}->push_write (pack "a4 C Q</a", "ZBXD", 1, $json);
   };
}

sub _retry {
   my ($self) = @_;

   Scalar::Util::weaken $self;

   delete $self->{hdl};

   my $expire = AE::now - $self->{queue_time};
   while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
      $self->{on_loss}($self, [shift @{ $self->{queue} }]);
   }

   unless (@{ $self->{queue} }) {
      delete $self->{sending};
      $self->{on_clear}();
      return;
   }

   my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
   $retry = $self->{retry_max} if $retry > $self->{retry_max};
   $self->{retry_w} = AE::timer $retry, 0, sub {
      delete $self->{retry_w};
      $self->_send2;
   };
}

=back

=head1 SEE ALSO

L<AnyEvent>.

=head1 AUTHOR

   Marc Lehmann <schmorp@schmorp.de>
   http://home.schmorp.de/

=cut

1



( run in 0.661 second using v1.01-cache-2.11-cpan-39bf76dae61 )