AnyEvent-InfluxDB
view release on metacpan or search on metacpan
lib/AnyEvent/InfluxDB.pm view on Meta::CPAN
#ABSTRACT: An asynchronous library for InfluxDB time-series database
use strict;
use warnings;
package AnyEvent::InfluxDB;
our $AUTHORITY = 'cpan:AJGB';
$AnyEvent::InfluxDB::VERSION = '1.0.2.0';
use AnyEvent;
use AnyEvent::HTTP;
use URI;
use URI::QueryParam;
use JSON qw(decode_json);
use List::MoreUtils qw(zip);
use URI::Encode::XS qw( uri_encode );
use Moo;
# Optional constructor arguments. Each one is read-only and gets a has_<name>
# predicate so the code can tell "not supplied" apart from a false value.
has [ qw( ssl_options username password jwt on_request ) ] => (
    is        => 'ro',
    predicate => 1,
);

# Base URL of the InfluxDB HTTP endpoint.
has 'server' => (
    is      => 'rw',
    default => 'http://localhost:8086',
);

# Private values derived on first access by the matching _build_<name> subs.
has [ qw( _is_ssl _tls_ctx _server_uri ) ] => (
    is => 'lazy',
);
# Builder for the lazy _tls_ctx attribute.
#
# Returns an AnyEvent::TLS context constructed from the user supplied
# ssl_options hash reference, or the string 'low' (no CA/hostname checks)
# when no ssl_options were given.
sub _build__tls_ctx {
    my $self = shift;

    if ( $self->has_ssl_options ) {
        # AnyEvent::TLS is only loaded when a custom context is requested.
        require AnyEvent::TLS;
        return AnyEvent::TLS->new( %{ $self->ssl_options } );
    }

    # No options supplied: fall back to the unverified profile.
    return 'low';
}
# Builder for the lazy _is_ssl attribute.
#
# Returns the result of matching the configured server string against
# /^https/, i.e. true when the server value starts with "https".
sub _build__is_ssl {
    my $self   = shift;
    my $server = $self->server;

    return $server =~ /^https/;
}
sub _build__server_uri {
my ($self) = @_;
my $url = URI->new( $self->server, 'http' );
if ( $self->has_username && $self->has_password ) {
lib/AnyEvent/InfluxDB.pm view on Meta::CPAN
);
}
# Drops a subscription by POSTing a DROP SUBSCRIPTION statement to /query.
#
# Named arguments:
#   q          - complete query string; when this key exists it is sent as-is
#                and name/database/rp are not used
#   name, database, rp
#              - used to build "DROP SUBSCRIPTION <name> ON <database>.<rp>"
#                when q is absent; values are interpolated exactly as given
#   on_success - invoked without arguments when the response Status is '200'
#   on_error   - invoked with the response body on any other Status; when the
#                body is undefined or empty the response Reason is passed
#                instead so the caller still learns why the request failed
sub drop_subscription {
    my ($self, %args) = @_;

    my $q = exists $args{q}
        ? $args{q}
        : 'DROP SUBSCRIPTION '. $args{name} .' ON '
            . $args{database} .'.'. $args{rp};

    my $url = $self->_make_url('/query', {
        q => $q,
    });

    $self->_http_request( POST => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                $args{on_success}->();
            } else {
                # Failures without a body (e.g. connection level errors)
                # would otherwise hand undef to on_error; use the Reason
                # text in that case.
                my $error = ( defined $body && length $body )
                    ? $body
                    : $headers->{Reason};
                $args{on_error}->( $error );
            }
        }
    );
}
# Generic escape hatch: sends an arbitrary request to the /query endpoint.
#
# Named arguments:
#   query       - hash reference installed as the URL query parameters
#   method      - HTTP method; any false value selects 'GET'
#   on_response - invoked with whatever arguments the request callback
#                 receives, unmodified
sub query {
    my $self = shift;
    my %args = @_;

    # Work on a copy so the cached server URI is never modified.
    my $target = $self->_server_uri->clone;
    $target->path('/query');
    $target->query_form_hash( $args{query} );

    my $verb    = $args{method} || 'GET';
    my $forward = sub { $args{on_response}->(@_) };

    $self->_http_request( $verb, $target, $forward );
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database
=head1 VERSION
version 1.0.2.0
=head1 SYNOPSIS
use EV;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::InfluxDB;
use Monitoring::Plugin::Performance;
my $db = AnyEvent::InfluxDB->new(
server => 'http://localhost:8086',
username => 'admin',
password => 'password',
);
my $hdl;
tcp_server undef, 8888, sub {
my ($fh, $host, $port) = @_;
$hdl = AnyEvent::Handle->new(
fh => $fh,
);
$hdl->push_read(
line => sub {
my (undef, $line) = @_;
# Disk\t/=382MB;15264;15269;; /var=218MB;9443;9448
my ($measurement, $perfstring) = split(/\t/, $line);
my @perfdata
= Monitoring::Plugin::Performance->parse_perfstring($perfstring);
$db->write(
database => 'mydb',
data => [
map {
+{
measurement => $measurement,
tags => {
label => $_->label,
},
fields => {
value => $_->value,
uom => '"'. $_->uom .'"',
},
}
} @perfdata
],
on_success => sub { print "$line written\n"; },
on_error => sub { print "$line error: @_\n"; },
);
$hdl->on_drain(
sub {
$hdl->fh->close;
undef $hdl;
}
);
},
);
};
EV::run;
=head1 DESCRIPTION
Asynchronous client library for InfluxDB time-series database L<https://influxdb.com>.
This version is meant to be used with InfluxDB v1.0.0 or newer.
=head1 METHODS
=head2 new
my $db = AnyEvent::InfluxDB->new(
server => 'http://localhost:8086',
# authenticate using Basic credentials
username => 'admin',
password => 'password',
# or use JWT token
jwt => 'JWT_TOKEN_BLOB'
);
Returns an object representing the given InfluxDB C<server>, connected using the optionally
provided username C<username> and password C<password>.
Default value of C<server> is C<http://localhost:8086>.
If the server protocol is C<https> then by default no validation of remote
host certificate is performed. This can be changed by setting C<ssl_options>
parameter with any options accepted by L<AnyEvent::TLS>.
my $db = AnyEvent::InfluxDB->new(
...
ssl_options => {
verify => 1,
verify_peername => 'https',
ca_file => '/path/to/cacert.pem',
}
);
As a debugging aid the C<on_request> code reference may also be provided. It will
be executed before each request with the method name, url and POST data if set.
my $db = AnyEvent::InfluxDB->new(
...
on_request => sub {
my ($method, $url, $post_data) = @_;
print "$method $url\n";
print "$post_data\n" if $post_data;
}
);
=for Pod::Coverage has_jwt jwt has_on_request has_password has_ssl_options has_username on_request password server ssl_options username
=head2 ping
$cv = AE::cv;
$db->ping(
wait_for_leader => 2,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to ping cluster leader: @_");
}
( run in 0.773 second using v1.01-cache-2.11-cpan-e1769b4cff6 )