ClickHouse

 view release on metacpan or  search on metacpan

lib/ClickHouse.pm  view on Meta::CPAN

package ClickHouse;

use 5.010;
use strict;
use warnings FATAL => 'all';

our $VERSION = '0.05';

use Net::HTTP;
use URI;
use URI::Escape;
use URI::QueryParam;
use Carp;
use Scalar::Util qw/looks_like_number/;
use Try::Tiny;

our $AUTOLOAD;

# Constructor. Accepts a flat list of option => value pairs, blesses an
# empty hash into the requested class and hands the options to _init(),
# which fills in the attributes and connects. Returns the new object.
sub new {
    my ( $class, %opts ) = @_;

    my $self = {};
    bless $self, $class;

    $self->_init(%opts);

    return $self;
}

{
    # Default value of every supported attribute, keyed by the internal
    # (underscore-prefixed) name. _init() copies these defaults into each new
    # object unless the caller supplied a defined value, and _accessible()
    # uses the key set to decide which accessor names are valid.
    my %_attrs = (
        '_host'       => 'localhost',       # passed to Net::HTTP as Host
        '_port'       => 8123,              # passed to Net::HTTP as PeerPort
        '_database'   => 'default',         # sent as the "database" query parameter
        '_user'       => '',                # sent as "user" only when non-empty
        '_password'   => '',                # sent as "password" only when non-empty
        '_keep_alive' => 1,                 # passed to Net::HTTP as KeepAlive
        '_format'     => 'TabSeparated',    # NOTE(review): not used in the visible code; presumably the response format - confirm
        '_socket'     => undef,             # Net::HTTP handle, filled in by _builder()
        '_uri'        => undef,             # base URI object, filled in by _builder()
        '_timeout'    => 30,                # passed to Net::HTTP as Timeout
    );

    #
    # CLASS METHODS
    #
    # Returns a shallow copy of the instance, blessed into the same class.
    # Only the top-level hash is duplicated: reference values (such as the
    # socket and URI objects) are shared between the original and the copy.
    sub _clone {
        my ($self) = @_;

        my %copy = %{$self};

        return bless \%copy, ref $self;
    }

    # Tells whether $name (including its leading underscore) is one of the
    # attributes declared in %_attrs. Called by AUTOLOAD and _init().
    # Returns 1 for a known attribute, 0 otherwise.
    sub _accessible {
        my ( $self, $name ) = @_;

        return exists( $_attrs{$name} ) ? 1 : 0;
    }

    # Initialize the object (only called by the constructor).
    #
    # %args holds the caller's options keyed WITHOUT the leading underscore
    # (host, port, ...). Each attribute takes the caller's value when it is
    # defined and otherwise falls back to the default in %_attrs. Options
    # that do not correspond to a known attribute are ignored. Afterwards the
    # builder is recorded on the object and the connection is opened.
    # Returns 1.
    sub _init {
        my ( $self, %args ) = @_;

        for my $attr ( keys %_attrs ) {

            # Public option name: the attribute name minus leading underscores.
            ( my $option = $attr ) =~ s/^_+//;
            my $slot = "_$option";

            my $supplied = defined( $args{$option} ) && $self->_accessible($slot);
            $self->{$slot} = $supplied ? $args{$option} : $_attrs{$slot};
        }

        $self->{'_builder'} = \&_builder;
        $self->_connect();

        return 1;
    }

    # (Re)creates the HTTP socket and the base request URI for this object.
    #
    # Any previous socket and URI are dropped first. The socket is opened
    # with the object's host, port, keep-alive and timeout settings; the URI
    # is "/" plus the query parameters "database" and, when non-empty,
    # "user" and "password".
    #
    # Dies with "Can't connect: ..." when Net::HTTP cannot open the socket.
    # Returns 1 on success.
    sub _builder {
        my ($self) = @_;

        delete $self->{'_socket'};
        delete $self->{'_uri'};

        # create Net::HTTP object
        my $socket = Net::HTTP->new(
            'Host'        => $self->{'_host'},
            'PeerPort'    => $self->{'_port'},
            'HTTPVersion' => '1.1',
            'KeepAlive'   => $self->{'_keep_alive'},
            'Timeout'     => $self->{'_timeout'},
        ) or die "Can't connect: $@";

        # create URI object
        # Every parameter goes through query_param() so that its value is
        # percent-encoded; interpolating the database name directly into the
        # URI string would let characters such as "&" or "#" corrupt the
        # query string.
        my $uri = URI->new('/');
        $uri->query_param( 'database' => $self->{'_database'} );

        # Credentials are optional. Test for a non-empty string rather than
        # truthiness so that a value of "0" is still sent.
        for my $credential (qw/user password/) {
            my $value = $self->{"_$credential"};
            next unless defined $value && length $value;
            $uri->query_param( $credential => $value );
        }

        $self->{'_socket'} = $socket;
        $self->{'_uri'}    = $uri;

        return 1;
    }

    # Opens (or re-opens) the connection by running _builder(), which
    # replaces the object's socket and URI. Any exception raised by
    # _builder() propagates to the caller. Returns 1.
    sub _connect {
        my $self = shift;

        # The object is also passed as an explicit argument, as before.
        $self->_builder($self);

        return 1;
    }

    # Runs the callback $cb and returns its result. If the callback throws,
    # the connection is rebuilt with _connect() and the callback is run one
    # more time; an exception from that second attempt (or from _connect()
    # itself) propagates to the caller.
    sub _query {
        my ( $self, $cb ) = @_;

        return try {
            $cb->();
        }
        catch {
            $self->_connect();
            $cb->();
        };
    }
}

# Generates attribute accessors on demand.
#
# A call to an undefined method named _get_<attr> or _set_<attr>, where
# _<attr> is an attribute accepted by _accessible(), installs a real
# accessor under that name (so later calls bypass AUTOLOAD) and then
# performs the requested get or set:
#   _get_<attr>()       returns the attribute's current value
#   _set_<attr>($value) stores $value and returns nothing
# Any other undefined method croaks with "No such method: <name>".
sub ClickHouse::AUTOLOAD {
    my ( $self, $value ) = @_;

    # $AUTOLOAD holds the fully qualified name of the method that was called.
    # The captures are copied out immediately so that no later call can
    # clobber $1/$2 before they are used.
    if ( $AUTOLOAD =~ m{ :: _(get|set) (_\w+) \z }x ) {
        my ( $mode, $attr_name ) = ( $1, $2 );

        if ( $self->_accessible($attr_name) ) {
            my $accessor =
                $mode eq 'get'
                ? sub { return $_[0]->{$attr_name} }
                : sub { $_[0]->{$attr_name} = $_[1]; return; };

            {
                # Symbolic references are needed only to install the sub
                # into the symbol table. Keeping the relaxation this narrow
                # means the accessors run under strict refs, so calling one
                # on something that is not a hash-based object fails loudly
                # instead of touching a package variable.
                no strict 'refs';
                *{$AUTOLOAD} = $accessor;
            }

            return $accessor->( $self, $value );
        }
    }

    croak "No such method: $AUTOLOAD";
}

# Intentionally empty destructor: defining it keeps object destruction from
# being routed through AUTOLOAD, which would croak on the unknown name.
sub DESTROY { return; }

# Ends the persistent connection. When the object holds a socket, keep-alive
# is switched off on it and ping() is called once.
# NOTE(review): presumably the ping is there so the server sees a request
# without keep-alive and closes the connection - confirm.
# The socket is left stored on the object. Always returns 1.
sub disconnect {
    my ($self) = @_;

    my $socket = $self->_get_socket();
    return 1 unless $socket;

    $socket->keep_alive(0);
    $self->ping();

    return 1;
}



# Sends $query to the server and returns whatever _parse_response() yields
# for it. The request runs through _query(), so a failure triggers one
# reconnect-and-retry.
#
# Queries of up to 7000 characters are sent as a GET request to the URI
# built by _construct_query_uri(). Longer queries are sent as a POST request
# to a copy of the base URI, with the query text as the request body.
sub select {
    my ( $self, $query ) = @_;

    my $request = sub {
        my $method;
        my $query_url;
        my @post_data;

        if ( length($query) > 7000 ) {
            $query_url = $self->_get_uri()->clone();
            $method    = 'POST';
            @post_data = ($query);
        }
        else {
            $query_url = $self->_construct_query_uri($query);
            $method    = 'GET';
        }

        $self->_get_socket()->write_request( $method, $query_url, @post_data );

        return $self->_parse_response($query);
    };

    return $self->_query($request);
}

# Runs $query through select() and returns the first column of the first
# row of the result, or undef when there is no such value.
sub select_value {
    my ( $self, $query ) = @_;

    my $rows      = $self->select($query);
    my $first_row = $rows->[0];

    return $first_row->[0];
}

# Executes a statement, optionally with rows of data.
#
# $query is placed in the request URI via _construct_query_uri(). @rows is
# passed through _prepare_query(); each resulting row (an array reference)
# is rendered as "(v1,v2,...)" and the rows are joined with commas to form
# the POST body. When there are no rows the body is a single newline.
# The request runs through _query(), so a failure triggers one
# reconnect-and-retry. Returns the result of _parse_response().
sub do {
    my ( $self, $query, @rows ) = @_;

    my $request = sub {
        my @prepared_rows = $self->_prepare_query(@rows);
        my $query_url     = $self->_construct_query_uri($query);

        my $post_data = "\n";
        if (@prepared_rows) {
            my @tuples = map { "(" . join( ",", @{$_} ) . ")" } @prepared_rows;
            $post_data = join( ",", @tuples );
        }

        $self->_get_socket()->write_request( 'POST', $query_url, $post_data );

        return $self->_parse_response($query);
    };

    return $self->_query($request);
}

sub ping {
    my ($self) = @_;

    my ($code) = eval {
        $self->_get_socket()->write_request('GET' => '/');
        $self->_get_socket()->read_response_headers();
    };

    if ($@) {
        return 0;
    }



( run in 1.435 second using v1.01-cache-2.11-cpan-39bf76dae61 )