Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client/Connection.pm  view on Meta::CPAN

            }

            my $request_body= pack_stringmap({
                CQL_VERSION => $selected_cql_version,
                ($selected_compression ? (COMPRESSION => $selected_compression) : ()),
            });

            $self->request($next, OPCODE_STARTUP, $request_body);

            # This needs to happen after we send the STARTUP message
            $self->setup_compression($selected_compression);
        },
        sub { # By now we should know whether we need to authenticate
            my ($next, $response_code, $body)= @_;
            if ($response_code == OPCODE_READY) {
                return $next->(undef, $body); # Pass it along
            }

            if ($response_code == OPCODE_AUTHENTICATE) {
                return $self->authenticate($next, $body);
            }

            return $next->("Unexpected response from the server");
        },
        sub {
            my ($next)= @_;
            if ($self->{options}{keyspace}) {
                return $self->execute_prepared($next, \('use "'.$self->{options}{keyspace}.'"'));
            }
            return $next->();
        },
        sub {
            my ($next)= @_;
            if (!$self->{ipaddress}) {
                return $self->get_local_status($next);
            }
            return $next->();
        },
        sub {
            my ($next, $status)= @_;
            if ($status) {
                my ($local)= values %$status;
                $self->{ipaddress}= $local->{peer};
                $self->{datacenter}= $local->{data_center};
            }
            if (!$self->{ipaddress}) {
                return $next->("Unable to determine node's IP address");
            }
            return $next->();
        }
    ], $callback);

    return;
}

sub authenticate {
    my ($self, $callback, $initial_challenge)= @_;

    my $authenticator= unpack_string($initial_challenge);
    if (!$self->{options}{authentication}) {
        return $callback->("Server expected authentication using <$authenticator> but no credentials were set");
    }

    my $auth;
    eval {
        $auth= $self->{options}{authentication}->begin($authenticator);
        1;
    } or do {
        my $error= "Failed to initialize authentication mechanism: $@";
        return $callback->($error);
    };

    my $auth_done;
    my $next_challenge= undef;
    whilst(
        sub { !$auth_done },
        sub {
            my ($whilst_next)= @_;

            series([
                sub {
                    my $next= shift;
                    eval {
                        $auth->evaluate($next, $next_challenge);
                        1;
                    } or do {
                        return $next->("Failed to evaluate challenge: $@");
                    };
                },
                sub {
                    my ($next, $auth_response)= @_;
                    $self->request($next, OPCODE_AUTH_RESPONSE, pack_bytes($auth_response));
                },
                sub {
                    my ($next, $opcode, $body)= @_;
                    if ($opcode == OPCODE_AUTH_CHALLENGE) {
                        $next_challenge= unpack_bytes($body);
                        return $next->();
                    }
                    if ($opcode == OPCODE_AUTH_SUCCESS) {
                        $auth_done= 1;
                        eval {
                            $auth->success(unpack_bytes($body));
                            1;
                        } or do {
                            return $next->("Failed while finishing authentication: $@");
                        };
                        return $next->();
                    }
                    return $next->("Received unexpected opcode $opcode during authentication");
                },
            ], $whilst_next);
        },
        $callback,
    );

    return;
}

sub handle_event {
    my ($self, $eventdata)= @_;



( run in 1.475 second using v1.01-cache-2.11-cpan-39bf76dae61 )