AnyEvent-ClickHouse
view release on metacpan or search on metacpan
lib/AnyEvent/ClickHouse.pm view on Meta::CPAN
my %_param = (
host => '127.0.0.1',
port => 8123,
database => 'default',
user => undef,
password => undef
);
foreach my $_key ( keys %_param ) {
unless ($param->{$_key}){
$param->{$_key} = $_param{$_key};
}
}
unless ($param->{uri}) {
my $_uri = URI->new(sprintf ("http://%s:%d/",$param->{host},$param->{port}));
$_uri->query_param('user' => $param->{user}) if $param->{user};
$_uri->query_param('password' => $param->{password}) if $param->{password};
$_uri->query_param('database' => $param->{database});
$param->{uri} = $_uri;
}
return $param;
}
# Serialise rows into the VALUES payload of an INSERT statement.
#
# Arguments:
#   - the first argument is discarded (the sub keeps a method-style
#     signature, so callers must pass a placeholder in that position);
#   - every remaining argument is an array reference holding one row.
#
# Each value of a row is rendered as follows:
#   * undef             -> '' (an empty quoted string)
#   * array reference   -> its elements, each escaped and single-quoted,
#                          joined with commas ('' for an empty array)
#   * number-like value -> emitted verbatim (per looks_like_number)
#   * anything else     -> backslashes and single quotes are escaped and the
#                          value is wrapped in single quotes
#
# Returns "(v1,v2,...),(v1,v2,...)" for the given rows, or "\n" when no rows
# were passed. Rows are shallow-copied first, so the caller's data is not
# modified.
sub _data_prepare {
    my $self = shift;    # unused; kept so the calling convention is unchanged

    # Shallow-copy each row: the loop below rewrites values in place.
    my @_rows = map { [@$_] } @_;

    # Escape backslashes and single quotes, then wrap in single quotes.
    my $quote = sub {
        my $str = defined $_[0] ? $_[0] : q{};
        $str =~ s/\\/\\\\/g;
        $str =~ s/'/\\'/g;
        return qq{'$str'};
    };

    foreach my $row (@_rows) {
        foreach my $val (@$row) {
            if (!defined $val) {
                $val = q{''};
            }
            elsif (ref($val) eq 'ARRAY') {
                # Escape every element so an embedded quote or backslash
                # cannot terminate the literal early.
                $val = @$val
                    ? join(q{,}, map { $quote->($_) } @$val)
                    : q{''};
            }
            elsif (!looks_like_number($val)) {
                $val = $quote->($val);
            }
        }
    }

    return "\n" unless @_rows;
    return join q{,}, map { '(' . join(q{,}, @{$_}) . ')' } @_rows;
}
# Send a statement (typically an INSERT) to ClickHouse over HTTP POST.
#
# Arguments, in order:
#   $param  - connection settings, normalised by _init (which supplies
#             $param->{uri})
#   $query  - statement text; sent as the "query" URI parameter
#   $cb     - optional code ref, called with the response body on HTTP 200
#   $err_cb - optional code ref, called with the response body on any other
#             status
#   @rows   - remaining arguments: array refs, one per row, serialised by
#             _data_prepare and sent as the request body
sub clickhouse_do {
    my $param  = shift;
    my $query  = shift;
    my $cb     = shift;
    my $err_cb = shift;

    # _data_prepare discards its first argument (method-style signature),
    # so pass a placeholder; otherwise the first data row would be lost.
    my $data = _data_prepare(undef, @_);

    $param = _init($param);
    $param->{uri}->query_param('query' => $query);

    http_request
        POST       => $param->{uri}->as_string(),
        body       => $data,
        headers    => $headers,
        persistent => 1,
        keepalive  => 1,
        sub {
            my $data   = shift;
            my $hdr    = shift;
            my $status = $hdr->{Status};

            if ($status == 200) {
                # Statement accepted: hand the response body to $cb.
                if (defined $cb && ref $cb eq 'CODE') {
                    $cb->($data);
                }
            }
            else {
                # Any non-200 status is reported through $err_cb, if given.
                if (defined $err_cb && ref $err_cb eq 'CODE') {
                    $err_cb->($data);
                }
            }
        }
    ;
}
# Shared implementation of the clickhouse_select* functions: runs a query
# over HTTP GET and passes the (optionally post-processed) body to a callback.
#
# Arguments, in order:
#   $format - the string 'raw' to deliver the body untouched, otherwise a
#             code ref that receives the body and returns the value to deliver
#   $param  - connection settings, normalised by _init (which supplies
#             $param->{uri})
#   $query  - query text; sent as the "query" URI parameter
#   $cb     - optional code ref, called with the result on HTTP 200
#   $err_cb - optional code ref, called with the response body on any other
#             status
sub _select {
    my $format = shift;
    my $param  = shift;
    my $query  = shift;
    my $cb     = shift;
    my $err_cb = shift;

    $param = _init($param);
    $param->{uri}->query_param('query' => $query);

    http_request
        GET        => $param->{uri}->as_string(),
        headers    => $headers,
        persistent => 1,
        keepalive  => 1,
        sub {
            my $data   = shift;
            my $hdr    = shift;
            my $status = $hdr->{Status};

            if ($status == 200) {
                # Guard the success callback the same way as the error
                # callback, so a missing $cb does not die inside the event
                # loop. Nothing consumes the result then, so skip formatting.
                if (defined $cb && ref $cb eq 'CODE') {
                    unless ($format eq 'raw') {
                        $data = $format->($data);
                    }
                    $cb->($data);
                }
            }
            else {
                # Any non-200 status is reported through $err_cb, if given.
                if (defined $err_cb && ref $err_cb eq 'CODE') {
                    $err_cb->($data);
                }
            }
        }
    ;
}
# Run a query and deliver the response body unmodified.
# Takes ($param, $query, $cb, $err_cb) and forwards them to _select with the
# 'raw' format marker in front.
sub clickhouse_select {
    return _select('raw', @_);
}
# Run a query and deliver the result as an array ref of rows, each row being
# an array ref of column values.
# Takes ($param, $query, $cb, $err_cb) and forwards them to _select together
# with a parser that splits the body on newlines and tabs.
# Values are delivered exactly as they appear in the body; no escape
# sequences are decoded here.
sub clickhouse_select_array {
    my $to_rows = sub {
        my $body = shift;
        return [] unless defined $body && length $body;

        # Drop only the terminator of the last row. Together with the
        # negative LIMIT below this keeps empty values, which a plain
        # split would silently discard at the end of a line or body.
        $body =~ s/\n\z//;
        my @lines = length $body ? split(/\n/, $body, -1) : (q{});

        # split returns nothing for an empty string, so an empty line is
        # mapped explicitly to a single empty value.
        return [
            map { [ length $_ ? split(/\t/, $_, -1) : (q{}) ] } @lines
        ];
    };

    return _select($to_rows, @_);
}
# Run a query and deliver the result as an array ref of hash refs, one per
# row, keyed by column name.
# Takes ($param, $query, $cb, $err_cb). ' FORMAT TabSeparatedWithNames' is
# appended to the query, and the first line of the body supplies the keys.
# The callback receives undef when the body contains no data rows.
# Values are delivered exactly as they appear in the body; no escape
# sequences are decoded here.
sub clickhouse_select_hash {
    my $param = shift;
    my $query = shift;
    $query .= ' FORMAT TabSeparatedWithNames';

    my $to_hashes = sub {
        my $body = shift;
        my $response;    # stays undef unless at least one data row exists
        return $response unless defined $body && length $body;

        # Drop only the terminator of the last row. Together with the
        # negative LIMIT below this keeps empty values, which a plain
        # split would silently discard at the end of a line or body.
        $body =~ s/\n\z//;
        my @lines = length $body ? split(/\n/, $body, -1) : (q{});

        # The first line carries the column names.
        my @names = split /\t/, shift(@lines), -1;

        for my $line (@lines) {
            # split returns nothing for an empty string, so an empty line
            # is mapped explicitly to a single empty value.
            my @fields = length $line ? split(/\t/, $line, -1) : (q{});

            my %record;
            for my $col (0 .. $#fields) {
                my $name = defined $names[$col] ? $names[$col] : q{};
                $record{$name} = $fields[$col];
            }
            push @{$response}, \%record;
        }
        return $response;
    };

    return _select($to_hashes, $param, $query, @_);
}
( run in 0.683 second using v1.01-cache-2.11-cpan-df04353d9ac )