Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client/Connection.pm view on Meta::CPAN
eval {
($decoder, $paging_state)= unpack_metadata($self->{protocol_version}, 1, $_[3]);
1;
} or do {
return $callback->("Unable to unpack query metadata: $@");
};
$decoder= $prepared->{decoder} || $decoder;
$callback->(undef,
Cassandra::Client::ResultSet->new(
\$_[3],
$decoder,
$paging_state,
)
);
} elsif ($result_type == RESULT_VOID) { # Void
return $callback->();
} elsif ($result_type == RESULT_SET_KEYSPACE) { # Set_keyspace
my $new_keyspace= unpack_string($_[3]);
return $callback->();
} elsif ($result_type == RESULT_SCHEMA_CHANGE) { # Schema change
return $self->wait_for_schema_agreement(sub {
# We may be passed an error. Ignore it, our query succeeded
$callback->();
});
} else {
return $callback->("Query executed successfully but got an unexpected response type");
}
return;
}
# Poll the network status until all reported schema versions are identical.
#
# Each round calls get_network_status() and collects the distinct
# schema_version values found in the returned status entries. With at most one
# distinct version the loop is marked finished; otherwise another round is
# scheduled through the async_io timer after a short delay.
#
# Arguments:
#   $callback - passed to whilst() as its final callback.
#               NOTE(review): presumably called without arguments on success
#               and with an error string on failure - confirm against whilst().
#
# Always returns an empty list; the outcome is reported asynchronously.
sub wait_for_schema_agreement {
    my ($self, $callback)= @_;

    my $retry_interval= 0.5; # seconds to sleep between two polls
    my $give_up_after= 20;   # stop retrying once $waited reaches this many seconds
    my $waited= 0;           # sum of the sleeps only; time spent querying is not counted
    my $in_agreement;

    # Loop condition: keep going for as long as no agreement has been seen.
    my $should_continue= sub { !$in_agreement };

    # Loop body: one status query, followed by either completion or a retry.
    my $poll_once= sub {
        my ($whilst_next)= @_;

        $self->get_network_status(sub {
            my ($error, $network_status)= @_;

            # A failed status query is handed straight to the loop driver.
            if ($error) {
                return $whilst_next->($error);
            }

            # Use hash keys to deduplicate the schema versions.
            my %seen_version;
            for my $node (values %$network_status) {
                $seen_version{$node->{schema_version}}= 1;
            }

            # Zero or one distinct version: nothing left to wait for.
            if (scalar(keys %seen_version) <= 1) {
                $in_agreement= 1;
                return $whilst_next->();
            }

            # Still diverging; give up once the sleep budget is exhausted.
            if ($waited >= $give_up_after) {
                return $whilst_next->("wait_for_schema_agreement timed out after $waited seconds");
            }

            # Account for the upcoming sleep, then retry when the timer fires.
            $waited += $retry_interval;
            return $self->{async_io}->timer($whilst_next, $retry_interval);
        });
    };

    whilst($should_continue, $poll_once, $callback);

    return;
}
###### PROTOCOL CODE
sub handshake {
my ($self, $callback)= @_;
series([
sub { # Send the OPCODE_OPTIONS
my ($next)= @_;
$self->request($next, OPCODE_OPTIONS, '');
},
sub { # The server hopefully just told us what it supports, let's respond with a STARTUP message
my ($next, $response_code, $body)= @_;
if ($response_code != OPCODE_SUPPORTED) {
return $next->("Server returned an unexpected handshake");
}
my $map= unpack_stringmultimap($body);
unless ($map->{CQL_VERSION} && $map->{COMPRESSION}) {
return $next->("Server did not return compression and cql version information");
}
my $selected_cql_version= $self->{options}{cql_version};
if (!$selected_cql_version) {
($selected_cql_version)= reverse sort @{$map->{CQL_VERSION}};
}
my %ss_compression= map { $_, 1 } @{$map->{COMPRESSION}};
my $selected_compression= $self->{options}{compression};
if (!$selected_compression) {
for (@compression_preference) {
if ($ss_compression{$_} && $available_compression{$_}) {
$selected_compression= $_;
last;
}
}
}
$selected_compression= undef if $selected_compression && $selected_compression eq 'none';
if ($selected_compression) {
if (!$ss_compression{$selected_compression}) {
return $next->("Server did not support requested compression method <$selected_compression>");
}
if (!$available_compression{$selected_compression}) {
return $next->("Requested compression method <$selected_compression> is supported by the server but not by us");
}
}
( run in 0.513 second using v1.01-cache-2.11-cpan-140bd7fdf52 )