Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client.pm view on Meta::CPAN
# Run $command on a connection taken from the pool.
#
# Parameters:
#   $command      - name of the method to invoke on the connection
#   $callback     - code ref that receives ($error, $result) on success
#   $args         - array ref of extra arguments passed to $command
#   $command_info - hash ref of per-command state, forwarded to the
#                   stats and failure handlers
#
# The work is a chain of three steps handed to series(): connect, obtain a
# connection from $self->{pool}, then ask the throttler whether to fail
# before invoking $command on that connection. The completion handler
# records the outcome with the throttler, releases the active-query slot,
# schedules a dequeue pass if commands are waiting, and finally either
# delegates to _command_failed() or reports stats and calls $callback.
#
# NOTE(review): series() is defined elsewhere; it is presumed to run the
# steps in order and hand the first error (or the last result) to the
# completion handler -- confirm against its definition.
#
# Returns nothing; the outcome is delivered asynchronously.
sub _command_slowpath {
    my ($self, $command, $callback, $args, $command_info) = @_;

    # The slot is held until the completion handler below releases it.
    ++$self->{active_queries};

    my $ensure_connected = sub {
        my ($continue) = @_;
        $self->_connect($continue);
    };

    my $pick_connection = sub {
        my ($continue) = @_;
        $self->{pool}->get_one_cb($continue);
    };

    my $run_command = sub {
        my ($continue, $conn) = @_;

        # A true value from the throttler is treated as the error to report.
        my $throttle_error = $self->{throttler}->should_fail();
        return $continue->($throttle_error) if $throttle_error;

        $conn->$command($continue, @$args);
    };

    my $on_complete = sub {
        my ($error, $result) = @_;

        $self->{throttler}->count($error);
        --$self->{active_queries};

        if ($self->{command_queue}{has_any}) {
            $self->_schedule_command_dequeue;
        }

        if ($error) {
            return $self->_command_failed($command, $callback, $args, $command_info, $error);
        }

        $self->_report_stats($command, $command_info);
        return _cb($callback, $error, $result);
    };

    series([ $ensure_connected, $pick_connection, $run_command ], $on_complete);

    return;
}
# Schedule another attempt at $command after a backoff delay.
#
# Increments $command_info->{retries} and arms a timer on
# $self->{async_io} for 0.1 * 2**retries seconds. When the timer fires the
# command is queued via _command_enqueue() if active_queries has reached
# options.max_concurrent_queries, and is otherwise run immediately through
# _command_slowpath().
#
# Returns whatever $self->{async_io}->timer() returns.
sub _command_retry {
    my ($self, $command, $callback, $args, $command_info) = @_;

    my $attempt = ++$command_info->{retries};
    my $backoff = 0.1 * (2 ** $attempt);

    my $reissue = sub {
        # Capacity is checked when the timer fires, not when it is armed.
        if ($self->{active_queries} >= $self->{options}{max_concurrent_queries}) {
            return $self->_command_enqueue($command, $callback, $args, $command_info);
        }
        return $self->_command_slowpath($command, $callback, $args, $command_info);
    };

    return $self->{async_io}->timer($reissue, $backoff);
}
# Decide what to do with a failed command: retry it, or report the failure.
#
# Parameters:
#   $command      - name of the connection method that was invoked
#   $callback     - code ref that receives the error if we give up
#   $args         - array ref of the command's arguments (not modified here)
#   $command_info - hash ref of per-command state; {retries} is read
#   $error        - the failure, either a plain value or a reference
#
# Errors that are not references are never retried. For reference errors a
# retry decision is obtained, in this order of precedence:
#   * $error->do_retry is true     -> Cassandra::Client::Policy::Retry::retry()
#   * $error->is_request_error     -> retry_policy->on_request_error
#   * WriteTimeoutException        -> retry_policy->on_write_timeout
#   * ReadTimeoutException         -> retry_policy->on_read_timeout
#   * UnavailableException         -> retry_policy->on_unavailable
#   * anything else                -> Cassandra::Client::Policy::Retry::rethrow()
# A decision that is string-equal to 'retry' hands the command to
# _command_retry(). Every other outcome is reported via _report_stats() and
# then passed to $callback.
#
# Returns the result of _command_retry() or of $callback.
sub _command_failed {
    my ($self, $command, $callback, $args, $command_info, $error) = @_;

    if (is_ref($error)) {
        # The retry policy receives a minimal statement description. Only
        # prepared executions carry an idempotence flag.
        my $statement = {};
        if ($command eq 'execute_prepared') {
            # Copy the element before looking inside it. Dereferencing
            # $args->[2] in place would autovivify an empty hash into the
            # shared argument list whenever the third argument (presumably
            # the statement attributes -- confirm against execute_prepared)
            # was not supplied, and that list is reused on retry.
            my $attribs = $args->[2];
            $statement = { idempotent => ($attribs ? $attribs->{idempotent} : undef) };
        }

        my $retry_decision;
        if ($error->do_retry) {
            $retry_decision = Cassandra::Client::Policy::Retry::retry();
        } elsif ($error->is_request_error) {
            $retry_decision = $self->{retry_policy}->on_request_error(
                $statement, undef, $error, ($command_info->{retries} || 0),
            );
        } elsif ($error->isa('Cassandra::Client::Error::WriteTimeoutException')) {
            $retry_decision = $self->{retry_policy}->on_write_timeout(
                $statement, $error->cl, $error->write_type, $error->blockfor, $error->received,
                ($command_info->{retries} || 0),
            );
        } elsif ($error->isa('Cassandra::Client::Error::ReadTimeoutException')) {
            $retry_decision = $self->{retry_policy}->on_read_timeout(
                $statement, $error->cl, $error->blockfor, $error->received, $error->data_retrieved,
                ($command_info->{retries} || 0),
            );
        } elsif ($error->isa('Cassandra::Client::Error::UnavailableException')) {
            $retry_decision = $self->{retry_policy}->on_unavailable(
                $statement, $error->cl, $error->required, $error->alive,
                ($command_info->{retries} || 0),
            );
        } else {
            $retry_decision = Cassandra::Client::Policy::Retry::rethrow();
        }

        if ($retry_decision && $retry_decision eq 'retry') {
            return $self->_command_retry($command, $callback, $args, $command_info);
        }
    }

    # Giving up: the command is finished as far as timing is concerned.
    $self->_report_stats($command, $command_info);
    return $callback->($error);
}
# Put a command on $self->{command_queue} for later execution.
#
# The queue entry is an array ref of [$command, $callback, $args,
# $command_info]. A true return value from the queue's enqueue() is
# treated as an error description, and the command is failed at once
# through _command_failed() with the message "Cannot <command>: <error>".
#
# Returns nothing when the command was queued, otherwise the result of
# _command_failed().
sub _command_enqueue {
    my ($self, $command, $callback, $args, $command_info) = @_;

    my $entry       = [ $command, $callback, $args, $command_info ];
    my $queue_error = $self->{command_queue}->enqueue($entry);
    return unless $queue_error;

    return $self->_command_failed(
        $command, $callback, $args, $command_info,
        "Cannot $command: $queue_error",
    );
}
# Arrange for queued commands to be started via $self->{async_io}->later().
#
# $self->{command_callback_scheduled} acts as a guard: it is incremented on
# every call, and the drain callback is only registered when the previous
# value was false. The callback removes the guard before it starts
# draining, so a call to this method made during the drain registers a
# fresh pass. Draining starts commands through _command_slowpath() while
# the queue reports has_any and active_queries is below
# options.max_concurrent_queries. It stops early if dequeue() yields a
# false value.
sub _schedule_command_dequeue {
    my ($self) = @_;

    my $drain_queue = sub {
        delete $self->{command_callback_scheduled};

        while ($self->{command_queue}{has_any}) {
            # Re-read both counters each round; starting a command changes them.
            last unless $self->{active_queries} < $self->{options}{max_concurrent_queries};

            my $queued = $self->{command_queue}->dequeue;
            return unless $queued;

            $self->_command_slowpath(@$queued);
        }
    };

    unless ($self->{command_callback_scheduled}++) {
        $self->{async_io}->later($drain_queue);
    }
}
# Record the completion time of a command and notify the stats hook.
#
# Stores the current Time::HiRes::time() in $command_info->{end_time}. If
# $self->{options}{stats_hook} is set, it is invoked through _cb() with the
# pair (timing => { command, start_time, end_time }).
sub _report_stats {
    my ($self, $command, $command_info) = @_;

    my $finished_at = Time::HiRes::time();
    $command_info->{end_time} = $finished_at;

    if (my $hook = $self->{options}{stats_hook}) {
        my %timing = (
            command    => $command,
            start_time => $command_info->{start_time},
            end_time   => $finished_at,
        );
        _cb($hook, timing => \%timing);
    }
}
# Utility functions that wrap query functions
sub _each_page {
my ($self, $callback, $query, $params, $attribs, $page_callback)= @_;
my $params_copy= $params ? clone($params) : undef;
my $attribs_copy= $attribs ? clone($attribs) : undef;
my $done= 0;
( run in 2.836 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )