Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client/Connection.pm  view on Meta::CPAN

package Cassandra::Client::Connection;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::Connection::VERSION = '0.21';
use 5.010;
use strict;
use warnings;
use vars qw/$BUFFER/;

use Ref::Util qw/is_blessed_ref is_plain_arrayref/;
use IO::Socket::INET;
use IO::Socket::INET6;
use Errno qw/EAGAIN/;
use Socket qw/SOL_SOCKET IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY/;
use Scalar::Util qw/weaken/;
use Net::SSLeay qw/ERROR_WANT_READ ERROR_WANT_WRITE ERROR_NONE/;

use Cassandra::Client::Util;
use Cassandra::Client::Protocol qw/
    :constants
    %consistency_lookup
    %batch_type_lookup
    pack_bytes
    pack_longstring
    pack_queryparameters
    pack_shortbytes
    pack_stringmap
    pack_stringlist
    unpack_bytes
    unpack_errordata
    unpack_inet
    unpack_int
    unpack_metadata
    unpack_shortbytes
    unpack_string
    unpack_stringlist
    unpack_stringmultimap
/;
use Cassandra::Client::Error::Base;
use Cassandra::Client::ResultSet;
use Cassandra::Client::TLSHandling;

use constant STREAM_ID_LIMIT => 32768;

# Populated at BEGIN{} time
my @compression_preference;
my %available_compression;

sub new {
    my ($class, %args)= @_;

    my $self= bless {
        client          => $args{client},
        async_io        => $args{async_io},
        pool_id         => undef,

        options         => $args{options},
        request_timeout => $args{options}{request_timeout},
        host            => $args{host},
        metadata        => $args{metadata},
        prepare_cache   => $args{metadata}->prepare_cache,
        last_stream_id  => 0,
        pending_streams => {},
        in_prepare      => {},

        decompress_func => undef,
        compress_func   => undef,
        connected       => 0,
        connecting      => undef,
        socket          => undef,
        fileno          => undef,
        pending_write   => undef,
        shutdown        => 0,
        read_buffer     => \(my $empty= ''),
        bytes_sent      => 0,
        bytes_read      => 0,

        tls             => undef,
        tls_want_write  => undef,

        healthcheck     => undef,
        protocol_version => $args{options}{protocol_version},
    }, $class;
    weaken($self->{async_io});
    weaken($self->{client});
    return $self;
}

sub get_local_status {
    my ($self, $callback)= @_;

    series([
        sub {
            my ($next)= @_;
            $self->execute_prepared($next, \"select key, data_center, host_id, broadcast_address, rack, release_version, tokens, schema_version from system.local");
        },
        sub {
            my ($next, $result)= @_;

            my %local_status= map { $_->[3] => {
                peer => $_->[3],
                data_center => $_->[1],
                host_id => $_->[2],
                preferred_ip => $_->[3],
                rack => $_->[4],
                release_version => $_->[5],
                tokens => $_->[6],
                schema_version => $_->[7],
            } } @{$result->rows};

            $next->(undef, \%local_status);
        },
    ], $callback);

    return;
}

sub get_peers_status {
    my ($self, $callback)= @_;

    series([
        sub {
            my ($next)= @_;
            $self->execute_prepared($next, \"select peer, data_center, host_id, preferred_ip, rack, release_version, tokens, schema_version from system.peers");
        },
        sub {
            my ($next, $result)= @_;

            my %network_status= map { $_->[0] => {
                peer => $_->[0],
                data_center => $_->[1],
                host_id => $_->[2],
                preferred_ip => $_->[3],
                rack => $_->[4],
                release_version => $_->[5],
                tokens => $_->[6],
                schema_version => $_->[7],
            } } @{$result->rows};

            $next->(undef, \%network_status);
        },
    ], $callback);

    return;
}



( run in 1.445 second using v1.01-cache-2.11-cpan-39bf76dae61 )