Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client.pm  view on Meta::CPAN

# Runs $command on a pooled connection and routes the outcome.
#
# Arguments:
#   $command      - name of the connection method to invoke
#   $callback     - user callback; receives ($error, $result) on success
#   $args         - array ref of arguments passed on to the connection method
#   $command_info - per-command bookkeeping hash (timing, retry count)
#
# The query is counted in $self->{active_queries} for as long as it is in
# flight. Failures are handed to _command_failed(); successes are reported via
# _report_stats() and delivered through _cb().
sub _command_slowpath {
    my ($self, $command, $callback, $args, $command_info)= @_;

    $self->{active_queries}++;

    # Step 1: make sure the client is connected.
    my $ensure_connected= sub {
        my ($next)= @_;
        $self->_connect($next);
    };

    # Step 2: ask the pool for a connection.
    my $acquire_connection= sub {
        my ($next)= @_;
        $self->{pool}->get_one_cb($next);
    };

    # Step 3: give the throttler a chance to reject the query, otherwise
    # invoke the command on the connection obtained in step 2.
    my $run_command= sub {
        my ($next, $connection)= @_;
        my $error= $self->{throttler}->should_fail();
        return $next->($error) if $error;
        $connection->$command($next, @$args);
    };

    # Completion handler: receives the error (if any) and the result.
    my $finish= sub {
        my ($error, $result)= @_;

        # The throttler is told about every outcome, successful or not.
        $self->{throttler}->count($error);

        # This query is done, so a queued command may be able to run now.
        $self->{active_queries}--;
        $self->_schedule_command_dequeue if $self->{command_queue}{has_any};

        if ($error) {
            return $self->_command_failed($command, $callback, $args, $command_info, $error);
        }

        $self->_report_stats($command, $command_info);
        return _cb($callback, $error, $result);
    };

    # NOTE(review): presumably series() runs the steps in order and jumps to
    # the completion handler on the first error -- confirm against its
    # definition.
    series([ $ensure_connected, $acquire_connection, $run_command ], $finish);
    return;
}

# Re-issues a failed command after an exponentially growing delay.
#
# Increments $command_info->{retries} and arms a timer on $self->{async_io}
# for 0.1 * 2**retries seconds (0.2s for the first retry, 0.4s for the second,
# and so on). When the timer fires, the command is queued if the number of
# active queries has reached the max_concurrent_queries option; otherwise it
# is dispatched immediately.
#
# Returns whatever the timer() call returns.
sub _command_retry {
    my ($self, $command, $callback, $args, $command_info)= @_;

    my $attempt= ++$command_info->{retries};
    my $backoff= 0.1 * (2 ** $attempt);

    my $resubmit= sub {
        my $at_capacity= $self->{active_queries} >= $self->{options}{max_concurrent_queries};
        return $self->_command_enqueue($command, $callback, $args, $command_info) if $at_capacity;
        return $self->_command_slowpath($command, $callback, $args, $command_info);
    };

    return $self->{async_io}->timer($resubmit, $backoff);
}

# Handles a failed command: either schedules a retry or delivers the error.
#
# Arguments:
#   $command      - name of the command that failed
#   $callback     - user callback; invoked with ($error) when not retrying
#   $args         - array ref of the command's arguments
#   $command_info - per-command bookkeeping hash (timing, retry count)
#   $error        - an error object, or a plain string
#
# When $error is a reference, a retry decision is obtained: errors whose
# do_retry is true are always retried, request errors and read/write timeout
# and unavailable exceptions are delegated to $self->{retry_policy}, and
# anything else is rethrown. A 'retry' decision hands the command to
# _command_retry(); in every other case stats are reported and the callback
# receives the error.
sub _command_failed {
    my ($self, $command, $callback, $args, $command_info, $error)= @_;

    # Plain string errors (such as the one built by _command_enqueue) carry no
    # retry information and go straight to the callback.
    if (is_ref($error)) {
        my $retry_decision;
        my $retries= $command_info->{retries} || 0;

        # For execute_prepared the third argument is read as the attributes
        # hash. Copy it out before looking inside: writing
        # $args->[2]->{idempotent} would autovivify an undefined $args->[2]
        # into {} and thereby alter the argument list that is sent again on
        # retry.
        my $statement= {};
        if ($command eq 'execute_prepared') {
            my $attribs= $args->[2];
            $statement->{idempotent}= $attribs ? $attribs->{idempotent} : undef;
        }

        if ($error->do_retry) {
            $retry_decision= Cassandra::Client::Policy::Retry::retry;
        } elsif ($error->is_request_error) {
            $retry_decision= $self->{retry_policy}->on_request_error($statement, undef, $error, $retries);
        } elsif ($error->isa('Cassandra::Client::Error::WriteTimeoutException')) {
            $retry_decision= $self->{retry_policy}->on_write_timeout($statement, $error->cl, $error->write_type, $error->blockfor, $error->received, $retries);
        } elsif ($error->isa('Cassandra::Client::Error::ReadTimeoutException')) {
            $retry_decision= $self->{retry_policy}->on_read_timeout($statement, $error->cl, $error->blockfor, $error->received, $error->data_retrieved, $retries);
        } elsif ($error->isa('Cassandra::Client::Error::UnavailableException')) {
            $retry_decision= $self->{retry_policy}->on_unavailable($statement, $error->cl, $error->required, $error->alive, $retries);
        } else {
            $retry_decision= Cassandra::Client::Policy::Retry::rethrow;
        }

        if ($retry_decision && $retry_decision eq 'retry') {
            return $self->_command_retry($command, $callback, $args, $command_info);
        }
    }

    # Not retrying: the command is finished, so record its timing and deliver
    # the error.
    # NOTE(review): the success path in _command_slowpath delivers through
    # _cb() while this calls the callback directly -- confirm that the
    # difference is intentional.
    $self->_report_stats($command, $command_info);
    return $callback->($error);
}

# Places a command on $self->{command_queue} for later execution.
#
# The command is stored as [$command, $callback, $args, $command_info]. If the
# queue's enqueue() returns an error, the command is failed through
# _command_failed() with the message "Cannot <command>: <error>" and that
# call's result is returned. Returns nothing on success.
sub _command_enqueue {
    my ($self, $command, $callback, $args, $command_info)= @_;

    my $queued_item= [$command, $callback, $args, $command_info];
    my $error= $self->{command_queue}->enqueue($queued_item);
    return unless $error;

    return $self->_command_failed($command, $callback, $args, $command_info, "Cannot $command: $error");
}

# Arranges for queued commands to be started via $self->{async_io}->later().
#
# $self->{command_callback_scheduled} ensures only one such callback is
# outstanding: later() is called only when the counter was false before this
# call, and the callback removes the counter when it runs. The callback then
# starts queued commands through _command_slowpath() for as long as the queue
# reports has_any and active_queries is below the max_concurrent_queries
# option.
sub _schedule_command_dequeue {
    my ($self)= @_;

    unless ($self->{command_callback_scheduled}++) {
        my $drain_queue= sub {
            # Allow the next call to schedule a fresh callback.
            delete $self->{command_callback_scheduled};

            while ($self->{command_queue}{has_any}) {
                # Stop once the concurrency limit has been reached.
                return unless $self->{active_queries} < $self->{options}{max_concurrent_queries};

                my $queued= $self->{command_queue}->dequeue;
                return unless $queued;

                # Each item holds the argument list for _command_slowpath().
                $self->_command_slowpath(@$queued);
            }
        };

        $self->{async_io}->later($drain_queue);
    }
}

# Stamps the command's end time and notifies the optional stats hook.
#
# $command_info->{end_time} is set to the current Time::HiRes::time(). If the
# stats_hook option is set, it is invoked through _cb() with the arguments
# ('timing', { command, start_time, end_time }).
sub _report_stats {
    my ($self, $command, $command_info)= @_;

    my $finished_at= Time::HiRes::time();
    $command_info->{end_time}= $finished_at;

    my $stats_hook= $self->{options}{stats_hook};
    if ($stats_hook) {
        my %timing= (
            command     => $command,
            start_time  => $command_info->{start_time},
            end_time    => $command_info->{end_time},
        );
        _cb($stats_hook, timing => \%timing);
    }
}

# Utility functions that wrap query functions
sub _each_page {
    my ($self, $callback, $query, $params, $attribs, $page_callback)= @_;

    my $params_copy= $params ? clone($params) : undef;
    my $attribs_copy= $attribs ? clone($attribs) : undef;

    my $done= 0;



( run in 2.836 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )