Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client.pm  view on Meta::CPAN


# Version string handed to XSLoader when loading the compiled part of this
# package. Falls back to the empty string when $VERSION is unset or false.
our $XS_VERSION = $Cassandra::Client::VERSION || '';

# Put an underscore in front of the last three fractional digits, so that
# "0.15001" becomes "0.15_001". Strings of any other shape are left alone.
# NOTE(review): presumably this restores a developer-release style version
# string after $VERSION was numified -- confirm against the release tooling.
if ($XS_VERSION =~ m{\A (\d+) [.] (\d+) (\d{3}) \z}x) {
    $XS_VERSION = "${1}.${2}_${3}";
}

XSLoader::load(__PACKAGE__, $XS_VERSION);

sub new {
    my ($class, %args)= @_;

    my $self= bless {
        connected         => 0,
        connect_callbacks => undef,
        shutdown          => 0,

        active_queries    => 0,
    }, $class;

    my $options= Cassandra::Client::Config->new(
        \%args
    );

    $self->{throttler}= $options->{throttler} || Cassandra::Client::Policy::Throttle::Default->new();

lib/Cassandra/Client.pm  view on Meta::CPAN

sub _connect {
    my ($self, $callback)= @_;
    return _cb($callback) if $self->{connected};
    return _cb($callback, 'Cannot connect: shutdown() has been called') if $self->{shutdown};

    # This is ONLY useful if the user doesn't throw away the C::C object on connect errors.
    if (!$self->{connecting} && (my $error= $self->{throttler}->should_fail())) {
        return _cb($callback, $error);
    }

    push @{$self->{connect_callbacks}||=[]}, $callback;
    if ($self->{connecting}++) {
        return;
    }

    my @contact_points= shuffle @{$self->{options}{contact_points}};
    my $last_error= "No hosts to connect to";

    my $next_connect;
    $next_connect= sub {
        my $contact_point= shift @contact_points;
        if (!$contact_point) {
            delete $self->{connecting};
            undef $next_connect;
            _cb($_, "Unable to connect to any Cassandra server. Last error: $last_error") for @{delete $self->{connect_callbacks}};
            return;
        }

        my $connection= Cassandra::Client::Connection->new(
            client => $self,
            options => $self->{options},
            host => $contact_point,
            async_io => $self->{async_io},
            metadata => $self->{metadata},
        );

lib/Cassandra/Client.pm  view on Meta::CPAN

            my $error= shift;
            $self->{throttler}->count($error);
            if ($error) {
                $last_error= "On $contact_point: $error";
                return $next_connect->();
            }

            undef $next_connect;
            $self->{connected}= 1;
            delete $self->{connecting};
            _cb($_) for @{delete $self->{connect_callbacks}};
        });
    };
    $next_connect->();

    return;
}

sub shutdown {
    my ($self)= @_;

lib/Cassandra/Client.pm  view on Meta::CPAN

All C<Cassandra::Client> methods are available as synchronous methods by using their normal names. For example, C<< $client->connect(); >> will block until the client has connected. Similarly, C<< $client->execute($query) >> will wait for the query r...

    my $client= Cassandra::Client->new( ... );
    $client->connect;
    $client->execute("INSERT INTO my_table (id, value) VALUES (?, ?) USING TTL ?",
        [ 1, "test", 86400 ],
        { consistency => "quorum" });

=head2 Promises

C<Cassandra::Client> methods are also available as promises (see perldoc L<AnyEvent::XSPromises>). This integrates well with other libraries that deal with promises or asynchronous callbacks. Note that for promises to work, C<AnyEvent> is required, a...

Promise variants are available by prefixing method names with C<async_>, e.g. C<async_connect>, C<async_execute>, etc. The usual result of the method is passed to the promise's success handler, or to the failure handler if there was an error.

    # Asynchronously pages through the result set, processing data as it comes in.
    my $promise= $client->async_each_page("SELECT id, column FROM my_table WHERE id=?", [ 5 ], undef, sub {
        for my $row (@{shift->rows}) {
            my ($id, $column)= @$row;
            say "Row: $id $column";
        }
    })->then(sub {

lib/Cassandra/Client/AsyncEV.pm  view on Meta::CPAN

    my ($done, $in_run);
    my @output;
    my $callback= sub {
        $done= 1;
        @output= @_;
        $self->{ev}->break() if $in_run;
    };

    $$output= sub {
        if ($self->{in_wait}) {
            die "Unable to recursively wait for callbacks; are you doing synchronous Cassandra queries from asynchronous callbacks?";
        }
        local $self->{in_wait}= 1;

        $in_run= 1;
        $self->{ev}->run unless $done;
        return @output;
    };

    return $callback;
}

lib/Cassandra/Client/Pool.pm  view on Meta::CPAN


    $connection->connect(sub {
        my ($error)= @_;

        delete $self->{connecting}{$host};
        if ($error) {
            $self->{policy}->set_disconnected($host);

            if (my $waiters= delete $self->{wait_connect}) {
                if ($self->{count} && @$waiters) {
                    warn 'We have callbacks waiting for a connection while we\'re connected';
                }

                my $max_conn= $self->{max_connections};
                my $known_node_count= $self->{policy}->known_node_count;
                my $max_attempts = ($max_conn < $known_node_count ? $max_conn : $known_node_count) + 1;

                for my $waiter (@$waiters) {
                    if ((++$waiter->{attempts}) >= $max_attempts || !%{$self->{connecting}}) {
                        $waiter->{callback}->("Failed to connect to server: $error");
                    } else {



( run in 0.675 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )