AnyEvent-InfluxDB

 view release on metacpan or  search on metacpan

lib/AnyEvent/InfluxDB.pm  view on Meta::CPAN

#ABSTRACT: An asynchronous library for InfluxDB time-series database
use strict;
use warnings;
package AnyEvent::InfluxDB;
our $AUTHORITY = 'cpan:AJGB';
$AnyEvent::InfluxDB::VERSION = '1.0.2.0';
use AnyEvent;
use AnyEvent::HTTP;
use URI;
use URI::QueryParam;
use JSON qw(decode_json);
use List::MoreUtils qw(zip);
use URI::Encode::XS qw( uri_encode );
use Moo;

has [qw( ssl_options username password jwt on_request )] => (
    is => 'ro',
    predicate => 1,
);

has 'server' => (
    is => 'rw',
    default => 'http://localhost:8086',
);

has '_is_ssl' => (
    is => 'lazy',
);

has '_tls_ctx' => (
    is => 'lazy',
);

has '_server_uri' => (
    is => 'lazy',
);


sub _build__tls_ctx {
    my ($self) = @_;

    # no ca/hostname checks
    return 'low' unless $self->has_ssl_options;

    # create ctx
    require AnyEvent::TLS;
    return AnyEvent::TLS->new( %{ $self->ssl_options } );
}

sub _build__is_ssl {
    my ($self) = @_;

    return $self->server =~ /^https/;
}

sub _build__server_uri {
    my ($self) = @_;

    my $url = URI->new( $self->server, 'http' );

    if ( $self->has_username && $self->has_password ) {

lib/AnyEvent/InfluxDB.pm  view on Meta::CPAN

    );
}


sub drop_subscription {
    my ($self, %args) = @_;

    my $q;
    if ( exists $args{q} ) {
        $q = $args{q};
    } else {
        $q = 'DROP SUBSCRIPTION '. $args{name} .' ON '
            . $args{database} .'.'. $args{rp};
    }

    my $url = $self->_make_url('/query', {
        q => $q,
    });

    $self->_http_request( POST => $url,
        sub {
            my ($body, $headers) = @_;

            if ( $headers->{Status} eq '200' ) {
                $args{on_success}->();
            } else {
                $args{on_error}->( $body );
            }
        }
    );
}


sub query {
    my ($self, %args) = @_;

    my $url = $self->_server_uri->clone;
    $url->path('/query');
    $url->query_form_hash( $args{query} );

    my $method = $args{method} || 'GET';

    $self->_http_request( $method => $url,
        sub {
            $args{on_response}->(@_);
        }
    );
}


1;

__END__

=pod

=encoding UTF-8

=head1 NAME

AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database

=head1 VERSION

version 1.0.2.0

=head1 SYNOPSIS

    use EV;
    use AnyEvent;
    use AnyEvent::Socket;
    use AnyEvent::Handle;
    use AnyEvent::InfluxDB;
    use Monitoring::Plugin::Performance;

    my $db = AnyEvent::InfluxDB->new(
        server => 'http://localhost:8086',
        username => 'admin',
        password => 'password',
    );

    my $hdl;
    tcp_server undef, 8888, sub {
        my ($fh, $host, $port) = @_;

        $hdl = AnyEvent::Handle->new(
            fh => $fh,
        );

        $hdl->push_read(
            line => sub {
                my (undef, $line) = @_;

                # Disk\t/=382MB;15264;15269;; /var=218MB;9443;9448
                my ($measurement, $perfstring) = split(/\t/, $line);

                my @perfdata
                    = Monitoring::Plugin::Performance->parse_perfstring($perfstring);

                $db->write(
                    database => 'mydb',
                    data => [
                        map {
                            +{
                                measurement => $measurement,
                                tags => {
                                    label => $_->label,
                                },
                                fields => {
                                    value => $_->value,
                                    uom => '"'. $_->uom .'"',
                                },
                            }
                        } @perfdata
                    ],
                    on_success => sub { print "$line written\n"; },
                    on_error => sub { print "$line error: @_\n"; },
                );

                $hdl->on_drain(
                    sub {
                        $hdl->fh->close;
                        undef $hdl;
                    }
                );
            },
        );
    };

    EV::run;

=head1 DESCRIPTION

Asynchronous client library for InfluxDB time-series database L<https://influxdb.com>.

This version is meant to be used with InfluxDB v1.0.0 or newer.

=head1 METHODS

=head2 new

    my $db = AnyEvent::InfluxDB->new(
        server => 'http://localhost:8086',

        # authenticate using Basic credentials
        username => 'admin',
        password => 'password',

        # or use JWT token
        jwt => 'JWT_TOKEN_BLOB'
    );

Returns object representing given InfluDB C<server> connected using optionally
provided username C<username> and password C<password>.

Default value of C<server> is C<http://localhost:8086>.

If the server protocol is C<https> then by default no validation of remote
host certificate is performed. This can be changed by setting C<ssl_options>
parameter with any options accepted by L<AnyEvent::TLS>.

    my $db = AnyEvent::InfluxDB->new(
        ...
        ssl_options => {
            verify => 1,
            verify_peername => 'https',
            ca_file => '/path/to/cacert.pem',
        }
    );

As an debugging aid the C<on_request> code reference may also be provided. It will
be executed before each request with the method name, url and POST data if set.

    my $db = AnyEvent::InfluxDB->new(
        ...
        on_request => sub {
            my ($method, $url, $post_data) = @_;
            print "$method $url\n";
            print "$post_data\n" if $post_data;
        }
    );

=for Pod::Coverage has_jwt jwt has_on_request has_password has_ssl_options has_username on_request password server ssl_options username

=head2 ping

    $cv = AE::cv;
    $db->ping(
        wait_for_leader => 2,

        on_success => $cv,
        on_error => sub {
            $cv->croak("Failed to ping cluster leader: @_");
        }



( run in 0.773 second using v1.01-cache-2.11-cpan-e1769b4cff6 )