Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client.pm  view on Meta::CPAN

        $self->{throttler}->count($error);

        $self->{active_queries}--;
        $self->_schedule_command_dequeue if $self->{command_queue}{has_any};

        return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;

        $self->_report_stats($command, $command_info);
        return _cb($callback, $error, $result);
    }, @$args);

    return;

SLOWPATH:
    return $self->_command_slowpath($command, $callback, $args, $command_info);

OVERFLOW:
    return $self->_command_enqueue($command, $callback, $args, $command_info);
}

# Run $command through the full asynchronous sequence: connect, obtain a
# connection from the pool, then invoke the command on that connection.
#
#   $command      - name of the connection method to invoke
#   $callback     - called through _cb() with ($error, $result) on success
#   $args         - arrayref of extra arguments passed to the command
#   $command_info - per-command bookkeeping, passed on to the stats and
#                   failure handlers
#
# Failures are handed to _command_failed(). Returns nothing; the outcome is
# delivered through the callbacks.
sub _command_slowpath {
    my ($self, $command, $callback, $args, $command_info)= @_;

    # Counted as active until the completion handler below decrements it.
    $self->{active_queries}++;

    my $ensure_connected= sub {
        my $next= shift;
        return $self->_connect($next);
    };

    my $pick_connection= sub {
        my $next= shift;
        return $self->{pool}->get_one_cb($next);
    };

    my $run_command= sub {
        my ($next, $connection)= @_;

        # If the throttler reports an error, skip the command and pass that
        # error on to the completion handler instead.
        my $throttled= $self->{throttler}->should_fail();
        return $next->($throttled) if $throttled;

        return $connection->$command($next, @$args);
    };

    my $finished= sub {
        my ($failure, $result)= @_;

        $self->{throttler}->count($failure);

        # This query is done: free its slot and let queued commands run.
        $self->{active_queries}--;
        if ($self->{command_queue}{has_any}) {
            $self->_schedule_command_dequeue;
        }

        if ($failure) {
            return $self->_command_failed($command, $callback, $args, $command_info, $failure);
        }

        $self->_report_stats($command, $command_info);
        return _cb($callback, $failure, $result);
    };

    series([ $ensure_connected, $pick_connection, $run_command ], $finished);
    return;
}

# Schedule another attempt at $command after a growing delay.
#
# Increments $command_info->{retries} and asks the async I/O timer to fire
# after 0.1 * 2**retries (0.2, 0.4, 0.8, ...; presumably seconds - the unit is
# whatever $self->{async_io}->timer expects). When the timer fires, the
# command is queued if the concurrency limit has been reached, and otherwise
# started through the slow path.
sub _command_retry {
    my ($self, $command, $callback, $args, $command_info)= @_;

    my $attempt= ++$command_info->{retries};
    my $backoff= 0.1 * (2 ** $attempt);

    my $resubmit= sub {
        my $at_capacity= $self->{active_queries} >= $self->{options}{max_concurrent_queries};
        return $at_capacity
            ? $self->_command_enqueue($command, $callback, $args, $command_info)
            : $self->_command_slowpath($command, $callback, $args, $command_info);
    };

    return $self->{async_io}->timer($resubmit, $backoff);
}

# Handle a failed command: either schedule a retry, or report the error to
# the caller.
#
# When $error is a reference (as judged by is_ref), a retry decision is made:
#   - an error whose do_retry is true is always retried;
#   - request errors, write timeouts, read timeouts and unavailable
#     exceptions are put to the configured retry policy, together with the
#     number of retries so far;
#   - any other error object is rethrown.
# Non-reference errors are never retried here.
#
# Unless a retry was scheduled, stats are reported and $callback is called
# with $error as its only argument.
sub _command_failed {
    my ($self, $command, $callback, $args, $command_info, $error)= @_;

    if (is_ref($error)) {
        # Only execute_prepared passes an idempotency flag to the policy; it
        # is read from the third element of $args.
        my $statement= {};
        if ($command eq 'execute_prepared') {
            $statement= {idempotent => $args->[2]->{idempotent}};
        }

        my $policy=   $self->{retry_policy};
        my $attempts= $command_info->{retries} || 0;
        my $decision;

        if ($error->do_retry) {
            $decision= Cassandra::Client::Policy::Retry::retry;
        }
        elsif ($error->is_request_error) {
            $decision= $policy->on_request_error($statement, undef, $error, $attempts);
        }
        elsif ($error->isa('Cassandra::Client::Error::WriteTimeoutException')) {
            $decision= $policy->on_write_timeout(
                $statement, $error->cl, $error->write_type,
                $error->blockfor, $error->received, $attempts,
            );
        }
        elsif ($error->isa('Cassandra::Client::Error::ReadTimeoutException')) {
            $decision= $policy->on_read_timeout(
                $statement, $error->cl, $error->blockfor,
                $error->received, $error->data_retrieved, $attempts,
            );
        }
        elsif ($error->isa('Cassandra::Client::Error::UnavailableException')) {
            $decision= $policy->on_unavailable(
                $statement, $error->cl, $error->required, $error->alive, $attempts,
            );
        }
        else {
            $decision= Cassandra::Client::Policy::Retry::rethrow;
        }

        return $self->_command_retry($command, $callback, $args, $command_info)
            if $decision && $decision eq 'retry';
    }

    # No retry: record the stats and hand the error to the caller.
    $self->_report_stats($command, $command_info);
    return $callback->($error);
}

# Put a command on the command queue so it can be started later.
#
# The command is stored as [$command, $callback, $args, $command_info]. If
# the queue's enqueue() returns a true value, it is treated as an error and
# the command fails with "Cannot $command: <error>" via _command_failed().
# Returns nothing when the command was queued.
sub _command_enqueue {
    my ($self, $command, $callback, $args, $command_info)= @_;

    my $queue_error= $self->{command_queue}->enqueue([$command, $callback, $args, $command_info]);
    return unless $queue_error;

    return $self->_command_failed($command, $callback, $args, $command_info, "Cannot $command: $queue_error");
}

# Arrange for queued commands to be started from the async I/O loop.
#
# The command_callback_scheduled counter ensures at most one drain callback
# is pending at a time; further calls before it runs only bump the counter.
# The callback clears the counter and then starts queued commands through
# the slow path, for as long as the queue reports items and the number of
# active queries is below max_concurrent_queries.
sub _schedule_command_dequeue {
    my ($self)= @_;
    unless ($self->{command_callback_scheduled}++) {
        my $drain_queue= sub {
            # Clear the marker first, so new drains can be scheduled again.
            delete $self->{command_callback_scheduled};

            while (1) {
                last unless $self->{command_queue}{has_any};
                last unless $self->{active_queries} < $self->{options}{max_concurrent_queries};

                # Stop altogether if the queue hands back nothing.
                my $queued= $self->{command_queue}->dequeue;
                return unless $queued;

                $self->_command_slowpath(@$queued);
            }
        };

        $self->{async_io}->later($drain_queue);
    }
}

lib/Cassandra/Client.pm  view on Meta::CPAN

    );
    $client->connect;

    $client->each_page("SELECT id, column FROM my_table WHERE id=?", [ 5 ], undef, sub {
        for my $row (@{shift->rows}) {
            my ($id, $column)= @$row;
            say "$id: $column";
        }
    });

=head1 METHODS

=over

=item Cassandra::Client->new(%options)

Create a new C<Cassandra::Client> instance, with the given options.

=over

=item contact_points

B<Required.> Arrayref of seed hosts to use when connecting. Specify more than one for increased reliability. This array is shuffled before use, so that random hosts are picked from the array.

=item keyspace

Default keyspace to use on all underlying connections. Can be overridden by querying for specific keyspaces, eg C<SELECT * FROM system.peers>.

=item anyevent

Should our internal event loop be based on AnyEvent, or should we just use our own? A true value means enable AnyEvent. Needed for promises to work.

=item port

Port number to use. Defaults to C<9042>.

=item cql_version

CQL version to use. Defaults to the version the server is running. Override only if your client has specific CQL requirements.

=item compression

Compression method to use. Defaults to the best available method, based on server and client support. Possible values are C<snappy>, C<lz4>, and C<none>.

=item default_consistency

Default consistency level to use. Defaults to C<one>. Can be overridden on a query basis as well, by passing a C<consistency> attribute.

=item default_idempotency

Default value of the C<idempotent> query attribute that indicates if a write query may be retried without harm. It defaults to false.

=item max_page_size

Default max page size to pass to the server. This defaults to C<5000>. Note that large values can cause trouble on Cassandra. Can be overridden by passing C<page_size> in query attributes.

=item max_connections

Maximum number of connections to keep open in the Cassandra connection pool. Defaults to C<2> for historical reasons; raise this if appropriate.

=item timer_granularity

Timer granularity used for timeouts. Defaults to C<0.1> (100ms). Change this if you're setting timeouts to values lower than a second.

=item request_timeout

Maximum time to wait for a query, in seconds. Defaults to C<11>.

=item warmup

Whether to connect to the full cluster in C<connect()>, or delay that until queries come in.

=item protocol_version

Cassandra protocol version to use. Currently defaults to C<4>, can also be set to C<3> for compatibility with older versions of Cassandra.

=back

=item $client->batch($queries[, $attributes])

Run one or more queries, in a batch, on Cassandra. Queries must be specified as an arrayref of C<[$query, \@bind]> pairs.

Defaults to a I<logged> batch, which can be overridden by passing C<logged>, C<unlogged> or C<counter> as the C<batch_type> attribute.

    $client->batch([
        [ "INSERT INTO my_table (a, b) VALUES (?, ?)", [ $row1_a, $row1_b ] ],
        [ "INSERT INTO my_table (a, b) VALUES (?, ?)", [ $row2_a, $row2_b ] ],
    ], { batch_type => "unlogged" });

=item $client->execute($query[, $bound_parameters[, $attributes]])

Executes a single query on Cassandra, and fetches the results (if any).

For queries that have large amounts of result rows and end up spanning multiple pages, C<each_page> is the function you need. C<execute> does not handle pagination, and may end up missing rows unless pagination is implemented by its user through the ...

    $client->execute(
        "UPDATE my_table SET column=:new_column WHERE id=:id",
        { new_column => 2, id => 5 },
        { consistency => "quorum" },
    );

The C<idempotent> attribute indicates that the query is idempotent and may be retried without harm.

=item $client->each_page($query, $bound_parameters, $attributes, $page_callback)

Executes a query and invokes C<$page_callback> with each page of the results, represented as L<Cassandra::Client::ResultSet> objects.

    # Downloads the entire table from the database, even if it's terabytes in size
    $client->each_page( "SELECT id, column FROM my_table", undef, undef, sub {
        my $page= shift;
        for my $row (@{$page->rows}) {
            say $row->[0];
        }
    });

=item $client->prepare($query)

Prepares a query on the server. C<execute> and C<each_page> already do this internally, so this method is only useful for preloading purposes (and to check whether queries even compile, I guess).

=item $client->shutdown()



( run in 1.260 second using v1.01-cache-2.11-cpan-140bd7fdf52 )