view release on metacpan or search on metacpan
lib/Cassandra/Client.pm view on Meta::CPAN
});
return;
}
# Schedule another attempt at $command after an exponentially growing pause.
#
# Increments $command_info->{retries} and arms a one-shot timer on the
# async_io backend for 0.1 * 2**retries seconds (0.2s, 0.4s, 0.8s, ...).
# When the timer fires, the command is handed to _command_enqueue if the
# client is already running max_concurrent_queries queries, and to
# _command_slowpath otherwise.
#
# Returns whatever the backend's timer() call returns.
sub _command_retry {
    my ($self, $command, $callback, $args, $command_info)= @_;

    my $attempt= ++$command_info->{retries};
    my $backoff= 0.1 * (2 ** $attempt);

    my $resubmit= sub {
        # Capacity is checked when the timer fires, not when it is armed.
        my $at_capacity= $self->{active_queries} >= $self->{options}{max_concurrent_queries};
        my $handler= $at_capacity ? '_command_enqueue' : '_command_slowpath';
        return $self->$handler($command, $callback, $args, $command_info);
    };

    return $self->{async_io}->timer($resubmit, $backoff);
}
sub _command_failed {
my ($self, $command, $callback, $args, $command_info, $error)= @_;
lib/Cassandra/Client.pm view on Meta::CPAN
Default value of the C<idempotent> query attribute that indicates if a write query may be retried without harm. It defaults to false.
=item max_page_size
Default max page size to pass to the server. This defaults to C<5000>. Note that large values can cause trouble on Cassandra. Can be overridden by passing C<page_size> in query attributes.
=item max_connections
Maximum number of connections to keep open in the Cassandra connection pool. Defaults to C<2> for historical reasons, raise this if appropriate.
=item timer_granularity
Timer granularity used for timeouts. Defaults to C<0.1> (100ms). Change this if you're setting timeouts to values lower than a second.
=item request_timeout
Maximum time to wait for a query, in seconds. Defaults to C<11>.
=item warmup
Whether to connect to the full cluster in C<connect()>, or delay that until queries come in. Defaults to false.
lib/Cassandra/Client/AsyncAnyEvent.pm view on Meta::CPAN
use vars qw/@TIMEOUTS/;
# Build the AnyEvent-backed async I/O object.
#
# Arguments (named): options => hashref of client options. Only
# timer_granularity is read here; a missing or false value (including 0)
# falls back to 0.1 seconds.
#
# AnyEvent is loaded here rather than at compile time.
#
# Returns the new object, blessed into $class, with empty watcher tables,
# no periodic timeout watcher and an empty timeout list.
sub new {
    my ($class, %args)= @_;
    my $config= $args{options};

    require AnyEvent;

    my %state= (
        timer_granularity => ($config->{timer_granularity} || 0.1),
        ae_read           => {},
        ae_write          => {},
        ae_timeout        => undef,
        fh_to_obj         => {},
        timeouts          => [],
    );

    return bless \%state, $class;
}
sub register {
my ($self, $fh, $connection)= @_;
lib/Cassandra/Client/AsyncAnyEvent.pm view on Meta::CPAN
undef $self->{ae_write}{$fh};
return;
}
sub deadline {
my ($self, $fh, $id, $timeout)= @_;
local *TIMEOUTS= $self->{timeouts};
if (!$self->{ae_timeout}) {
$self->{ae_timeout}= AnyEvent->timer(
after => $self->{timer_granularity},
interval => $self->{timer_granularity},
cb => sub { $self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC)) },
);
}
my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
my $deadline= $curtime + $timeout;
my $additem= [ $deadline, $fh, $id, 0 ];
if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
# Grumble... that's slow
lib/Cassandra/Client/AsyncAnyEvent.pm view on Meta::CPAN
}
}
if (!@TIMEOUTS) {
$self->{ae_timeout}= undef;
}
return;
}
# One-shot timer: invoke $callback once, $wait seconds from now.
#
# The watcher returned by AE::timer is kept in a lexical that the timer's own
# callback closes over, so the watcher stays referenced even when the caller
# discards the return value. The closure clears that lexical when it fires,
# just before calling $callback.
#
# Returns the watcher.
sub timer {
    my ($self, $callback, $wait)= @_;

    my $watcher;
    $watcher= AE::timer($wait, 0, sub {
        undef $watcher;
        return $callback->();
    });

    return $watcher;
}
# Hand $callback to AE::postpone so it runs later instead of being called
# directly from here. Returns whatever AE::postpone returns.
sub later {
    my $self= shift;
    my ($callback)= @_;

    # Keep the &-form call: it ignores any prototype on AE::postpone, so a
    # code ref held in a scalar can be passed as an ordinary argument.
    # NOTE(review): presumably AE::postpone declares a block prototype - confirm.
    return &AE::postpone($callback);
}
lib/Cassandra/Client/AsyncEV.pm view on Meta::CPAN
use vars qw/@TIMEOUTS/;
# Build the EV-backed async I/O object.
#
# Arguments (named): options => hashref of client options. Only
# timer_granularity is read here; a missing or false value (including 0)
# falls back to 0.1 seconds.
#
# EV is loaded here rather than at compile time, and every instance gets its
# own loop from EV::Loop->new().
#
# Returns the new object, blessed into $class, with empty watcher tables,
# no periodic timeout watcher and an empty timeout list.
sub new {
    my ($class, %args)= @_;
    my $config= $args{options};

    require EV;

    my %state= (
        timer_granularity => ($config->{timer_granularity} || 0.1),
        ev_read           => {},
        ev_write          => {},
        ev_timeout        => undef,
        fh_to_obj         => {},
        timeouts          => [],
        ev                => EV::Loop->new(),
    );

    return bless \%state, $class;
}
sub register {
lib/Cassandra/Client/AsyncEV.pm view on Meta::CPAN
undef $self->{ev_write}{$fh};
return;
}
sub deadline {
my ($self, $fh, $id, $timeout)= @_;
local *TIMEOUTS= $self->{timeouts};
if (!$self->{ev_timeout}) {
$self->{ev_timeout}= $self->{ev}->timer( $self->{timer_granularity}, $self->{timer_granularity}, sub {
$self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC));
} );
}
my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
my $deadline= $curtime + $timeout;
my $additem= [ $deadline, $fh, $id, 0 ];
if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
# Grumble... that's slow
lib/Cassandra/Client/AsyncEV.pm view on Meta::CPAN
}
}
if (!@TIMEOUTS) {
$self->{ev_timeout}= undef;
}
return;
}
# One-shot timer on this object's loop: invoke $callback once, $wait seconds
# from now.
#
# The watcher is kept in a lexical that the timer's own callback closes over,
# so it stays referenced even when the caller discards the return value. The
# closure clears that lexical when it fires, just before calling $callback.
#
# Returns the watcher.
sub timer {
    my ($self, $callback, $wait)= @_;

    my $watcher;
    my $fire= sub {
        undef $watcher;
        return $callback->();
    };

    $watcher= $self->{ev}->timer($wait, 0, $fire);
    return $watcher;
}
# Defer $callback by arming this object's own one-shot timer() with a zero
# delay. Returns whatever timer() returns.
sub later {
    my $self= shift;
    my ($callback)= @_;

    return $self->timer($callback, 0);
}
# $something->($async->wait(my $w)); my ($error, $result)= $w->();
sub wait {
my ($self)= @_;
my $output= \$_[1];
my ($done, $in_run);
my @output;
my $callback= sub {
lib/Cassandra/Client/Config.pm view on Meta::CPAN
anyevent => 0,
contact_points => undef,
port => 9042,
cql_version => undef,
keyspace => undef,
compression => undef,
default_consistency => undef,
default_idempotency => 0,
max_page_size => 5000,
max_connections => 2,
timer_granularity => 0.1,
request_timeout => 11,
warmup => 0,
max_concurrent_queries => 1000,
tls => 0,
protocol_version => 4,
throttler => undef,
command_queue => undef,
retry_policy => undef,
load_balancing_policy => undef,
lib/Cassandra/Client/Config.pm view on Meta::CPAN
} else { die "contact_points not specified"; }
# Booleans
for (qw/anyevent warmup tls default_idempotency/) {
if (exists($config->{$_})) {
$self->{$_}= !!$config->{$_};
}
}
# Numbers, ignore undef
for (qw/port timer_granularity request_timeout max_connections max_concurrent_queries/) {
if (defined($config->{$_})) {
$self->{$_}= 0+ $config->{$_};
}
}
# Numbers, undef actually means undef
for (qw/max_page_size/) {
if (exists($config->{$_})) {
$self->{$_}= defined($config->{$_}) ? (0+ $config->{$_}) : undef;
}
}
lib/Cassandra/Client/Connection.pm view on Meta::CPAN
return $whilst_next->($error) if $error;
my %versions;
$versions{$_->{schema_version}}= 1 for values %$network_status;
if (keys %versions > 1) {
if ($waited >= $max_wait) {
return $whilst_next->("wait_for_schema_agreement timed out after $waited seconds");
}
$waited += $wait_delay;
return $self->{async_io}->timer($whilst_next, $wait_delay);
} else {
$done= 1;
return $whilst_next->();
}
});
},
$callback,
);
return;
lib/Cassandra/Client/NetworkStatus.pm view on Meta::CPAN
sub { # condition
!$self->{shutdown} && !$self->{master_id}
},
sub { # while
my ($wnext)= @_;
series([
sub {
my ($next)= @_;
if ($attempts++) {
# Don't retry immediately
$self->{async_io}->timer($next, 1);
} else {
$next->();
}
},
sub {
my ($next)= @_;
$pool->get_one_cb($next);
},
sub {
my ($next, $connection)= @_;