Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client.pm view on Meta::CPAN
$self->{throttler}->count($error);
$self->{active_queries}--;
$self->_schedule_command_dequeue if $self->{command_queue}{has_any};
return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;
$self->_report_stats($command, $command_info);
return _cb($callback, $error, $result);
}, @$args);
return;
SLOWPATH:
return $self->_command_slowpath($command, $callback, $args, $command_info);
OVERFLOW:
return $self->_command_enqueue($command, $callback, $args, $command_info);
}
sub _command_slowpath {
my ($self, $command, $callback, $args, $command_info)= @_;
$self->{active_queries}++;
series([
sub {
my ($next)= @_;
$self->_connect($next);
}, sub {
my ($next)= @_;
$self->{pool}->get_one_cb($next);
}, sub {
my ($next, $connection)= @_;
if (my $error= $self->{throttler}->should_fail()) {
return $next->($error);
}
$connection->$command($next, @$args);
}
], sub {
my ($error, $result)= @_;
$self->{throttler}->count($error);
$self->{active_queries}--;
$self->_schedule_command_dequeue if $self->{command_queue}{has_any};
return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;
$self->_report_stats($command, $command_info);
return _cb($callback, $error, $result);
});
return;
}
sub _command_retry {
my ($self, $command, $callback, $args, $command_info)= @_;
$command_info->{retries}++;
my $delay= 0.1 * (2 ** $command_info->{retries});
$self->{async_io}->timer(sub {
if ($self->{active_queries} >= $self->{options}{max_concurrent_queries}) {
$self->_command_enqueue($command, $callback, $args, $command_info);
} else {
$self->_command_slowpath($command, $callback, $args, $command_info);
}
}, $delay);
}
sub _command_failed {
my ($self, $command, $callback, $args, $command_info, $error)= @_;
if (is_ref($error)) {
my $retry_decision;
my $statement = $command eq 'execute_prepared' ? {idempotent => $args->[2]->{idempotent}} : {};
if ($error->do_retry) {
$retry_decision= Cassandra::Client::Policy::Retry::retry;
} elsif ($error->is_request_error) {
$retry_decision= $self->{retry_policy}->on_request_error($statement, undef, $error, ($command_info->{retries}||0));
} elsif ($error->isa('Cassandra::Client::Error::WriteTimeoutException')) {
$retry_decision= $self->{retry_policy}->on_write_timeout($statement, $error->cl, $error->write_type, $error->blockfor, $error->received, ($command_info->{retries}||0));
} elsif ($error->isa('Cassandra::Client::Error::ReadTimeoutException')) {
$retry_decision= $self->{retry_policy}->on_read_timeout($statement, $error->cl, $error->blockfor, $error->received, $error->data_retrieved, ($command_info->{retries}||0));
} elsif ($error->isa('Cassandra::Client::Error::UnavailableException')) {
$retry_decision= $self->{retry_policy}->on_unavailable($statement, $error->cl, $error->required, $error->alive, ($command_info->{retries}||0));
} else {
$retry_decision= Cassandra::Client::Policy::Retry::rethrow;
}
if ($retry_decision && $retry_decision eq 'retry') {
return $self->_command_retry($command, $callback, $args, $command_info);
}
}
$self->_report_stats($command, $command_info);
return $callback->($error);
}
sub _command_enqueue {
my ($self, $command, $callback, $args, $command_info)= @_;
if (my $error= $self->{command_queue}->enqueue([$command, $callback, $args, $command_info])) {
return $self->_command_failed($command, $callback, $args, $command_info, "Cannot $command: $error");
}
return;
}
sub _schedule_command_dequeue {
my ($self)= @_;
unless ($self->{command_callback_scheduled}++) {
$self->{async_io}->later(sub {
delete $self->{command_callback_scheduled};
while ($self->{command_queue}{has_any} && $self->{active_queries} < $self->{options}{max_concurrent_queries}) {
my $item= $self->{command_queue}->dequeue or return;
$self->_command_slowpath(@$item);
}
});
}
}
lib/Cassandra/Client.pm view on Meta::CPAN
);
$client->connect;
$client->each_page("SELECT id, column FROM my_table WHERE id=?", [ 5 ], undef, sub {
for my $row (@{shift->rows}) {
my ($id, $column)= @$row;
say "$id: $column";
}
});
=head1 METHODS
=over
=item Cassandra::Client->new(%options)
Create a new C<Cassandra::Client> instance, with the given options.
=over
=item contact_points
B<Required.> Arrayref of seed hosts to use when connecting. Specify more than one for increased reliability. This array is shuffled before use, so that random hosts are picked from the array.
=item keyspace
Default keyspace to use on all underlying connections. Can be overridden by querying for specific keyspaces, eg C<SELECT * FROM system.peers>.
=item anyevent
Should our internal event loop be based on AnyEvent, or should we just use our own? A true value means enable AnyEvent. Needed for promises to work.
=item port
Port number to use. Defaults to C<9042>.
=item cql_version
CQL version to use. Defaults to the version the server is running. Override only if your client has specific CQL requirements.
=item compression
Compression method to use. Defaults to the best available version, based on server and client support. Possible values are C<snappy>, C<lz4>, and C<none>.
=item default_consistency
Default consistency level to use. Defaults to C<one>. Can be overridden on a query basis as well, by passing a C<consistency> attribute.
=item default_idempotency
Default value of the C<idempotent> query attribute that indicates if a write query may be retried without harm. It defaults to false.
=item max_page_size
Default max page size to pass to the server. This defaults to C<5000>. Note that large values can cause trouble on Cassandra. Can be overridden by passing C<page_size> in query attributes.
=item max_connections
Maximum amount of connections to keep open in the Cassandra connection pool. Defaults to C<2> for historical reasons, raise this if appropriate.
=item timer_granularity
Timer granularity used for timeouts. Defaults to C<0.1> (100ms). Change this if you're setting timeouts to values lower than a second.
=item request_timeout
Maximum time to wait for a query, in seconds. Defaults to C<11>.
=item warmup
Whether to connect to the full cluster in C<connect()>, or delay that until queries come in.
=item protocol_version
Cassandra protocol version to use. Currently defaults to C<4>, can also be set to C<3> for compatibility with older versions of Cassandra.
=back
=item $client->batch($queries[, $attributes])
Run one or more queries, in a batch, on Cassandra. Queries must be specified as an arrayref of C<[$query, \@bind]> pairs.
Defaults to a I<logged> batch, which can be overridden by passing C<logged>, C<unlogged> or C<counter> as the C<batch_type> attribute.
$client->batch([
[ "INSERT INTO my_table (a, b) VALUES (?, ?)", [ $row1_a, $row1_b ] ],
[ "INSERT INTO my_table (a, b) VALUES (?, ?)", [ $row2_a, $row2_b ] ],
], { batch_type => "unlogged" });
=item $client->execute($query[, $bound_parameters[, $attributes]])
Executes a single query on Cassandra, and fetch the results (if any).
For queries that have large amounts of result rows and end up spanning multiple pages, C<each_page> is the function you need. C<execute> does not handle pagination, and may end up missing rows unless pagination is implemented by its user through the ...
$client->execute(
"UPDATE my_table SET column=:new_column WHERE id=:id",
{ new_column => 2, id => 5 },
{ consistency => "quorum" },
);
The C<idempotent> attribute indicates that the query is idempotent and may be retried without harm.
=item $client->each_page($query, $bound_parameters, $attributes, $page_callback)
Executes a query and invokes C<$page_callback> with each page of the results, represented as L<Cassandra::Client::ResultSet> objects.
# Downloads the entire table from the database, even if it's terabytes in size
$client->each_page( "SELECT id, column FROM my_table", undef, undef, sub {
my $page= shift;
for my $row (@{$page->rows}) {
say $row->[0];
}
});
=item $client->prepare($query)
Prepares a query on the server. C<execute> and C<each_page> already do this internally, so this method is only useful for preloading purposes (and to check whether queries even compile, I guess).
=item $client->shutdown()
( run in 1.260 second using v1.01-cache-2.11-cpan-140bd7fdf52 )