Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client/Connection.pm view on Meta::CPAN
use 5.010;
use strict;
use warnings;
use vars qw/$BUFFER/;
use Ref::Util qw/is_blessed_ref is_plain_arrayref/;
use IO::Socket::INET;
use IO::Socket::INET6;
use Errno qw/EAGAIN/;
use Socket qw/SOL_SOCKET IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY/;
use Scalar::Util qw/weaken/;
use Net::SSLeay qw/ERROR_WANT_READ ERROR_WANT_WRITE ERROR_NONE/;
use Cassandra::Client::Util;
use Cassandra::Client::Protocol qw/
:constants
%consistency_lookup
%batch_type_lookup
pack_bytes
pack_longstring
pack_queryparameters
lib/Cassandra/Client/Connection.pm view on Meta::CPAN
read_buffer => \(my $empty= ''),
bytes_sent => 0,
bytes_read => 0,
tls => undef,
tls_want_write => undef,
healthcheck => undef,
protocol_version => $args{options}{protocol_version},
}, $class;
weaken($self->{async_io});
weaken($self->{client});
return $self;
}
sub get_local_status {
my ($self, $callback)= @_;
series([
sub {
my ($next)= @_;
$self->execute_prepared($next, \"select key, data_center, host_id, broadcast_address, rack, release_version, tokens, schema_version from system.local");
lib/Cassandra/Client/NetworkStatus.pm view on Meta::CPAN
package Cassandra::Client::NetworkStatus;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::NetworkStatus::VERSION = '0.21';
use 5.010;
use strict;
use warnings;
use Scalar::Util qw/weaken/;
use Cassandra::Client::Util;
sub new {
my ($class, %args)= @_;
my $self= bless {
pool => $args{pool},
async_io => $args{async_io},
waiting_for_cb => [],
master_id => undef,
shutdown => undef,
}, $class;
weaken($self->{pool});
return $self;
}
sub init {
my ($self, $callback)= @_;
$self->select_master($callback);
}
sub select_master {
my ($self, $callback)= @_;
lib/Cassandra/Client/Pool.pm view on Meta::CPAN
package Cassandra::Client::Pool;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::Pool::VERSION = '0.21';
use 5.010;
use strict;
use warnings;
use Scalar::Util 'weaken';
use Cassandra::Client::Util;
use Cassandra::Client::NetworkStatus;
sub new {
my ($class, %args)= @_;
my $self= bless {
client => $args{client},
options => $args{options},
metadata => $args{metadata},
max_connections => $args{options}{max_connections},
lib/Cassandra/Client/Pool.pm view on Meta::CPAN
list => [],
last_id => 0,
id2ip => {},
i => 0,
connecting => {},
wait_connect => [],
}, $class;
weaken($self->{client});
$self->{network_status}= Cassandra::Client::NetworkStatus->new(pool => $self, async_io => $args{async_io});
return $self;
}
sub init {
my ($self, $callback, $first_connection)= @_;
# This code can be called twice.
# If we didn't have a datacenter pinned before, now we do
sv_pvutf8n||5.006000|
sv_pvutf8||5.006000|
sv_pv||5.006000|
sv_recode_to_utf8||5.007003|
sv_reftype|||
sv_ref||5.015004|
sv_replace|||
sv_report_used|||
sv_resetpvn|||
sv_reset|||
sv_rvunweaken|||
sv_rvweaken||5.006000|
sv_set_undef|||
sv_sethek|||
sv_setiv_mg|5.004050||p
sv_setiv|||
sv_setnv_mg|5.006000||p
sv_setnv|||
sv_setpv_bufsize|||
sv_setpv_mg|5.004050||p
sv_setpvf_mg_nocontext|||pvn
sv_setpvf_mg|5.006000|5.004000|pv
t/20-memleak.t view on Meta::CPAN
#!perl
use 5.010;
use strict;
use warnings;
use File::Basename qw//; use lib File::Basename::dirname(__FILE__).'/lib';
use Test::More;
use TestCassandra;
use Cassandra::Client::Util qw/series parallel/;
use Scalar::Util 'weaken';
use Socket qw/PF_INET SOCK_STREAM/;
use Devel::Cycle;
use Data::Dumper;
plan skip_all => "Missing Cassandra test environment" unless TestCassandra->is_ok;
plan tests => 14;
{
# Weaken() sanity
my $h= {};
weaken($h);
ok(!$h) or diag("Our weaken() sucks.");
}
sub get_fd_sequence {
my ($count)= @_;
my @sockets;
for (1..$count) {
socket(my $sock, PF_INET, SOCK_STREAM, 0) or die $!;
push @sockets, $sock;
}
t/20-memleak.t view on Meta::CPAN
}
$client->execute("delete from $db.test_int where id=5");
{
my ($result)= $client->execute("select id, value from $db.test_int where id=5");
my $rows= $result->rows;
ok(@$rows == 0);
}
my @conns= values %{$client->{pool}{pool}};
weaken $_ for @conns;
ok(0+(grep $_, @conns));
ok(!$deinit);
$client->shutdown if rand() < 0.5;
weaken $client;
ok($deinit);
ok(!grep $_, @conns);
my @fd_sequence_done= get_fd_sequence(100);
if (join(',', @fd_sequence_init) ne join(',', @fd_sequence_init2)) {
ok(1) and diag('Disabling FD sequence checker, does not seem supported');
} elsif (! -d "/proc/$$/fd") {
ok(1) and diag('Disabling FD sequence checker, we don\'t have a useful /proc');
} else {
t/20-memleak.t view on Meta::CPAN
sub {
shift->($one);
},
sub {
shift->($one);
},
], sub {
$two->{abc}= 1;
});
weaken $one;
weaken $two;
ok(!$one);
ok(!$two);
}
# Test parallel()
{
my $one= {};
my $two= {};
parallel([
sub { shift->($one); },
sub { shift->($one); },
], sub {
$two->{abc}= 1;
});
weaken $one;
weaken $two;
ok(!$one);
ok(!$two);
}
1;
( run in 0.311 second using v1.01-cache-2.11-cpan-65fba6d93b7 )