AnyEvent-InfluxDB
view release on metacpan or search on metacpan
lib/AnyEvent/InfluxDB.pm view on Meta::CPAN
#ABSTRACT: An asynchronous library for InfluxDB time-series database
use strict;
use warnings;
package AnyEvent::InfluxDB;
our $AUTHORITY = 'cpan:AJGB';
$AnyEvent::InfluxDB::VERSION = '1.0.2.0';
use AnyEvent;
use AnyEvent::HTTP;
use URI;
use URI::QueryParam;
use JSON qw(decode_json);
use List::MoreUtils qw(zip);
use URI::Encode::XS qw( uri_encode );
use Moo;
# Public, read-only constructor options. Each one gets a has_<name>
# predicate so the code can tell "not supplied" from "supplied but false".
has [qw( ssl_options username password jwt on_request )] => (
    is        => 'ro',
    predicate => 1,
);

# Base URL of the InfluxDB HTTP endpoint. The attribute is writable, so the
# values derived from it must be dropped on every assignment; otherwise the
# lazily built _is_ssl and _server_uri keep describing the previous server.
# (The trigger also fires from the constructor, where clearing is a no-op.)
has 'server' => (
    is      => 'rw',
    default => 'http://localhost:8086',
    trigger => sub {
        my ($self) = @_;
        $self->_clear_is_ssl;
        $self->_clear_server_uri;
    },
);

# Whether requests need a TLS context; computed from server by
# _build__is_ssl on first use.
has '_is_ssl' => (
    is      => 'lazy',
    clearer => '_clear_is_ssl',
);

# Value handed to http_request as tls_ctx; computed by _build__tls_ctx on
# first use. It depends only on ssl_options, so it needs no clearer.
has '_tls_ctx' => (
    is => 'lazy',
);

# URI object for server (carrying the u/p query parameters when both
# credentials are set); computed by _build__server_uri on first use.
has '_server_uri' => (
    is      => 'lazy',
    clearer => '_clear_server_uri',
);
# Builder for the lazy _tls_ctx attribute.
#
# Returns an AnyEvent::TLS context created from ssl_options when those were
# supplied, otherwise the plain string 'low'.
sub _build__tls_ctx {
    my $self = shift;

    if ( $self->has_ssl_options ) {
        # AnyEvent::TLS is loaded only when a custom context is requested.
        require AnyEvent::TLS;
        my %tls_options = %{ $self->ssl_options };
        return AnyEvent::TLS->new(%tls_options);
    }

    # Nothing configured: no ca/hostname checks.
    return 'low';
}
# Builder for the lazy _is_ssl attribute.
#
# Returns 1 when server uses the https scheme and 0 otherwise. URL schemes
# are case-insensitive, so "HTTPS://host" must count as SSL too; otherwise
# the configured TLS context is never handed to http_request for it.
sub _build__is_ssl {
    my ($self) = @_;

    return $self->server =~ m{\Ahttps:}i ? 1 : 0;
}
# Builder for the lazy _server_uri attribute.
#
# Returns a URI object for server (parsed with 'http' as the default scheme).
# When both username and password were supplied they are attached as the
# 'u' and 'p' query parameters.
sub _build__server_uri {
    my $self = shift;

    my $uri = URI->new( $self->server, 'http' );

    # Credentials are only added as a pair.
    return $uri unless $self->has_username;
    return $uri unless $self->has_password;

    $uri->query_param( u => $self->username );
    $uri->query_param( p => $self->password );

    return $uri;
}
# Builds the URL for a single request.
#
#   $path   - path to set on the URL
#   $params - hash reference of query parameters to set
#
# Works on a clone of _server_uri, so the cached base URI is left untouched.
# Returns the cloned URL object.
sub _make_url {
    my ($self, $path, $params) = @_;

    my $request_url = $self->_server_uri->clone;
    $request_url->path($path);

    for my $name ( keys %$params ) {
        $request_url->query_param( $name, $params->{$name} );
    }

    return $request_url;
}
# Sends one HTTP request through AnyEvent::HTTP.
#
#   $self->_http_request( $method => $url, [$post_data,] $cb );
#
#   $method    - HTTP method name
#   $url       - URL object; as_string is what gets requested
#   $post_data - optional request body for POST
#   $cb        - last argument; called with whatever http_request passes to
#                its completion callback
#
# The on_request hook, when set, is called first with the method, URL and
# POST data. A POST without explicit data moves the 'q' query parameter from
# the URL into a form-encoded body (this modifies $url).
sub _http_request {
    my $cb = pop;
    my ($self, $method, $url, $post_data) = @_;

    if ( $self->has_on_request ) {
        $self->on_request->($method, $url, $post_data);
    }

    # Report the real module version instead of a stale hard-coded one.
    my $version = defined $AnyEvent::InfluxDB::VERSION
        ? $AnyEvent::InfluxDB::VERSION
        : 'dev';

    my %args = (
        headers => {
            # an undef header value asks AnyEvent::HTTP not to send it
            referer      => undef,
            'user-agent' => "AnyEvent-InfluxDB/$version",
        },
    );

    if ( $self->has_jwt ) {
        $args{headers}->{Authorization} = 'Bearer ' . $self->jwt;
    }

    if ( $method eq 'POST' ) {
        if ( defined $post_data ) {
            $args{body} = $post_data;
        }
        elsif ( my $q = $url->query_param_delete('q') ) {
            $args{headers}{'content-type'} = 'application/x-www-form-urlencoded';
            $args{body} = 'q=' . uri_encode($q);
        }
    }

    if ( $self->_is_ssl ) {
        $args{tls_ctx} = $self->_tls_ctx;
    }

    # The callback closes over $guard, so the request guard stays alive
    # until the response arrives; it is released right after $cb has run.
    my $guard;
    $guard = http_request
        $method => $url->as_string,
        %args,
        sub {
            $cb->(@_);
            undef $guard;
        };
}
lib/AnyEvent/InfluxDB.pm view on Meta::CPAN
}
}
);
}
# Lists field keys per measurement.
#
# Named arguments:
#   database    - sent as the 'db' query parameter
#   q           - raw query; when present it is used verbatim
#   measurement - optional; appended as "FROM <measurement>" to the
#                 generated SHOW FIELD KEYS query
#   on_success  - called with a hash reference mapping each measurement name
#                 to an array reference of { name => ..., type => ... }
#   on_error    - called with the response body on a non-200 status, or with
#                 a message when a 200 response cannot be parsed
sub show_field_keys {
    my ($self, %args) = @_;

    my $q;
    if ( exists $args{q} ) {
        $q = $args{q};
    }
    else {
        $q = 'SHOW FIELD KEYS';
        if ( my $measurement = $args{measurement} ) {
            $q .= ' FROM ' . $measurement;
        }
    }

    my $url = $self->_make_url('/query', {
        db => $args{database},
        q  => $q,
    });

    $self->_http_request( GET => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} ne '200' ) {
                $args{on_error}->($body);
                return;
            }

            # Parsing runs inside eval: an exception escaping from an event
            # loop callback would leave the caller without any callback at
            # all. on_success is called outside the eval so that errors
            # raised by the caller's own code are not misreported.
            my $field_keys = {};
            my $parsed = eval {
                my $data = decode_json($body);
                for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
                    $field_keys->{ $res->{name} } = [
                        map { +{ name => $_->[0], type => $_->[1] } }
                            @{ $res->{values} || [] }
                    ];
                }
                1;
            };
            if ( !$parsed ) {
                my $reason = $@ || 'unknown error';
                $args{on_error}->("Failed to parse response: $reason");
                return;
            }

            $args{on_success}->($field_keys);
        }
    );
}
# Creates a user.
#
# Named arguments:
#   q              - raw query; when present it is sent verbatim
#   username       - user name, inserted into the query as given
#   password       - password; enclosed in single quotes, with embedded
#                    backslashes and single quotes backslash-escaped
#   all_privileges - when true, " WITH ALL PRIVILEGES" is appended
#   on_success     - called without arguments on a 200 response
#   on_error       - called with the response body otherwise
sub create_user {
    my ($self, %args) = @_;

    my $q;
    if ( exists $args{q} ) {
        $q = $args{q};
    }
    else {
        # An unescaped quote would end the string literal early and turn the
        # rest of the password into query text.
        ( my $password = $args{password} ) =~ s/([\\'])/\\$1/g;

        $q = 'CREATE USER ' . $args{username}
           . ' WITH PASSWORD \'' . $password . '\'';
        $q .= ' WITH ALL PRIVILEGES' if $args{all_privileges};
    }

    my $url = $self->_make_url('/query', {
        q => $q,
    });

    $self->_http_request( POST => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                $args{on_success}->();
            }
            else {
                $args{on_error}->($body);
            }
        }
    );
}
# Changes a user's password.
#
# Named arguments:
#   q          - raw query; when present it is sent verbatim
#   username   - user name, inserted into the query as given
#   password   - new password; enclosed in single quotes, with embedded
#                backslashes and single quotes backslash-escaped
#   on_success - called without arguments on a 200 response
#   on_error   - called with the response body otherwise
sub set_user_password {
    my ($self, %args) = @_;

    my $q;
    if ( exists $args{q} ) {
        $q = $args{q};
    }
    else {
        # An unescaped quote would end the string literal early and turn the
        # rest of the password into query text.
        ( my $password = $args{password} ) =~ s/([\\'])/\\$1/g;

        $q = 'SET PASSWORD FOR ' . $args{username}
           . ' = \'' . $password . '\'';
    }

    my $url = $self->_make_url('/query', {
        q => $q,
    });

    $self->_http_request( POST => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                $args{on_success}->();
            }
            else {
                $args{on_error}->($body);
            }
        }
    );
}
# Lists the defined users.
#
# Named arguments:
#   on_success - called with a list of hash references, one per row of the
#                first series of the first result, keyed by that series'
#                column names
#   on_error   - called with the response body on a non-200 status, or with
#                a message when a 200 response cannot be parsed
sub show_users {
    my ($self, %args) = @_;

    my $url = $self->_make_url('/query', {
        q => 'SHOW USERS',
    });

    $self->_http_request( GET => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} ne '200' ) {
                $args{on_error}->($body);
                return;
            }

            # Parsing runs inside eval: an exception escaping from an event
            # loop callback would leave the caller without any callback at
            # all. on_success is called outside the eval so that errors
            # raised by the caller's own code are not misreported.
            my @users;
            my $parsed = eval {
                my $data   = decode_json($body);
                my $res    = $data->{results}->[0]->{series}->[0];
                my $cols   = $res->{columns};
                my $values = $res->{values};

                @users = map { +{ zip(@$cols, @$_) } } @{ $values || [] };
                1;
            };
            if ( !$parsed ) {
                my $reason = $@ || 'unknown error';
                $args{on_error}->("Failed to parse response: $reason");
                return;
            }

            $args{on_success}->(@users);
        }
    );
}
sub grant_privileges {
my ($self, %args) = @_;
my $q;
if ( exists $args{q} ) {
$q = $args{q};
} else {
$q = 'GRANT ';
lib/AnyEvent/InfluxDB.pm view on Meta::CPAN
$self->_http_request( POST => $url,
sub {
my ($body, $headers) = @_;
if ( $headers->{Status} eq '200' ) {
$args{on_success}->();
} else {
$args{on_error}->( $body );
}
}
);
}
# Sends an arbitrary request to the /query endpoint.
#
# Named arguments:
#   query       - hash reference of query parameters
#   method      - HTTP method; defaults to 'GET'
#   on_response - called with the raw arguments of the HTTP callback
#
# The URL is built with _make_url, which adds the parameters to the base URI
# one by one. Replacing the whole query string instead would drop the u/p
# credentials stored on the base URI.
sub query {
    my ($self, %args) = @_;

    my $url    = $self->_make_url( '/query', $args{query} || {} );
    my $method = $args{method} || 'GET';

    $self->_http_request( $method => $url,
        sub {
            $args{on_response}->(@_);
        }
    );
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database
=head1 VERSION
version 1.0.2.0
=head1 SYNOPSIS
use EV;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::InfluxDB;
use Monitoring::Plugin::Performance;
my $db = AnyEvent::InfluxDB->new(
server => 'http://localhost:8086',
username => 'admin',
password => 'password',
);
my $hdl;
tcp_server undef, 8888, sub {
my ($fh, $host, $port) = @_;
$hdl = AnyEvent::Handle->new(
fh => $fh,
);
$hdl->push_read(
line => sub {
my (undef, $line) = @_;
# Disk\t/=382MB;15264;15269;; /var=218MB;9443;9448
my ($measurement, $perfstring) = split(/\t/, $line);
my @perfdata
= Monitoring::Plugin::Performance->parse_perfstring($perfstring);
$db->write(
database => 'mydb',
data => [
map {
+{
measurement => $measurement,
tags => {
label => $_->label,
},
fields => {
value => $_->value,
uom => '"'. $_->uom .'"',
},
}
} @perfdata
],
on_success => sub { print "$line written\n"; },
on_error => sub { print "$line error: @_\n"; },
);
$hdl->on_drain(
sub {
$hdl->fh->close;
undef $hdl;
}
);
},
);
};
EV::run;
=head1 DESCRIPTION
Asynchronous client library for InfluxDB time-series database L<https://influxdb.com>.
This version is meant to be used with InfluxDB v1.0.0 or newer.
=head1 METHODS
=head2 new
my $db = AnyEvent::InfluxDB->new(
server => 'http://localhost:8086',
# authenticate using Basic credentials
username => 'admin',
password => 'password',
# or use JWT token
jwt => 'JWT_TOKEN_BLOB'
);
Returns an object representing the given InfluxDB C<server>, connected using the
optionally provided username C<username> and password C<password>.
Default value of C<server> is C<http://localhost:8086>.
If the server protocol is C<https> then by default no validation of remote
host certificate is performed. This can be changed by setting C<ssl_options>
parameter with any options accepted by L<AnyEvent::TLS>.
my $db = AnyEvent::InfluxDB->new(
...
ssl_options => {
verify => 1,
verify_peername => 'https',
ca_file => '/path/to/cacert.pem',
}
);
As a debugging aid the C<on_request> code reference may also be provided. It will
be executed before each request with the method name, url and POST data if set.
my $db = AnyEvent::InfluxDB->new(
...
on_request => sub {
my ($method, $url, $post_data) = @_;
print "$method $url\n";
print "$post_data\n" if $post_data;
}
);
=for Pod::Coverage has_jwt jwt has_on_request has_password has_ssl_options has_username on_request password server ssl_options username
=head2 ping
$cv = AE::cv;
$db->ping(
wait_for_leader => 2,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to ping cluster leader: @_");
}
);
my $version = $cv->recv;
Checks the leader of the cluster to ensure that the leader is available and ready.
The optional parameter C<wait_for_leader> specifies the number of seconds to wait
before returning a response.
The required C<on_success> code reference is executed if request was successful
with the value of C<X-Influxdb-Version> response header as argument,
otherwise executes the required C<on_error> code reference with the value of
C<Reason> response header as argument.
=head2 Managing Data
=head3 write
$cv = AE::cv;
$db->write(
database => 'mydb',
precision => 's',
rp => 'last_day',
consistency => 'quorum',
data => [
# line protocol formatted
'cpu_load,host=server02,region=eu-east sensor="top",value=0.64 1456097956',
# or as a hash
{
measurement => 'cpu_load',
tags => {
host => 'server02',
region => 'eu-east',
},
fields => {
value => '0.64',
sensor => q{"top"},
},
time => time()
}
],
on_success => $cv,
on_error => sub {
$cv->croak("Failed to write data: @_");
}
);
$cv->recv;
lib/AnyEvent/InfluxDB.pm view on Meta::CPAN
}
);
my $tag_values = $cv->recv;
for my $measurement ( sort keys %{ $tag_values } ) {
print "Measurement: $measurement\n";
for my $tag_key ( sort keys %{ $tag_values->{$measurement} } ) {
print " Tag key: $tag_key\n";
print " * $_\n" for @{ $tag_values->{$measurement}->{$tag_key} };
}
}
Returns a hash reference with measurements as keys and their unique tag values
as values from database C<database> and optional measurement C<measurement>
from a single tag key C<key> or a list of tag keys C<keys> with number
of results limited to C<limit> with offset C<offset>.
The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.
=head3 show_field_keys
$cv = AE::cv;
$db->show_field_keys(
database => 'mydb',
# raw query
q => "SHOW FIELD KEYS FROM cpu_load",
# or query created from arguments
measurement => 'cpu_load',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list field keys: @_");
}
);
my $field_keys = $cv->recv;
for my $measurement ( sort keys %{ $field_keys } ) {
print "Measurement: $measurement\n";
for my $field ( @{ $field_keys->{$measurement} } ) {
print " Key: $field->{name}\n";
print " Type: $field->{type}\n";
}
}
Returns a hash reference with measurements as keys and their field key names
and types as values from database C<database> and optional measurement
C<measurement>.
The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.
=head2 User Management
=head3 create_user
$cv = AE::cv;
$db->create_user(
# raw query
q => "CREATE USER jdoe WITH PASSWORD 'mypassword' WITH ALL PRIVILEGES",
# or query created from arguments
username => 'jdoe',
password => 'mypassword',
all_privileges => 1,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create user: @_");
}
);
$cv->recv;
Creates user with C<username> and C<password>. If flag C<all_privileges> is set
to true created user will be granted cluster administration privileges.
Note: C<password> will be automatically enclosed in single quotes.
The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.
=head3 set_user_password
$cv = AE::cv;
$db->set_user_password(
# raw query
q => "SET PASSWORD FOR jdoe = 'otherpassword'",
# or query created from arguments
username => 'jdoe',
password => 'otherpassword',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to set password: @_");
}
);
$cv->recv;
Sets password to C<password> for the user identified by C<username>.
Note: C<password> will be automatically enclosed in single quotes.
The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.
=head3 show_users
$cv = AE::cv;
$db->show_users(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list users: @_");
}
);
my @users = $cv->recv;
for my $u ( @users ) {
print "Name: $u->{user}\n";
print "Admin?: $u->{admin}\n";
}
Returns a list of hash references with keys C<user> and C<admin> for each
defined user.
The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.
=head3 grant_privileges
$cv = AE::cv;
$db->grant_privileges(
# raw query
q => "GRANT ALL ON mydb TO jdoe",
# or query created from arguments
username => 'jdoe',
# privileges at single database
database => 'mydb',
access => 'ALL',
# or to grant cluster administration privileges
all_privileges => 1,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to grant privileges: @_");
}
);
$cv->recv;
Grants to user C<username> access C<access> on database C<database>.
If flag C<all_privileges> is set it grants cluster administration privileges
instead.
The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.
=head3 show_grants
$cv = AE::cv;
( run in 1.228 second using v1.01-cache-2.11-cpan-140bd7fdf52 )