Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client/Connection.pm view on Meta::CPAN
package Cassandra::Client::Connection;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::Connection::VERSION = '0.21';
use 5.010;
use strict;
use warnings;
use vars qw/$BUFFER/;
use Ref::Util qw/is_blessed_ref is_plain_arrayref/;
use IO::Socket::INET;
use IO::Socket::INET6;
use Errno qw/EAGAIN/;
use Socket qw/SOL_SOCKET IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY/;
use Scalar::Util qw/weaken/;
use Net::SSLeay qw/ERROR_WANT_READ ERROR_WANT_WRITE ERROR_NONE/;
use Cassandra::Client::Util;
use Cassandra::Client::Protocol qw/
:constants
%consistency_lookup
%batch_type_lookup
pack_bytes
pack_longstring
pack_queryparameters
pack_shortbytes
pack_stringmap
pack_stringlist
unpack_bytes
unpack_errordata
unpack_inet
unpack_int
unpack_metadata
unpack_shortbytes
unpack_string
unpack_stringlist
unpack_stringmultimap
/;
use Cassandra::Client::Error::Base;
use Cassandra::Client::ResultSet;
use Cassandra::Client::TLSHandling;
# Numeric limit for protocol stream ids.
# NOTE(review): presumably the bound at which last_stream_id (initialised to 0
# in new()) wraps around; the code that enforces it is outside this section.
use constant STREAM_ID_LIMIT => 32768;
# Populated at BEGIN{} time
# NOTE(review): the BEGIN block that fills these two is not visible here.
# Going by the names, the array holds compression algorithms in order of
# preference and the hash holds the ones usable on this system; confirm.
my @compression_preference;
my %available_compression;
# Constructor: builds a connection object in its initial, unconnected state.
#
# Named arguments read here:
#   client   - stored under {client}; the object keeps only a weak reference
#   async_io - stored under {async_io}; the object keeps only a weak reference
#   options  - hashref, stored as-is; its request_timeout and protocol_version
#              entries are additionally copied onto the object
#   host     - stored as-is
#   metadata - object, stored as-is; its ->prepare_cache result is stored
#              under {prepare_cache}
#
# Returns the new object, blessed into $class.
sub new {
    my ($class, %args)= @_;

    # Everything that comes from the caller. $args{options} is listed before
    # the nested lookups on purpose: those lookups autovivify the slot.
    my %from_caller= (
        client           => $args{client},
        async_io         => $args{async_io},
        options          => $args{options},
        request_timeout  => $args{options}{request_timeout},
        protocol_version => $args{options}{protocol_version},
        host             => $args{host},
        metadata         => $args{metadata},
        prepare_cache    => $args{metadata}->prepare_cache,
    );

    # Each object gets its own empty buffer, held by reference.
    my $read_buffer= '';

    # Bookkeeping that always starts out the same way.
    my %initial_state= (
        pool_id         => undef,
        last_stream_id  => 0,
        pending_streams => {},
        in_prepare      => {},
        decompress_func => undef,
        compress_func   => undef,
        connected       => 0,
        connecting      => undef,
        socket          => undef,
        fileno          => undef,
        pending_write   => undef,
        shutdown        => 0,
        read_buffer     => \$read_buffer,
        bytes_sent      => 0,
        bytes_read      => 0,
        tls             => undef,
        tls_want_write  => undef,
        healthcheck     => undef,
    );

    my %object= (%from_caller, %initial_state);
    my $self= bless \%object, $class;

    # Weaken on the final object's slots; weakening before the copy above
    # would be lost, since copying a weak reference yields a strong one.
    weaken($self->{$_}) for qw/async_io client/;

    return $self;
}
# Queries system.local via execute_prepared() and builds a hashref of status
# entries keyed by each row's broadcast_address.
#
# Every entry has the keys peer, data_center, host_id, preferred_ip, rack,
# release_version, tokens and schema_version. The query selects no peer or
# preferred_ip column, so both are filled from broadcast_address. The `key`
# column (index 0) is selected but not used.
#
# The two steps are run through series() with $callback as the final argument.
# NOTE(review): series() is defined in Cassandra::Client::Util; presumably it
# forwards the last step's (undef, \%local_status) to $callback and
# short-circuits to $callback on error - confirm there.
#
# Returns nothing; the result is delivered through $callback.
sub get_local_status {
    my ($self, $callback)= @_;

    # Entry keys, paired positionally with the selected-column indexes below.
    my @entry_keys= qw/peer data_center host_id preferred_ip rack release_version tokens schema_version/;
    my @row_indexes= (3, 1, 2, 3, 4, 5, 6, 7);

    my $run_query= sub {
        my ($next)= @_;
        $self->execute_prepared($next, \"select key, data_center, host_id, broadcast_address, rack, release_version, tokens, schema_version from system.local");
    };

    my $index_rows= sub {
        my ($next, $result)= @_;
        my %local_status= map {
            my $address= $_->[3];
            my %entry;
            @entry{@entry_keys}= @{$_}[@row_indexes];
            ($address => \%entry);
        } @{$result->rows};
        $next->(undef, \%local_status);
    };

    series([ $run_query, $index_rows ], $callback);
    return;
}
# Queries system.peers via execute_prepared() and builds a hashref of status
# entries keyed by each row's peer column.
#
# Every entry has the keys peer, data_center, host_id, preferred_ip, rack,
# release_version, tokens and schema_version, taken from the selected columns
# in that same order.
#
# The two steps are run through series() with $callback as the final argument.
# NOTE(review): series() is defined in Cassandra::Client::Util; presumably it
# forwards the last step's (undef, \%network_status) to $callback and
# short-circuits to $callback on error - confirm there.
#
# Returns nothing; the result is delivered through $callback.
sub get_peers_status {
    my ($self, $callback)= @_;

    # Entry keys in the same order as the columns of the select below.
    my @entry_keys= qw/peer data_center host_id preferred_ip rack release_version tokens schema_version/;

    my $run_query= sub {
        my ($next)= @_;
        $self->execute_prepared($next, \"select peer, data_center, host_id, preferred_ip, rack, release_version, tokens, schema_version from system.peers");
    };

    my $index_rows= sub {
        my ($next, $result)= @_;
        my %network_status= map {
            my $peer= $_->[0];
            my %entry;
            @entry{@entry_keys}= @{$_}[0 .. $#entry_keys];
            ($peer => \%entry);
        } @{$result->rows};
        $next->(undef, \%network_status);
    };

    series([ $run_query, $index_rows ], $callback);
    return;
}
( run in 1.445 second using v1.01-cache-2.11-cpan-39bf76dae61 )