AnyEvent-InfluxDB

 view release on metacpan or  search on metacpan

lib/AnyEvent/InfluxDB.pm  view on Meta::CPAN

#ABSTRACT: An asynchronous library for InfluxDB time-series database
use strict;
use warnings;
package AnyEvent::InfluxDB;
our $AUTHORITY = 'cpan:AJGB';
$AnyEvent::InfluxDB::VERSION = '1.0.2.0';
use AnyEvent;
use AnyEvent::HTTP;
use URI;
use URI::QueryParam;
use JSON qw(decode_json);
use List::MoreUtils qw(zip);
use URI::Encode::XS qw( uri_encode );
use Moo;

has [qw( ssl_options username password jwt on_request )] => (
    is => 'ro',
    predicate => 1,
);

has 'server' => (
    is => 'rw',
    default => 'http://localhost:8086',
);

has '_is_ssl' => (
    is => 'lazy',
);

has '_tls_ctx' => (
    is => 'lazy',
);

has '_server_uri' => (
    is => 'lazy',
);


sub _build__tls_ctx {
    my ($self) = @_;

    # no ca/hostname checks
    return 'low' unless $self->has_ssl_options;

    # create ctx
    require AnyEvent::TLS;
    return AnyEvent::TLS->new( %{ $self->ssl_options } );
}

sub _build__is_ssl {
    my ($self) = @_;

    return $self->server =~ /^https/;
}

sub _build__server_uri {
    my ($self) = @_;

    my $url = URI->new( $self->server, 'http' );

    if ( $self->has_username && $self->has_password ) {
        $url->query_param( 'u' => $self->username );
        $url->query_param( 'p' => $self->password );
    }

    return $url;
}

sub _make_url {
    my ($self, $path, $params) = @_;

    my $url = $self->_server_uri->clone;
    $url->path($path);

    while ( my ($k, $v) = each %$params ) {
        $url->query_param( $k => $v );
    }

    return $url;
}

sub _http_request {
    my $cb = pop;
    my ($self, $method, $url, $post_data) = @_;

    if ($self->has_on_request) {
        $self->on_request->($method, $url, $post_data);
    }

    my %args = (
        headers => {
            referer => undef,
            'user-agent' => "AnyEvent-InfluxDB/0.13",
        }
    );

    if ($self->has_jwt) {
        $args{headers}->{Authorization} = 'Bearer '. $self->jwt;
    }

    if ( $method eq 'POST' ) {
        if ( defined $post_data ) {
            $args{'body'} = $post_data;
        } else {
            if ( my $q = $url->query_param_delete('q') ) {
                $args{headers}{'content-type'} = 'application/x-www-form-urlencoded';
                $args{body} = 'q='. uri_encode($q);
            }
        }
    }
    if ( $self->_is_ssl ) {
        $args{tls_ctx} = $self->_tls_ctx;
    }

    my $guard;
    $guard = http_request
        $method => $url->as_string,
        %args,
        sub {
            $cb->(@_);
            undef $guard;
        };
};

lib/AnyEvent/InfluxDB.pm  view on Meta::CPAN

            }
        }
    );
}


sub show_field_keys {
    my ($self, %args) = @_;

    my $q;
    if ( exists $args{q} ) {
        $q = $args{q};
    } else {
        $q = 'SHOW FIELD KEYS';

        if ( my $measurement = $args{measurement} ) {
            $q .= ' FROM '. $measurement;
        }
    }

    my $url = $self->_make_url('/query', {
        db => $args{database},
        q => $q
    });

    $self->_http_request( GET => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                my $data = decode_json($body);
                my $field_keys = {};
                for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
                    my $values = $res->{values};
                    $field_keys->{ $res->{name } } = [
                        map {
                            +{
                                name => $_->[0],
                                type => $_->[1],
                            }
                        } @{ $values || [] }
                    ];
                }
                $args{on_success}->($field_keys);
            } else {
                $args{on_error}->( $body );
            }
        }
    );
}


sub create_user {
    my ($self, %args) = @_;

    my $q;
    if ( exists $args{q} ) {
        $q = $args{q};
    } else {
        $q = 'CREATE USER '. $args{username}
            .' WITH PASSWORD \''. $args{password} .'\'';

        $q .= ' WITH ALL PRIVILEGES' if $args{all_privileges};
    }

    my $url = $self->_make_url('/query', {
        q => $q
    });

    $self->_http_request( POST => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                $args{on_success}->();
            } else {
                $args{on_error}->( $body );
            }
        }
    );
}


sub set_user_password {
    my ($self, %args) = @_;

    my $q;
    if ( exists $args{q} ) {
        $q = $args{q};
    } else {
        $q = 'SET PASSWORD FOR '. $args{username}
            .' = \''. $args{password} .'\'';
    }

    my $url = $self->_make_url('/query', {
        q => $q
    });

    $self->_http_request( POST => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                $args{on_success}->();
            } else {
                $args{on_error}->( $body );
            }
        }
    );
}


sub show_users {
    my ($self, %args) = @_;

    my $url = $self->_make_url('/query', {
        q => 'SHOW USERS'
    });

    $self->_http_request( GET => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                my $data = decode_json($body);
                my $res = $data->{results}->[0]->{series}->[0];
                my $cols = $res->{columns};
                my $values = $res->{values};
                my @users = (
                    map {
                        +{
                            zip(@$cols, @$_)
                        }
                    } @{ $values || [] }
                );
                $args{on_success}->(@users);
            } else {
                $args{on_error}->( $body );
            }
        }
    );
}


sub grant_privileges {
    my ($self, %args) = @_;

    my $q;
    if ( exists $args{q} ) {
        $q = $args{q};
    } else {
        $q = 'GRANT ';

lib/AnyEvent/InfluxDB.pm  view on Meta::CPAN


    $self->_http_request( POST => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                $args{on_success}->();
            } else {
                $args{on_error}->( $body );
            }
        }
    );
}


sub query {
    my ($self, %args) = @_;

    my $url = $self->_server_uri->clone;
    $url->path('/query');
    $url->query_form_hash( $args{query} );

    my $method = $args{method} || 'GET';

    $self->_http_request( $method => $url,
        sub {
            $args{on_response}->(@_);
        }
    );
}


1;

__END__

=pod

=encoding UTF-8

=head1 NAME

AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database

=head1 VERSION

version 1.0.2.0

=head1 SYNOPSIS

    use EV;
    use AnyEvent;
    use AnyEvent::Socket;
    use AnyEvent::Handle;
    use AnyEvent::InfluxDB;
    use Monitoring::Plugin::Performance;

    my $db = AnyEvent::InfluxDB->new(
        server => 'http://localhost:8086',
        username => 'admin',
        password => 'password',
    );

    my $hdl;
    tcp_server undef, 8888, sub {
        my ($fh, $host, $port) = @_;

        $hdl = AnyEvent::Handle->new(
            fh => $fh,
        );

        $hdl->push_read(
            line => sub {
                my (undef, $line) = @_;

                # Disk\t/=382MB;15264;15269;; /var=218MB;9443;9448
                my ($measurement, $perfstring) = split(/\t/, $line);

                my @perfdata
                    = Monitoring::Plugin::Performance->parse_perfstring($perfstring);

                $db->write(
                    database => 'mydb',
                    data => [
                        map {
                            +{
                                measurement => $measurement,
                                tags => {
                                    label => $_->label,
                                },
                                fields => {
                                    value => $_->value,
                                    uom => '"'. $_->uom .'"',
                                },
                            }
                        } @perfdata
                    ],
                    on_success => sub { print "$line written\n"; },
                    on_error => sub { print "$line error: @_\n"; },
                );

                $hdl->on_drain(
                    sub {
                        $hdl->fh->close;
                        undef $hdl;
                    }
                );
            },
        );
    };

    EV::run;

=head1 DESCRIPTION

Asynchronous client library for InfluxDB time-series database L<https://influxdb.com>.

This version is meant to be used with InfluxDB v1.0.0 or newer.

=head1 METHODS

=head2 new

    my $db = AnyEvent::InfluxDB->new(
        server => 'http://localhost:8086',

        # authenticate using Basic credentials
        username => 'admin',
        password => 'password',

        # or use JWT token
        jwt => 'JWT_TOKEN_BLOB'
    );

Returns object representing given InfluDB C<server> connected using optionally
provided username C<username> and password C<password>.

Default value of C<server> is C<http://localhost:8086>.

If the server protocol is C<https> then by default no validation of remote
host certificate is performed. This can be changed by setting C<ssl_options>
parameter with any options accepted by L<AnyEvent::TLS>.

    my $db = AnyEvent::InfluxDB->new(
        ...
        ssl_options => {
            verify => 1,
            verify_peername => 'https',
            ca_file => '/path/to/cacert.pem',
        }
    );

As an debugging aid the C<on_request> code reference may also be provided. It will
be executed before each request with the method name, url and POST data if set.

    my $db = AnyEvent::InfluxDB->new(
        ...
        on_request => sub {
            my ($method, $url, $post_data) = @_;
            print "$method $url\n";
            print "$post_data\n" if $post_data;
        }
    );

=for Pod::Coverage has_jwt jwt has_on_request has_password has_ssl_options has_username on_request password server ssl_options username

=head2 ping

    $cv = AE::cv;
    $db->ping(
        wait_for_leader => 2,

        on_success => $cv,
        on_error => sub {
            $cv->croak("Failed to ping cluster leader: @_");
        }
    );
    my $version = $cv->recv;

Checks the leader of the cluster to ensure that the leader is available and ready.
The optional parameter C<wait_for_leader> specifies the number of seconds to wait
before returning a response.

The required C<on_success> code reference is executed if request was successful
with the value of C<X-Influxdb-Version> response header as argument,
otherwise executes the required C<on_error> code reference with the value of
C<Reason> response header as argument.

=head2 Managing Data

=head3 write

    $cv = AE::cv;
    $db->write(
        database => 'mydb',
        precision => 's',
        rp => 'last_day',
        consistency => 'quorum',

        data => [
            # line protocol formatted
            'cpu_load,host=server02,region=eu-east sensor="top",value=0.64 1456097956',

            # or as a hash
            {
                measurement => 'cpu_load',
                tags => {
                    host => 'server02',
                    region => 'eu-east',
                },
                fields => {
                    value => '0.64',
                    sensor => q{"top"},
                },
                time => time()
            }
        ],

        on_success => $cv,
        on_error => sub {
            $cv->croak("Failed to write data: @_");
        }
    );
    $cv->recv;

lib/AnyEvent/InfluxDB.pm  view on Meta::CPAN

        }
    );
    my $tag_values = $cv->recv;
    for my $measurement ( sort keys %{ $tag_values } ) {
        print "Measurement: $measurement\n";
        for my $tag_key ( sort keys %{ $tag_values->{$measurement} } ) {
            print "  Tag key: $tag_key\n";
            print "   * $_\n" for @{ $tag_values->{$measurement}->{$tag_key} };
        }
    }

Returns a hash reference with measurements as keys and their unique tag values
as values from database C<database> and optional measurement C<measurement>
from a single tag key C<key> or a list of tag keys C<keys> with number
of results limited to C<limit> with offset C<offset>.

The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.

=head3 show_field_keys

    $cv = AE::cv;
    $db->show_field_keys(
        database => 'mydb',

        # raw query
        q => "SHOW FIELD KEYS FROM cpu_load",

        # or query created from arguments
        measurement => 'cpu_load',

        # callbacks
        on_success => $cv,
        on_error => sub {
            $cv->croak("Failed to list field keys: @_");
        }
    );
    my $field_keys = $cv->recv;
    for my $measurement ( sort keys %{ $field_keys } ) {
        print "Measurement: $measurement\n";
        for my $field ( @{ $field_keys->{$measurement} } ) {
            print "  Key:  $field->{key}\n";
            print "  Type: $field->{type}\n";
        }
    }

Returns a hash reference with measurements as keys and their field keys names
and type as values from database C<database> and optional measurement
C<measurement>.

The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.

=head2 User Management

=head3 create_user

    $cv = AE::cv;
    $db->create_user(
        # raw query
        q => "CREATE USER jdoe WITH PASSWORD 'mypassword' WITH ALL PRIVILEGES",

        # or query created from arguments
        username => 'jdoe',
        password => 'mypassword',
        all_privileges => 1,

        # callbacks
        on_success => $cv,
        on_error => sub {
            $cv->croak("Failed to create user: @_");
        }
    );
    $cv->recv;

Creates user with C<username> and C<password>. If flag C<all_privileges> is set
to true created user will be granted cluster administration privileges.

Note: C<password> will be automatically enclosed in single quotes.

The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.

=head3 set_user_password

    $cv = AE::cv;
    $db->set_user_password(
        # raw query
        q => "SET PASSWORD FOR jdoe = 'otherpassword'",

        # or query created from arguments
        username => 'jdoe',
        password => 'otherpassword',

        # callbacks
        on_success => $cv,
        on_error => sub {
            $cv->croak("Failed to set password: @_");
        }
    );
    $cv->recv;

Sets password to C<password> for the user identified by C<username>.

Note: C<password> will be automatically enclosed in single quotes.

The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.

=head3 show_users

    $cv = AE::cv;
    $db->show_users(
        on_success => $cv,
        on_error => sub {
            $cv->croak("Failed to list users: @_");
        }
    );
    my @users = $cv->recv;
    for my $u ( @users ) {
        print "Name: $u->{user}\n";
        print "Admin?: $u->{admin}\n";
    }

Returns a list of hash references with keys C<user> and C<admin> for each
defined user.

The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.

=head3 grant_privileges

    $cv = AE::cv;
    $db->grant_privileges(
        # raw query
        q => "GRANT ALL ON mydb TO jdoe",

        # or query created from arguments
        username => 'jdoe',

        # privileges at single database
        database => 'mydb',
        access => 'ALL',

        # or to grant cluster administration privileges
        all_privileges => 1,

        # callbacks
        on_success => $cv,
        on_error => sub {
            $cv->croak("Failed to grant privileges: @_");
        }
    );
    $cv->recv;

Grants to user C<username> access C<access> on database C<database>.
If flag C<all_privileges> is set it grants cluster administration privileges
instead.

The required C<on_success> code reference is executed if request was successful,
otherwise executes the required C<on_error> code reference.

=head3 show_grants

    $cv = AE::cv;



( run in 1.228 second using v1.01-cache-2.11-cpan-140bd7fdf52 )