AnyEvent-ClickHouse
view release on metacpan or search on metacpan
lib/AnyEvent/ClickHouse.pm view on Meta::CPAN
package AnyEvent::ClickHouse;
use 5.010000;
use strict;
no strict 'refs';
use warnings;
our $VERSION = '0.031';
use vars qw(@ISA @EXPORT);
our @ISA = qw(Exporter);
our @EXPORT = qw(clickhouse_do clickhouse_select clickhouse_select_array clickhouse_select_hash);
use AnyEvent;
use AnyEvent::HTTP;
use URI;
use URI::QueryParam;
use Scalar::Util qw/looks_like_number/;
use Data::Dumper;
# Default HTTP headers handed to every http_request call in this module.
# 'Te' and 'Referer' are deliberately undef: AnyEvent::HTTP documents an undef
# header value as "do not send this header at all".
# The slice assignment fills in (or creates) the hash behind $headers without
# discarding keys that may already be present.
our $headers;
@{$headers}{ 'User-agent', 'Te', 'Referer', 'Connection' } = (
    'Mozilla/5.0 (compatible; U; Perl-AnyEvent-ClickHouse;)',
    undef,
    undef,
    'Keep-Alive',
);
# Complete a set of connection settings and make sure a request URI exists.
#
# Argument: a hash reference that may carry host, port, database, user,
# password and uri. The hash is modified in place.
#
# Each of host/port/database/user/password that is missing or false is set to
# its default (127.0.0.1, 8123, 'default', undef, undef). Unless a true `uri`
# entry is already present, a URI object for http://host:port/ is built with
# `user` and `password` (each only when true) and `database` as query
# parameters, and stored under `uri`.
#
# Returns the same hash reference.
sub _init {
    my ($settings) = @_;

    my %default_for = (
        host     => '127.0.0.1',
        port     => 8123,
        database => 'default',
        user     => undef,
        password => undef,
    );

    # ||= on purpose: any false value ('' or 0 included) falls back to the default.
    $settings->{$_} ||= $default_for{$_} for keys %default_for;

    # A caller-supplied (or previously built) URI is kept as is.
    return $settings if $settings->{uri};

    my $endpoint = URI->new(sprintf("http://%s:%d/", @{$settings}{qw(host port)}));
    for my $credential (qw(user password)) {
        next unless $settings->{$credential};
        $endpoint->query_param($credential => $settings->{$credential});
    }
    $endpoint->query_param('database' => $settings->{database});
    $settings->{uri} = $endpoint;

    return $settings;
}
# Render rows of values as the text that follows "INSERT ... VALUES".
#
# Arguments: zero or more rows, each an array reference of column values.
# A leading argument that is NOT an array reference (an invocant or a
# placeholder such as undef) is discarded, so both `_data_prepare(@rows)` and
# `$obj->_data_prepare(@rows)` work. The caller's rows are never modified.
#
# Each value is rendered as follows:
#   undef            -> ''
#   array reference  -> its elements, each escaped and single-quoted, joined
#                       with commas ('' when the array is empty)
#   number           -> emitted unchanged (as decided by looks_like_number)
#   anything else    -> backslashes and single quotes backslash-escaped, then
#                       wrapped in single quotes
#
# Returns "(v,v,...),(v,v,...)" for the given rows, or "\n" when there are none.
sub _data_prepare {
    # The previous unconditional `shift` swallowed the first data row whenever
    # the function was called with the rows directly (as clickhouse_do does).
    # Only drop the first argument when it cannot be a row.
    shift if @_ && ref($_[0]) ne 'ARRAY';

    # Escape backslash first so the backslashes added for quotes are not doubled.
    my $quote = sub {
        my $text = defined $_[0] ? "$_[0]" : '';
        $text =~ s/\\/\\\\/g;
        $text =~ s/'/\\'/g;
        return qq{'$text'};
    };

    my @tuples;
    for my $row (@_) {
        my @cells;
        for my $val (@$row) {
            if (!defined $val) {
                push @cells, q{''};
            }
            elsif (ref($val) eq 'ARRAY') {
                # Elements are escaped too; unescaped quotes here used to
                # produce broken (and injectable) SQL.
                push @cells, @$val ? join(',', map { $quote->($_) } @$val) : q{''};
            }
            elsif (looks_like_number($val)) {
                push @cells, $val;
            }
            else {
                push @cells, $quote->($val);
            }
        }
        push @tuples, '(' . join(',', @cells) . ')';
    }

    return @tuples ? join(',', @tuples) : "\n";
}
# Send a statement (typically an INSERT) to ClickHouse with an HTTP POST.
#
# Arguments, in order:
#   $param   - hash reference of connection settings; completed by _init
#   $query   - statement text, sent as the `query` URL parameter
#   $cb      - optional code reference, called with the response body when the
#              response status is 200
#   $err_cb  - optional code reference, called with the response body for any
#              other status
#   the rest - forwarded unchanged to _data_prepare; its result is the POST body
#
# A callback that is undefined or not a code reference is silently skipped.
# Returns whatever http_request returns.
sub clickhouse_do {
    my ($param, $query, $cb, $err_cb) = splice @_, 0, 4;

    my $body = _data_prepare(@_);
    $param = _init($param);
    $param->{uri}->query_param('query' => $query);

    # Dispatch the finished response to the success or the error callback.
    my $on_response = sub {
        my ($content, $hdr) = @_;
        my $handler = $hdr->{Status} == 200 ? $cb : $err_cb;
        $handler->($content) if defined $handler && ref $handler eq 'CODE';
    };

    http_request(
        POST       => $param->{uri}->as_string(),
        body       => $body,
        headers    => $headers,
        persistent => 1,
        keepalive  => 1,
        $on_response,
    );
}
sub _select {
my $format = shift;
my $param = shift;
my $query = shift;
my $cb = shift;
my $err_cb = shift;
$param = _init $param;
$param->{uri}->query_param('query' => $query);
http_request
GET => $param->{uri}->as_string(),
headers => $headers,
( run in 1.746 second using v1.01-cache-2.11-cpan-39bf76dae61 )