Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client/Connection.pm  view on Meta::CPAN

use 5.010;
use strict;
use warnings;
use vars qw/$BUFFER/;

use Ref::Util qw/is_blessed_ref is_plain_arrayref/;
use IO::Socket::INET;
use IO::Socket::INET6;
use Errno qw/EAGAIN/;
use Socket qw/SOL_SOCKET IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY/;
use Scalar::Util qw/weaken/;
use Net::SSLeay qw/ERROR_WANT_READ ERROR_WANT_WRITE ERROR_NONE/;

use Cassandra::Client::Util;
use Cassandra::Client::Protocol qw/
    :constants
    %consistency_lookup
    %batch_type_lookup
    pack_bytes
    pack_longstring
    pack_queryparameters

lib/Cassandra/Client/Connection.pm  view on Meta::CPAN

        read_buffer     => \(my $empty= ''),
        bytes_sent      => 0,
        bytes_read      => 0,

        tls             => undef,
        tls_want_write  => undef,

        healthcheck     => undef,
        protocol_version => $args{options}{protocol_version},
    }, $class;
    weaken($self->{async_io});
    weaken($self->{client});
    return $self;
}

sub get_local_status {
    my ($self, $callback)= @_;

    series([
        sub {
            my ($next)= @_;
            $self->execute_prepared($next, \"select key, data_center, host_id, broadcast_address, rack, release_version, tokens, schema_version from system.local");

lib/Cassandra/Client/NetworkStatus.pm  view on Meta::CPAN

package Cassandra::Client::NetworkStatus;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::NetworkStatus::VERSION = '0.21';
use 5.010;
use strict;
use warnings;

use Scalar::Util qw/weaken/;
use Cassandra::Client::Util;

# Constructor. Takes named arguments:
#   pool     - stored under {pool}; kept only as a weak reference
#   async_io - stored under {async_io} unchanged
# Returns the newly blessed object.
sub new {
    my $class = shift;
    my %args  = @_;

    my %fields = (
        pool           => $args{pool},
        async_io       => $args{async_io},

        # Bookkeeping slots; all start out empty.
        waiting_for_cb => [],
        master_id      => undef,
        shutdown       => undef,
    );

    my $self = bless \%fields, $class;

    # Only the pool reference is weakened. Presumably the pool owns this
    # object, so a strong back-reference would create a cycle -- confirm
    # against the caller.
    weaken($self->{pool});

    return $self;
}

# Entry point for initialisation: hands the callback straight to
# select_master() and returns whatever that call returns.
sub init {
    my $self     = shift;
    my $callback = shift;

    return $self->select_master($callback);
}

sub select_master {
    my ($self, $callback)= @_;

lib/Cassandra/Client/Pool.pm  view on Meta::CPAN

package Cassandra::Client::Pool;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::Pool::VERSION = '0.21';
use 5.010;
use strict;
use warnings;

use Scalar::Util 'weaken';
use Cassandra::Client::Util;
use Cassandra::Client::NetworkStatus;

sub new {
    my ($class, %args)= @_;
    my $self= bless {
        client => $args{client},
        options => $args{options},
        metadata => $args{metadata},
        max_connections => $args{options}{max_connections},

lib/Cassandra/Client/Pool.pm  view on Meta::CPAN

        list => [],

        last_id => 0,
        id2ip => {},

        i => 0,

        connecting => {},
        wait_connect => [],
    }, $class;
    weaken($self->{client});
    $self->{network_status}= Cassandra::Client::NetworkStatus->new(pool => $self, async_io => $args{async_io});
    return $self;
}

sub init {
    my ($self, $callback, $first_connection)= @_;

    # This code can be called twice.

    # If we didn't have a datacenter pinned before, now we do

ppport.h  view on Meta::CPAN

sv_pvutf8n||5.006000|
sv_pvutf8||5.006000|
sv_pv||5.006000|
sv_recode_to_utf8||5.007003|
sv_reftype|||
sv_ref||5.015004|
sv_replace|||
sv_report_used|||
sv_resetpvn|||
sv_reset|||
sv_rvunweaken|||
sv_rvweaken||5.006000|
sv_set_undef|||
sv_sethek|||
sv_setiv_mg|5.004050||p
sv_setiv|||
sv_setnv_mg|5.006000||p
sv_setnv|||
sv_setpv_bufsize|||
sv_setpv_mg|5.004050||p
sv_setpvf_mg_nocontext|||pvn
sv_setpvf_mg|5.006000|5.004000|pv

t/20-memleak.t  view on Meta::CPAN

#!perl
use 5.010;
use strict;
use warnings;
use File::Basename qw//; use lib File::Basename::dirname(__FILE__).'/lib';
use Test::More;
use TestCassandra;
use Cassandra::Client::Util qw/series parallel/;
use Scalar::Util 'weaken';
use Socket qw/PF_INET SOCK_STREAM/;
use Devel::Cycle;
use Data::Dumper;

plan skip_all => "Missing Cassandra test environment" unless TestCassandra->is_ok;
plan tests => 14;

{
    # Sanity check for weaken() itself: weakening the only reference to a
    # fresh hash must leave the variable false. Every later leak assertion
    # in this file relies on that.
    my $probe = {};
    weaken($probe);
    unless (ok(!$probe)) {
        diag("Our weaken() sucks.");
    }
}

sub get_fd_sequence {
    my ($count)= @_;
    my @sockets;
    for (1..$count) {
        socket(my $sock, PF_INET, SOCK_STREAM, 0) or die $!;
        push @sockets, $sock;
    }

t/20-memleak.t  view on Meta::CPAN

}

# Remove row id=5 again and confirm that a follow-up select returns no rows.
$client->execute("delete from $db.test_int where id=5");
{
    my ($result)= $client->execute("select id, value from $db.test_int where id=5");
    my $rows= $result->rows;
    ok(@$rows == 0);
}

# Copy every value of the client's pool map into @conns, then weaken our
# copies: each element now stays defined only while something else still
# holds a strong reference to that object.
my @conns= values %{$client->{pool}{pool}};
weaken $_ for @conns;
# While the client is alive at least one of them must still be defined.
ok(0+(grep $_, @conns));

# $deinit is set outside this excerpt; it is expected to be false before the
# client is released and true afterwards.
ok(!$deinit);
# shutdown() is only called on roughly half of the runs -- presumably so that
# teardown is checked both with and without an explicit shutdown (TODO confirm).
$client->shutdown if rand() < 0.5;
# Weakening our own handle drops this variable's strong reference; if it was
# the last one, the client is freed right here.
weaken $client;
ok($deinit);

# With the client gone, none of the weakened connection references may remain.
ok(!grep $_, @conns);

my @fd_sequence_done= get_fd_sequence(100);
if (join(',', @fd_sequence_init) ne join(',', @fd_sequence_init2)) {
    ok(1) and diag('Disabling FD sequence checker, does not seem supported');
} elsif (! -d "/proc/$$/fd") {
    ok(1) and diag('Disabling FD sequence checker, we don\'t have a useful /proc');
} else {

t/20-memleak.t  view on Meta::CPAN

        sub {
            shift->($one);
        },
        sub {
            shift->($one);
        },
    ], sub {
        $two->{abc}= 1;
    });

    weaken $one;
    weaken $two;
    ok(!$one);
    ok(!$two);
}

# Test parallel()
# Leak check: once the parallel() call has returned, nothing may still hold a
# strong reference to the data captured by the closures passed to it.
{
    my $one= {};
    my $two= {};
    # $one is captured by both step closures, each of which passes it to the
    # callback it receives as its first argument; $two is captured only by
    # the final sub.
    parallel([
        sub { shift->($one); },
        sub { shift->($one); },
    ], sub {
        $two->{abc}= 1;
    });

    # Weaken the only named references. If any closure (or anything
    # parallel() retained) still held the hashes, these would stay defined.
    weaken $one;
    weaken $two;
    ok(!$one);
    ok(!$two);
}

1;



( run in 0.311 second using v1.01-cache-2.11-cpan-65fba6d93b7 )