Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client/Connection.pm view on Meta::CPAN
}
my $request_body= pack_stringmap({
CQL_VERSION => $selected_cql_version,
($selected_compression ? (COMPRESSION => $selected_compression) : ()),
});
$self->request($next, OPCODE_STARTUP, $request_body);
# This needs to happen after we send the STARTUP message
$self->setup_compression($selected_compression);
},
sub { # By now we should know whether we need to authenticate
my ($next, $response_code, $body)= @_;
if ($response_code == OPCODE_READY) {
return $next->(undef, $body); # Pass it along
}
if ($response_code == OPCODE_AUTHENTICATE) {
return $self->authenticate($next, $body);
}
return $next->("Unexpected response from the server");
},
sub {
my ($next)= @_;
if ($self->{options}{keyspace}) {
return $self->execute_prepared($next, \('use "'.$self->{options}{keyspace}.'"'));
}
return $next->();
},
sub {
my ($next)= @_;
if (!$self->{ipaddress}) {
return $self->get_local_status($next);
}
return $next->();
},
sub {
my ($next, $status)= @_;
if ($status) {
my ($local)= values %$status;
$self->{ipaddress}= $local->{peer};
$self->{datacenter}= $local->{data_center};
}
if (!$self->{ipaddress}) {
return $next->("Unable to determine node's IP address");
}
return $next->();
}
], $callback);
return;
}
# Drive the authentication exchange that follows an AUTHENTICATE message
# from the server.
#
#   $callback          - invoked directly with an error string when the
#                        exchange cannot be started; otherwise handed to
#                        whilst() as its completion callback.
#   $initial_challenge - body of the AUTHENTICATE message; the authenticator
#                        name is decoded from it with unpack_string().
#
# $self->{options}{authentication} must provide begin($authenticator). The
# object it returns must provide evaluate($next, $challenge) and
# success($bytes).
sub authenticate {
    my ($self, $callback, $initial_challenge)= @_;

    my $authenticator= unpack_string($initial_challenge);
    my $credentials= $self->{options}{authentication};
    unless ($credentials) {
        return $callback->("Server expected authentication using <$authenticator> but no credentials were set");
    }

    my $mechanism;
    my $started= eval {
        $mechanism= $credentials->begin($authenticator);
        1;
    };
    return $callback->("Failed to initialize authentication mechanism: $@") unless $started;

    my $authenticated;            # set once AUTH_SUCCESS has been received
    my $pending_challenge= undef; # undef for the very first round

    # Round step 1: let the mechanism produce a response to the pending
    # challenge. The mechanism is given $next and is responsible for calling it.
    my $compute_response= sub {
        my ($next)= @_;
        my $evaluated= eval {
            $mechanism->evaluate($next, $pending_challenge);
            1;
        };
        return $next->("Failed to evaluate challenge: $@") unless $evaluated;
        return 1;
    };

    # Round step 2: ship that response to the server.
    my $send_response= sub {
        my ($next, $auth_response)= @_;
        return $self->request($next, OPCODE_AUTH_RESPONSE, pack_bytes($auth_response));
    };

    # Round step 3: interpret the server's reply. @failure stays empty on the
    # happy paths, so $next is then called without arguments.
    my $handle_reply= sub {
        my ($next, $opcode, $body)= @_;
        my @failure;

        if ($opcode == OPCODE_AUTH_CHALLENGE) {
            # Another round is needed; remember what to answer next time.
            $pending_challenge= unpack_bytes($body);
        }
        elsif ($opcode == OPCODE_AUTH_SUCCESS) {
            # Mark completion before notifying the mechanism, so the loop
            # condition is already false even if success() throws.
            $authenticated= 1;
            my $finished= eval {
                $mechanism->success(unpack_bytes($body));
                1;
            };
            @failure= ("Failed while finishing authentication: $@") unless $finished;
        }
        else {
            @failure= ("Received unexpected opcode $opcode during authentication");
        }

        return $next->(@failure);
    };

    # NOTE(review): whilst() is presumably "repeat the body while the condition
    # sub is true, then call $callback" - confirm against its definition.
    whilst(
        sub { !$authenticated },
        sub {
            my ($round_done)= @_;
            series([ $compute_response, $send_response, $handle_reply ], $round_done);
        },
        $callback,
    );

    return;
}
sub handle_event {
my ($self, $eventdata)= @_;
( run in 1.475 second using v1.01-cache-2.11-cpan-39bf76dae61 )