Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client.pm  view on Meta::CPAN

    });
    return;
}

# Schedule another attempt at a command after a growing delay.
#
# Increments $command_info->{retries} and then asks the async I/O layer to
# fire a timer after 0.1 * 2**retries timer units (presumably seconds), i.e.
# 0.2, 0.4, 0.8, ... for the first, second, third retry. When the timer
# fires, the command is either queued (if the number of active queries has
# reached max_concurrent_queries) or dispatched through the slow path.
#
# Returns whatever the async I/O layer's timer() method returns.
sub _command_retry {
    my ($self, $command, $callback, $args, $command_info)= @_;

    # Count this attempt before computing the delay, so the exponent is
    # already 1 on the first retry.
    my $attempt= ++$command_info->{retries};
    my $backoff= 0.1 * (2 ** $attempt);

    # The capacity check is made when the timer fires, not when it is
    # scheduled, so it sees the then-current number of active queries.
    my $resubmit= sub {
        my $at_capacity= $self->{active_queries} >= $self->{options}{max_concurrent_queries};
        my $method= $at_capacity ? '_command_enqueue' : '_command_slowpath';
        $self->$method($command, $callback, $args, $command_info);
    };

    return $self->{async_io}->timer($resubmit, $backoff);
}

sub _command_failed {
    my ($self, $command, $callback, $args, $command_info, $error)= @_;

lib/Cassandra/Client.pm  view on Meta::CPAN

Default value of the C<idempotent> query attribute that indicates if a write query may be retried without harm. It defaults to false.

=item max_page_size

Default max page size to pass to the server. This defaults to C<5000>. Note that large values can cause trouble on Cassandra. Can be overridden by passing C<page_size> in query attributes.

=item max_connections

Maximum number of connections to keep open in the Cassandra connection pool. Defaults to C<2> for historical reasons; raise this if appropriate.

=item timer_granularity

Timer granularity used for timeouts. Defaults to C<0.1> (100ms). Change this if you're setting timeouts to values lower than a second.

=item request_timeout

Maximum time to wait for a query, in seconds. Defaults to C<11>.

=item warmup

Whether to connect to the full cluster in C<connect()>, or delay that until queries come in. Defaults to false.

lib/Cassandra/Client/AsyncAnyEvent.pm  view on Meta::CPAN

use vars qw/@TIMEOUTS/;

# Construct the AnyEvent-backed async I/O object.
#
# Arguments are key/value pairs; only "options" is read here, and from it
# only timer_granularity (falling back to 0.1 when unset or false).
# AnyEvent is loaded on demand, at construction time.
sub new {
    my $class= shift;
    my %args= @_;

    my $options= $args{options};

    require AnyEvent;

    my $granularity= $options->{timer_granularity} || 0.1;

    my $self= {
        timeouts          => [],
        fh_to_obj         => {},
        ae_timeout        => undef,
        ae_write          => {},
        ae_read           => {},
        timer_granularity => $granularity,
    };

    return bless $self, $class;
}

sub register {
    my ($self, $fh, $connection)= @_;

lib/Cassandra/Client/AsyncAnyEvent.pm  view on Meta::CPAN

    undef $self->{ae_write}{$fh};

    return;
}

sub deadline {
    my ($self, $fh, $id, $timeout)= @_;
    local *TIMEOUTS= $self->{timeouts};

    if (!$self->{ae_timeout}) {
        $self->{ae_timeout}= AnyEvent->timer(
            after => $self->{timer_granularity},
            interval => $self->{timer_granularity},
            cb => sub { $self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC)) },
        );
    }

    my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
    my $deadline= $curtime + $timeout;
    my $additem= [ $deadline, $fh, $id, 0 ];

    if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
        # Grumble... that's slow

lib/Cassandra/Client/AsyncAnyEvent.pm  view on Meta::CPAN

        }
    }

    if (!@TIMEOUTS) {
        $self->{ae_timeout}= undef;
    }

    return;
}

# Run $callback once, $wait timer units from now, via AE::timer.
#
# The watcher is captured by its own callback and cleared when the timer
# fires; presumably this keeps the watcher alive without the caller having
# to hold on to it. Returns the watcher that AE::timer produced.
sub timer {
    my ($self, $callback, $wait)= @_;

    my $watcher;
    $watcher= AE::timer($wait, 0, sub {
        # Drop the self-reference first, then hand control to the caller.
        undef $watcher;
        $callback->();
    });

    return $watcher;
}

# Hand $callback to AE::postpone and return whatever that call returns.
sub later {
    my (undef, $callback)= @_;

    # The &-call form ignores any prototype declared on AE::postpone, which
    # lets the code reference be passed in a plain scalar.
    return &AE::postpone($callback);
}

lib/Cassandra/Client/AsyncEV.pm  view on Meta::CPAN

use vars qw/@TIMEOUTS/;

# Construct the EV-backed async I/O object.
#
# Arguments are key/value pairs; only "options" is read here, and from it
# only timer_granularity (falling back to 0.1 when unset or false).
# EV is loaded on demand, and a fresh EV::Loop is created for this object.
sub new {
    my $class= shift;
    my %args= @_;

    my $options= $args{options};

    require EV;

    my $granularity= $options->{timer_granularity} || 0.1;
    my $loop= EV::Loop->new();

    my $self= {
        ev                => $loop,
        timeouts          => [],
        fh_to_obj         => {},
        ev_timeout        => undef,
        ev_write          => {},
        ev_read           => {},
        timer_granularity => $granularity,
    };

    return bless $self, $class;
}

sub register {

lib/Cassandra/Client/AsyncEV.pm  view on Meta::CPAN

    undef $self->{ev_write}{$fh};

    return;
}

sub deadline {
    my ($self, $fh, $id, $timeout)= @_;
    local *TIMEOUTS= $self->{timeouts};

    if (!$self->{ev_timeout}) {
        $self->{ev_timeout}= $self->{ev}->timer( $self->{timer_granularity}, $self->{timer_granularity}, sub {
            $self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC));
        } );
    }

    my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
    my $deadline= $curtime + $timeout;
    my $additem= [ $deadline, $fh, $id, 0 ];

    if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
        # Grumble... that's slow

lib/Cassandra/Client/AsyncEV.pm  view on Meta::CPAN

        }
    }

    if (!@TIMEOUTS) {
        $self->{ev_timeout}= undef;
    }

    return;
}

# Run $callback once, $wait timer units from now, on this object's EV loop.
#
# The watcher is captured by its own callback and cleared when the timer
# fires; presumably this keeps the watcher alive without the caller having
# to hold on to it. Returns the watcher that the loop's timer() produced.
sub timer {
    my ($self, $callback, $wait)= @_;

    my $loop= $self->{ev};

    my $watcher;
    $watcher= $loop->timer($wait, 0, sub {
        # Drop the self-reference first, then hand control to the caller.
        undef $watcher;
        $callback->();
    });

    return $watcher;
}

# Schedule $callback through this object's timer() method with a wait of 0,
# and return whatever timer() returns.
sub later {
    my $self= shift;
    my ($callback)= @_;

    return $self->timer($callback, 0);
}

# $something->($async->wait(my $w)); my ($error, $result)= $w->();
sub wait {
    my ($self)= @_;
    my $output= \$_[1];

    my ($done, $in_run);
    my @output;
    my $callback= sub {

lib/Cassandra/Client/Config.pm  view on Meta::CPAN

        anyevent                => 0,
        contact_points          => undef,
        port                    => 9042,
        cql_version             => undef,
        keyspace                => undef,
        compression             => undef,
        default_consistency     => undef,
        default_idempotency     => 0,
        max_page_size           => 5000,
        max_connections         => 2,
        timer_granularity       => 0.1,
        request_timeout         => 11,
        warmup                  => 0,
        max_concurrent_queries  => 1000,
        tls                     => 0,
        protocol_version        => 4,

        throttler               => undef,
        command_queue           => undef,
        retry_policy            => undef,
        load_balancing_policy   => undef,

lib/Cassandra/Client/Config.pm  view on Meta::CPAN

    } else { die "contact_points not specified"; }

    # Booleans
    for (qw/anyevent warmup tls default_idempotency/) {
        if (exists($config->{$_})) {
            $self->{$_}= !!$config->{$_};
        }
    }

    # Numbers, ignore undef
    for (qw/port timer_granularity request_timeout max_connections max_concurrent_queries/) {
        if (defined($config->{$_})) {
            $self->{$_}= 0+ $config->{$_};
        }
    }
    # Numbers, undef actually means undef
    for (qw/max_page_size/) {
        if (exists($config->{$_})) {
            $self->{$_}= defined($config->{$_}) ? (0+ $config->{$_}) : undef;
        }
    }

lib/Cassandra/Client/Connection.pm  view on Meta::CPAN

                return $whilst_next->($error) if $error;

                my %versions;
                $versions{$_->{schema_version}}= 1 for values %$network_status;
                if (keys %versions > 1) {
                    if ($waited >= $max_wait) {
                        return $whilst_next->("wait_for_schema_agreement timed out after $waited seconds");
                    }

                    $waited += $wait_delay;
                    return $self->{async_io}->timer($whilst_next, $wait_delay);
                } else {
                    $done= 1;
                    return $whilst_next->();
                }
            });
        },
        $callback,
    );

    return;

lib/Cassandra/Client/NetworkStatus.pm  view on Meta::CPAN

        sub { # condition
            !$self->{shutdown} && !$self->{master_id}
        },
        sub { # while
            my ($wnext)= @_;
            series([
                sub {
                    my ($next)= @_;
                    if ($attempts++) {
                        # Don't retry immediately
                        $self->{async_io}->timer($next, 1);
                    } else {
                        $next->();
                    }
                },
                sub {
                    my ($next)= @_;
                    $pool->get_one_cb($next);
                },
                sub {
                    my ($next, $connection)= @_;



( run in 0.649 second using v1.01-cache-2.11-cpan-49f99fa48dc )