Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client.pm view on Meta::CPAN
# Version string handed to the XS loader. Falls back to the empty string
# when $Cassandra::Client::VERSION is unset (or otherwise false).
our $XS_VERSION = $Cassandra::Client::VERSION || '';

# A value of the exact form "<digits>.<digits><3 digits>" gets an underscore
# inserted before its final three digits (e.g. "1.2345" becomes "1.2_345");
# anything else is left untouched.
$XS_VERSION =~ s{\A (\d+) [.] (\d+) (\d{3}) \z}{${1}.${2}_${3}}x;

# Load this package's XS object under that version string.
XSLoader::load(__PACKAGE__, $XS_VERSION);
sub
new {
my
(
$class
,
%args
)=
@_
;
my
$self
=
bless
{
connected
=> 0,
connect_callbacks
=>
undef
,
shutdown
=> 0,
active_queries
=> 0,
},
$class
;
my
$options
= Cassandra::Client::Config->new(
\
%args
);
$self
->{throttler}=
$options
->{throttler} || Cassandra::Client::Policy::Throttle::Default->new();
lib/Cassandra/Client.pm view on Meta::CPAN
81828384858687888990919293949596979899100101102103104105106107108109110111112113114115sub
_connect {
my
(
$self
,
$callback
)=
@_
;
return
_cb(
$callback
)
if
$self
->{connected};
return
_cb(
$callback
,
'Cannot connect: shutdown() has been called'
)
if
$self
->{
shutdown
};
# This is ONLY useful if the user doesn't throw away the C::C object on connect errors.
if
(!
$self
->{connecting} && (
my
$error
=
$self
->{throttler}->should_fail())) {
return
_cb(
$callback
,
$error
);
}
push
@{
$self
->{connect_callbacks}||=[]},
$callback
;
if
(
$self
->{connecting}++) {
return
;
}
my
@contact_points
= shuffle @{
$self
->{options}{contact_points}};
my
$last_error
=
"No hosts to connect to"
;
my
$next_connect
;
$next_connect
=
sub
{
my
$contact_point
=
shift
@contact_points
;
if
(!
$contact_point
) {
delete
$self
->{connecting};
undef
$next_connect
;
_cb(
$_
,
"Unable to connect to any Cassandra server. Last error: $last_error"
)
for
@{
delete
$self
->{connect_callbacks}};
return
;
}
my
$connection
= Cassandra::Client::Connection->new(
client
=>
$self
,
options
=>
$self
->{options},
host
=>
$contact_point
,
async_io
=>
$self
->{async_io},
metadata
=>
$self
->{metadata},
);
lib/Cassandra/Client.pm view on Meta::CPAN
127128129130131132133134135136137138139140141142143144145146
my
$error
=
shift
;
$self
->{throttler}->count(
$error
);
if
(
$error
) {
$last_error
=
"On $contact_point: $error"
;
return
$next_connect
->();
}
undef
$next_connect
;
$self
->{connected}= 1;
delete
$self
->{connecting};
_cb(
$_
)
for
@{
delete
$self
->{connect_callbacks}};
});
};
$next_connect
->();
return
;
}
sub
shutdown
{
my
(
$self
)=
@_
;
lib/Cassandra/Client.pm view on Meta::CPAN
All C<Cassandra::Client> methods are available as synchronous methods by using their normal names. For example, C<< $client->connect(); >> will block until the client has connected. Similarly, C<< $client->execute($query) >> will wait for the query r...

    my $client= Cassandra::Client->new( ... );
    $client->connect;
    $client->execute(
        "INSERT INTO my_table (id, value) VALUES (?, ?) USING TTL ?",
        [ 1, "test", 86400 ],
        { consistency => "quorum" });

=head2 Promises
C<Cassandra::Client> methods are also available as promises (see perldoc L<AnyEvent::XSPromises>). This integrates well with other libraries that deal with promises or asynchronous callbacks. Note that for promises to work, C<AnyEvent> is required, a...
Promise variants are available by prefixing method names with C<async_>, e.g. C<async_connect>, C<async_execute>, etc. The usual result of the method is passed to the promise's success handler, or to the failure handler if there was an error.
# Asynchronously pages through the result set, processing data as it comes in.
my $promise= $client->async_each_page("SELECT id, column FROM my_table WHERE id=?", [ 5 ], undef, sub {
for my $row (@{shift->rows}) {
my ($id, $column)= @$row;
say "Row: $id $column";
}
})->then(sub {
lib/Cassandra/Client/AsyncEV.pm view on Meta::CPAN
143144145146147148149150151152153154155156157158159160161162163
my
(
$done
,
$in_run
);
my
@output
;
my
$callback
=
sub
{
$done
= 1;
@output
=
@_
;
$self
->{ev}->break()
if
$in_run
;
};
$$output
=
sub
{
if
(
$self
->{in_wait}) {
die
"Unable to recursively wait for callbacks; are you doing synchronous Cassandra queries from asynchronous callbacks?"
;
}
local
$self
->{in_wait}= 1;
$in_run
= 1;
$self
->{ev}->run
unless
$done
;
return
@output
;
};
return
$callback
;
}
lib/Cassandra/Client/Pool.pm view on Meta::CPAN
222223224225226227228229230231232233234235236237238239240241242$connection
->
connect
(
sub
{
my
(
$error
)=
@_
;
delete
$self
->{connecting}{
$host
};
if
(
$error
) {
$self
->{policy}->set_disconnected(
$host
);
if
(
my
$waiters
=
delete
$self
->{wait_connect}) {
if
(
$self
->{count} &&
@$waiters
) {
warn
'We have callbacks waiting for a connection while we\'re connected'
;
}
my
$max_conn
=
$self
->{max_connections};
my
$known_node_count
=
$self
->{policy}->known_node_count;
my
$max_attempts
= (
$max_conn
<
$known_node_count
?
$max_conn
:
$known_node_count
) + 1;
for
my
$waiter
(
@$waiters
) {
if
((++
$waiter
->{attempts}) >=
$max_attempts
|| !%{
$self
->{connecting}}) {
$waiter
->{callback}->(
"Failed to connect to server: $error"
);
}
else
{
( run in 0.300 second using v1.01-cache-2.11-cpan-496ff517765 )