Cassandra-Client
* Deduplicate parallel prepare() calls for the same query
* Deal with some of Perl's special variables like $"
* Avoid leaking file descriptors in some cases
* Support for User-Defined Types (UDT)
* Support for the 'Tuple' data type
0.10 2017/01/30
* Add a retry policy implementation for generic request failures
* Throttler: finally fix the feedback mechanism
* Rework shutdown() to not need asynchronous code
* Rework the pool management logic
* Support for UTF-8 authentication data
* Convenience method row_hashes() on a ResultSet
* Basic TLS support
* Make sure file handles always get closed when shutdown() is called
0.09 2016/11/15
* Fix support for Perl 5.20 and later
* Fix a class of memory leaks caused by recursion using closures
* Add support for TIME/DATE/TINYINT/SMALLINT data types added in CQL 3.3
* Avoid SIGPIPE when we connect to localhost but Cassandra is not present
* Add support for named parameters
0.08 2016/10/25
* Change event loop implementation to EV
* Redesign wait_for_schema_agreement, to actually wait for the schema agreement
* Support for proper retry settings
* Handle request backlogs by queueing them or failing them early
* Add stack traces to asynchronous queries with errors
* Fix handling of nested types, like list<frozen<map<int,boolean>>>
0.07 2016/10/10
* Experimental support for client-side throttling
* No more Perl 5.8 support -- 5.10 is now required
* Addresses an error that can occur when a query is retried against a node that doesn't know that query
* Fix a rare issue causing random "Request timed out" when utf8-flagged data is passed to non-utf8 types (ascii, blob)
0.06 2016/09/26
* Hopefully fixes support for Perl 5.8
* Addresses a documentation hole about the 'timestamp' type
* Support for the 'varint' type
* Support for the 'decimal' type
* Addresses an issue that occurred when both the AnyEvent and synchronous options were used
* Some refactoring to allow for future changes
0.05 2016/09/06
* Updates Makefile.PL to list the correct requirements
0.04 2016/09/05
* First version, released on an unsuspecting world.
lib/Cassandra/Client.pm
eval {
&$cb; 1
} or do {
my $error= $@ || "unknown error";
warn "Ignoring unhandled exception in callback: $error";
};
return;
}
sub _mksync { # Translates an asynchronous call into something that looks like Perl
my ($sub)= @_;
return sub {
my $self= shift;
$sub->($self, $self->{async_io}->wait(my $w), @_);
my ($err, @output)= $w->();
if ($err) { die $err; }
return @output;
};
}
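The wrapping idea in `_mksync` can be tried out in isolation. The following is a minimal sketch, not the library's implementation: `make_wait`, `mksync` and `async_double` are hypothetical names, and the toy waiter simply returns whatever the callback stored instead of running an event loop. It assumes the same convention as the excerpt above, where the callback receives an error first and the results after it.

```perl
use strict;
use warnings;

# Toy stand-in for the wait() helper: stores a waiter coderef in $$slot and
# returns the callback that the asynchronous function should invoke.
sub make_wait {
    my ($slot) = @_;
    my @output;
    $$slot = sub { return @output };    # a real waiter would run the event loop here
    return sub { @output = @_ };
}

# Turn a callback-style function into one that returns results or dies.
sub mksync {
    my ($async) = @_;
    return sub {
        my @args = @_;
        $async->(make_wait(\my $w), @args);
        my ($err, @result) = $w->();
        die $err if $err;
        return @result;
    };
}

# Hypothetical callback-style function: calls back with (error, results...).
sub async_double {
    my ($cb, $n) = @_;
    return $cb->("not a number\n") unless defined $n && $n =~ /^-?\d+$/;
    return $cb->(undef, $n * 2);
}

my $double  = mksync(\&async_double);
my ($value) = $double->(21);                  # result is returned directly
my $ok      = eval { $double->("x"); 1 };     # errors surface as exceptions
my $error   = $@;
```

The caller never sees the callback: a successful call returns the results, and a failed one dies with the error, which is the behavior the synchronous methods are documented to have.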
lib/Cassandra/Client.pm
=head1 NAME
Cassandra::Client - Perl library for accessing Cassandra using its binary network protocol
=head1 VERSION
version 0.21
=head1 DESCRIPTION
C<Cassandra::Client> is a Perl library giving its users access to the Cassandra database through the native protocol. Both synchronous and asynchronous querying are supported, through various common calling styles.
=head1 EXAMPLE
use Cassandra::Client;
my $client= Cassandra::Client->new(
contact_points => [ '127.0.0.1', '192.168.0.1' ],
username => "my_user",
password => "my_password",
keyspace => "my_keyspace",
);
lib/Cassandra/Client.pm
Disconnect all connections and abort all current queries. After this, the C<Cassandra::Client> object considers itself shut down and must be reconstructed with C<new()>.
=item $client->wait_for_schema_agreement()
Wait until all nodes agree on the schema version. Useful after changing table or keyspace definitions.
=back
=head1 (A?)SYNCHRONOUS
It's up to the user to choose which calling style to use: synchronous, asynchronous with promises, or through returned coderefs.
=head2 Synchronous
All C<Cassandra::Client> methods are available as synchronous methods by using their normal names. For example, C<< $client->connect(); >> will block until the client has connected. Similarly, C<< $client->execute($query) >> will wait for the query r...
my $client= Cassandra::Client->new( ... );
$client->connect;
$client->execute("INSERT INTO my_table (id, value) VALUES (?, ?) USING TTL ?",
[ 1, "test", 86400 ],
{ consistency => "quorum" });
=head2 Promises
C<Cassandra::Client> methods are also available as promises (see perldoc L<AnyEvent::XSPromises>). This integrates well with other libraries that deal with promises or asynchronous callbacks. Note that for promises to work, C<AnyEvent> is required, a...
Promise variants are available by prefixing method names with C<async_>, e.g. C<async_connect>, C<async_execute>, etc. On success, the usual result of the method is passed to the promise's success handler; on error, the error is passed to the failure handler.
# Asynchronously pages through the result set, processing data as it comes in.
my $promise= $client->async_each_page("SELECT id, column FROM my_table WHERE id=?", [ 5 ], undef, sub {
for my $row (@{shift->rows}) {
my ($id, $column)= @$row;
say "Row: $id $column";
}
})->then(sub {
say "We finished paging through all the rows";
}, sub {
my $error= shift;
});
lib/Cassandra/Client.pm
my $error= shift;
warn "Unhandled error! $error";
$condvar->send;
});
$condvar->recv; # Wait for the promise to resolve or fail
How you integrate this into your infrastructure is of course up to you, and beyond the scope of the C<Cassandra::Client> documentation.
=head2 Coderefs
These are the simplest form of asynchronous querying in C<Cassandra::Client>. Instead of dealing with complex callback resolution, the client simply returns a coderef that, once invoked, returns what the original method would have returned.
The variants are available by prefixing method names with C<future_>, e.g. C<future_connect>, C<future_execute>, etc. These methods return a coderef.
my $coderef= $client->future_execute("INSERT INTO table (id, value) VALUES (?, ?)", [ $id, $value ]);
# Do other things
...
# Wait for the query to finish
$coderef->();
Upon errors, the coderef will die, just like the synchronous methods would. Because of this, invoking the coderef immediately after getting it is equivalent to using the synchronous methods:
# This:
$client->connect;
# Is the same as:
$client->future_connect->();
When used properly, coderefs can give a modest performance boost, but their real value is in the ease of use compared to promises.
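The benefit comes from issuing several requests before waiting on any of them. The sketch below illustrates that pattern with a hypothetical stand-in class, `FakeClient`, which is not part of C<Cassandra::Client> and only mimics the documented contract (a `future_`-style method returns a coderef that yields the result or dies). With a real, connected client you would call `$client->future_execute` in the same way.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a connected client; NOT part of Cassandra::Client.
# future_execute() returns a coderef that yields a result, or dies, when invoked.
package FakeClient {
    sub new { return bless { issued => 0 }, shift }
    sub future_execute {
        my ($self, $query, $bind) = @_;
        $self->{issued}++;
        return sub {
            die "bind values required\n" unless ref $bind eq 'ARRAY';
            return "ok:$bind->[0]";
        };
    }
}

my $client = FakeClient->new;
my $query  = "INSERT INTO my_table (id, value) VALUES (?, ?)";

# Issue every request first, so they can all be in flight at once...
my @futures = map { $client->future_execute($query, [ $_, $_ * 2 ]) } 1 .. 3;

# ...then wait for each one. Errors surface as exceptions at this point,
# so each wait is wrapped in eval to keep one failure from hiding the rest.
my @results;
for my $future (@futures) {
    my $result = eval { $future->() };
    push @results, defined $result ? $result : "failed: $@";
}
```

Wrapping each invocation in `eval` is a design choice for batch work; if any failure should abort the whole batch, invoke the coderefs without it.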
=head1 CAVEATS, BUGS, TODO
lib/Cassandra/Client/AsyncEV.pm
my ($done, $in_run);
my @output;
my $callback= sub {
$done= 1;
@output= @_;
$self->{ev}->break() if $in_run;
};
$$output= sub {
if ($self->{in_wait}) {
die "Unable to recursively wait for callbacks; are you doing synchronous Cassandra queries from asynchronous callbacks?";
}
local $self->{in_wait}= 1;
$in_run= 1;
$self->{ev}->run unless $done;
return @output;
};
return $callback;
}
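The `local $self->{in_wait}= 1` line in the excerpt above is what makes the recursion check safe: `local` restores the previous value when the enclosing scope exits, including when it exits through an exception. A minimal sketch of that guard on its own, with no event loop involved; `guarded_wait` is a hypothetical name used only for this illustration:

```perl
use strict;
use warnings;

# Toy object; 'in_wait' plays the same role as in the excerpt above.
my $self = { in_wait => 0 };

sub guarded_wait {
    my ($body) = @_;
    die "recursive wait\n" if $self->{in_wait};
    # local() restores the previous value when this scope exits,
    # including when $body dies, so the flag cannot get stuck.
    local $self->{in_wait} = 1;
    return $body->();
}

# A plain wait succeeds.
my $outer = guarded_wait(sub { "done" });

# A wait started from inside another wait is rejected.
my $nested_ok    = eval { guarded_wait(sub { guarded_wait(sub { 1 }) }); 1 };
my $nested_error = $@;

# The flag was reset despite the exception, so waiting works again.
my $after = guarded_wait(sub { "still usable" });
```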
t/50-bench.t
my $multiply= $ENV{BENCH_MULTIPLY} || 1;
for (1..$rounds) {
SYNC_INS: {
next if $ENV{BENCH_SKIP_SYNC};
my $num= 1000 * $multiply;
my $t0= -time();
for (1..$num) {
$client->execute($insert_query, [ $_, $_ * 2 ]);
}
my $diff= time() + $t0;
ok(1, sprintf "$num synchronous inserts: %.1f seconds", $diff);
}
SYNC_SEL: {
next if $ENV{BENCH_SKIP_SYNC};
my $num= 1000 * $multiply;
my $t0= -time();
for (1..$num) {
$client->execute($select_query, [ $_ ]);
}
my $diff= time() + $t0;
ok(1, sprintf "$num synchronous selects: %.1f seconds", $diff);
}
PROMISES_INS: {
my $num= 1000 * $multiply;
my $t0= -time();
my @promises;
for (1..$num) {
push @promises, $client->async_execute($insert_query, [ $_, $_ * 2 ]);
}
my $cv= AE::cv;
my $fail;
collect(@promises)->then(sub { $cv->send; }, sub { $fail= 1; $cv->send; });
$cv->recv;
my $diff= time() + $t0;
ok(1, sprintf "$num asynchronous inserts: %.1f seconds", $diff);
}
PROMISES_SEL: {
my $num= 1000 * $multiply;
my $t0= -time();
my @promises;
for (1..$num) {
push @promises, $client->async_execute($select_query, [ $_ ]);
}
my $cv= AE::cv;
my $fail;
collect(@promises)->then(sub { $cv->send; }, sub { $fail= 1; $cv->send; });
$cv->recv;
my $diff= time() + $t0;
ok(1, sprintf "$num asynchronous selects: %.1f seconds", $diff);
}
FUTURES_INS: {
my $num= 1000 * $multiply;
my $t0= -time();
my @futures;
for (1..$num) {
push @futures, $client->future_execute($insert_query, [ $_, $_ * 2 ]);
}
$_->() for @futures;
my $diff= time() + $t0;
ok(1, sprintf "$num synchronous inserts via futures: %.1f seconds", $diff);
}
FUTURES_SEL: {
my $num= 1000 * $multiply;
my $t0= -time();
my @futures;
for (1..$num) {
push @futures, $client->future_execute($select_query, [ $_ ]);
}
$_->() for @futures;
my $diff= time() + $t0;
ok(1, sprintf "$num synchronous selects via futures: %.1f seconds", $diff);
}
}
done_testing;