Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client.pm view on Meta::CPAN
our $XS_VERSION = ($Cassandra::Client::VERSION || '');
$XS_VERSION =~ s/\A(\d+)\.(\d+)(\d{3})\z/$1.$2_$3/;
XSLoader::load(__PACKAGE__, $XS_VERSION);
sub new {
my ($class, %args)= @_;
my $self= bless {
connected => 0,
connect_callbacks => undef,
shutdown => 0,
active_queries => 0,
}, $class;
my $options= Cassandra::Client::Config->new(
\%args
);
$self->{throttler}= $options->{throttler} || Cassandra::Client::Policy::Throttle::Default->new();
lib/Cassandra/Client.pm view on Meta::CPAN
sub _connect {
my ($self, $callback)= @_;
return _cb($callback) if $self->{connected};
return _cb($callback, 'Cannot connect: shutdown() has been called') if $self->{shutdown};
# This is ONLY useful if the user doesn't throw away the C::C object on connect errors.
if (!$self->{connecting} && (my $error= $self->{throttler}->should_fail())) {
return _cb($callback, $error);
}
push @{$self->{connect_callbacks}||=[]}, $callback;
if ($self->{connecting}++) {
return;
}
my @contact_points= shuffle @{$self->{options}{contact_points}};
my $last_error= "No hosts to connect to";
my $next_connect;
$next_connect= sub {
my $contact_point= shift @contact_points;
if (!$contact_point) {
delete $self->{connecting};
undef $next_connect;
_cb($_, "Unable to connect to any Cassandra server. Last error: $last_error") for @{delete $self->{connect_callbacks}};
return;
}
my $connection= Cassandra::Client::Connection->new(
client => $self,
options => $self->{options},
host => $contact_point,
async_io => $self->{async_io},
metadata => $self->{metadata},
);
lib/Cassandra/Client.pm view on Meta::CPAN
my $error= shift;
$self->{throttler}->count($error);
if ($error) {
$last_error= "On $contact_point: $error";
return $next_connect->();
}
undef $next_connect;
$self->{connected}= 1;
delete $self->{connecting};
_cb($_) for @{delete $self->{connect_callbacks}};
});
};
$next_connect->();
return;
}
sub shutdown {
my ($self)= @_;
lib/Cassandra/Client.pm view on Meta::CPAN
All C<Cassandra::Client> methods are available as synchronous methods by using their normal names. For example, C<< $client->connect(); >> will block until the client has connected. Similarly, C<< $client->execute($query) >> will wait for the query r...
my $client= Cassandra::Client->new( ... );
$client->connect;
$client->execute("INSERT INTO my_table (id, value) VALUES (?, ?) USING TTL ?",
[ 1, "test", 86400 ],
{ consistency => "quorum" });
=head2 Promises
C<Cassandra::Client> methods are also available as promises (see perldoc L<AnyEvent::XSPromises>). This integrates well with other libraries that deal with promises or asynchronous callbacks. Note that for promises to work, C<AnyEvent> is required, a...
Promise variants are available by prefixing method names with C<async_>, eg. C<async_connect>, C<async_execute>, etc. The usual result of the method is passed to the promise's success handler, or to the failure handler if there was an error.
# Asynchronously pages through the result set, processing data as it comes in.
my $promise= $client->async_each_page("SELECT id, column FROM my_table WHERE id=?", [ 5 ], undef, sub {
for my $row (@{shift->rows}) {
my ($id, $column)= @$row;
say "Row: $id $column";
}
})->then(sub {
lib/Cassandra/Client/AsyncEV.pm view on Meta::CPAN
my ($done, $in_run);
my @output;
my $callback= sub {
$done= 1;
@output= @_;
$self->{ev}->break() if $in_run;
};
$$output= sub {
if ($self->{in_wait}) {
die "Unable to recursively wait for callbacks; are you doing synchronous Cassandra queries from asynchronous callbacks?";
}
local $self->{in_wait}= 1;
$in_run= 1;
$self->{ev}->run unless $done;
return @output;
};
return $callback;
}
lib/Cassandra/Client/Pool.pm view on Meta::CPAN
$connection->connect(sub {
my ($error)= @_;
delete $self->{connecting}{$host};
if ($error) {
$self->{policy}->set_disconnected($host);
if (my $waiters= delete $self->{wait_connect}) {
if ($self->{count} && @$waiters) {
warn 'We have callbacks waiting for a connection while we\'re connected';
}
my $max_conn= $self->{max_connections};
my $known_node_count= $self->{policy}->known_node_count;
my $max_attempts = ($max_conn < $known_node_count ? $max_conn : $known_node_count) + 1;
for my $waiter (@$waiters) {
if ((++$waiter->{attempts}) >= $max_attempts || !%{$self->{connecting}}) {
$waiter->{callback}->("Failed to connect to server: $error");
} else {
( run in 0.675 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )