Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client/Connection.pm  view on Meta::CPAN

        eval {
            ($decoder, $paging_state)= unpack_metadata($self->{protocol_version}, 1, $_[3]);
            1;
        } or do {
            return $callback->("Unable to unpack query metadata: $@");
        };
        $decoder= $prepared->{decoder} || $decoder;

        $callback->(undef,
            Cassandra::Client::ResultSet->new(
                \$_[3],
                $decoder,
                $paging_state,
            )
        );

    } elsif ($result_type == RESULT_VOID) { # Void
        return $callback->();

    } elsif ($result_type == RESULT_SET_KEYSPACE) { # Set_keyspace
        my $new_keyspace= unpack_string($_[3]);
        return $callback->();

    } elsif ($result_type == RESULT_SCHEMA_CHANGE) { # Schema change
        return $self->wait_for_schema_agreement(sub {
            # We may be passed an error. Ignore it, our query succeeded
            $callback->();
        });

    } else {
        return $callback->("Query executed successfully but got an unexpected response type");
    }
    return;
}

# Poll the cluster until every node listed by get_network_status() reports the
# same schema_version, then hand control back through $callback.
#
#   $callback - passed to whilst() as its final argument.
#               NOTE(review): presumably whilst() invokes it with an error
#               string on failure and with no arguments on success - confirm
#               against the whilst() helper.
#
# Each round asks get_network_status() for the current view of the cluster and
# collects the distinct schema_version values. More than one distinct value
# means the nodes disagree, so another round is scheduled via the async_io
# timer. An error from get_network_status() is forwarded to the loop as-is.
#
# Only the timer delays are added to $waited; time spent inside
# get_network_status() itself is not counted towards the limit.
sub wait_for_schema_agreement {
    my ($self, $callback)= @_;

    my $max_wait= 20;     # stop retrying once this much delay has accumulated
    my $wait_delay= 0.5;  # delay handed to the timer between two rounds
    my $waited= 0;        # sum of the delays scheduled so far
    my $in_agreement;     # set once a round sees at most one schema version

    # A single polling round; $continue resumes the whilst() loop.
    my $poll_once= sub {
        my ($continue)= @_;

        $self->get_network_status(sub {
            my ($error, $network_status)= @_;
            if ($error) {
                return $continue->($error);
            }

            # Collapse the per-node schema versions into a set.
            my %seen_versions;
            for my $node (values %$network_status) {
                $seen_versions{$node->{schema_version}}= 1;
            }

            # Zero or one distinct version: nothing left to wait for.
            if (keys(%seen_versions) <= 1) {
                $in_agreement= 1;
                return $continue->();
            }

            # Still diverging: give up if the budget is spent...
            if ($waited >= $max_wait) {
                return $continue->("wait_for_schema_agreement timed out after $waited seconds");
            }

            # ...otherwise schedule the next round after a short pause.
            $waited += $wait_delay;
            return $self->{async_io}->timer($continue, $wait_delay);
        });
    };

    whilst(
        sub { !$in_agreement },
        $poll_once,
        $callback,
    );

    return;
}



###### PROTOCOL CODE
sub handshake {
    my ($self, $callback)= @_;
    series([
        sub { # Send the OPCODE_OPTIONS
            my ($next)= @_;
            $self->request($next, OPCODE_OPTIONS, '');
        },
        sub { # The server hopefully just told us what it supports, let's respond with a STARTUP message
            my ($next, $response_code, $body)= @_;
            if ($response_code != OPCODE_SUPPORTED) {
                return $next->("Server returned an unexpected handshake");
            }

            my $map= unpack_stringmultimap($body);

            unless ($map->{CQL_VERSION} && $map->{COMPRESSION}) {
                return $next->("Server did not return compression and cql version information");
            }

            my $selected_cql_version= $self->{options}{cql_version};
            if (!$selected_cql_version) {
                ($selected_cql_version)= reverse sort @{$map->{CQL_VERSION}};
            }

            my %ss_compression= map { $_, 1 } @{$map->{COMPRESSION}};
            my $selected_compression= $self->{options}{compression};
            if (!$selected_compression) {
                for (@compression_preference) {
                    if ($ss_compression{$_} && $available_compression{$_}) {
                        $selected_compression= $_;
                        last;
                    }
                }
            }
            $selected_compression= undef if $selected_compression && $selected_compression eq 'none';

            if ($selected_compression) {
                if (!$ss_compression{$selected_compression}) {
                    return $next->("Server did not support requested compression method <$selected_compression>");
                }
                if (!$available_compression{$selected_compression}) {
                    return $next->("Requested compression method <$selected_compression> is supported by the server but not by us");
                }
            }



( run in 0.513 second using v1.01-cache-2.11-cpan-140bd7fdf52 )