Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client.pm  view on Meta::CPAN

31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
our $XS_VERSION = ($Cassandra::Client::VERSION || '');
$XS_VERSION =~ s/\A(\d+)\.(\d+)(\d{3})\z/$1.$2_$3/;
XSLoader::load(__PACKAGE__, $XS_VERSION);
 
sub new {
    my ($class, %args)= @_;
 
    my $self= bless {
        connected         => 0,
        connect_callbacks => undef,
        shutdown          => 0,
 
        active_queries    => 0,
    }, $class;
 
    my $options= Cassandra::Client::Config->new(
        \%args
    );
 
    $self->{throttler}= $options->{throttler} || Cassandra::Client::Policy::Throttle::Default->new();

lib/Cassandra/Client.pm  view on Meta::CPAN

81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
sub _connect {
    my ($self, $callback)= @_;
    return _cb($callback) if $self->{connected};
    return _cb($callback, 'Cannot connect: shutdown() has been called') if $self->{shutdown};
 
    # This is ONLY useful if the user doesn't throw away the C::C object on connect errors.
    if (!$self->{connecting} && (my $error= $self->{throttler}->should_fail())) {
        return _cb($callback, $error);
    }
 
    push @{$self->{connect_callbacks}||=[]}, $callback;
    if ($self->{connecting}++) {
        return;
    }
 
    my @contact_points= shuffle @{$self->{options}{contact_points}};
    my $last_error= "No hosts to connect to";
 
    my $next_connect;
    $next_connect= sub {
        my $contact_point= shift @contact_points;
        if (!$contact_point) {
            delete $self->{connecting};
            undef $next_connect;
            _cb($_, "Unable to connect to any Cassandra server. Last error: $last_error") for @{delete $self->{connect_callbacks}};
            return;
        }
 
        my $connection= Cassandra::Client::Connection->new(
            client => $self,
            options => $self->{options},
            host => $contact_point,
            async_io => $self->{async_io},
            metadata => $self->{metadata},
        );

lib/Cassandra/Client.pm  view on Meta::CPAN

127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
            my $error= shift;
            $self->{throttler}->count($error);
            if ($error) {
                $last_error= "On $contact_point: $error";
                return $next_connect->();
            }
 
            undef $next_connect;
            $self->{connected}= 1;
            delete $self->{connecting};
            _cb($_) for @{delete $self->{connect_callbacks}};
        });
    };
    $next_connect->();
 
    return;
}
 
sub shutdown {
    my ($self)= @_;

lib/Cassandra/Client.pm  view on Meta::CPAN

706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
All C<Cassandra::Client> methods are available as synchronous methods by using their normal names. For example, C<< $client->connect(); >> will block until the client has connected. Similarly, C<< $client->execute($query) >> will wait for the query r...
 
    my $client= Cassandra::Client->new( ... );
    $client->connect;
    $client->execute("INSERT INTO my_table (id, value) VALUES (?, ?) USING TTL ?",
        [ 1, "test", 86400 ],
        { consistency => "quorum" });
 
=head2 Promises
 
C<Cassandra::Client> methods are also available as promises (see perldoc L<AnyEvent::XSPromises>). This integrates well with other libraries that deal with promises or asynchronous callbacks. Note that for promises to work, C<AnyEvent> is required, a...
 
Promise variants are available by prefixing method names with C<async_>, eg. C<async_connect>, C<async_execute>, etc. The usual result of the method is passed to the promise's success handler, or to the failure handler if there was an error.
 
    # Asynchronously pages through the result set, processing data as it comes in.
    my $promise= $client->async_each_page("SELECT id, column FROM my_table WHERE id=?", [ 5 ], undef, sub {
        for my $row (@{shift->rows}) {
            my ($id, $column)= @$row;
            say "Row: $id $column";
        }
    })->then(sub {

lib/Cassandra/Client/AsyncEV.pm  view on Meta::CPAN

143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
    my ($done, $in_run);
    my @output;
    my $callback= sub {
        $done= 1;
        @output= @_;
        $self->{ev}->break() if $in_run;
    };
 
    $$output= sub {
        if ($self->{in_wait}) {
            die "Unable to recursively wait for callbacks; are you doing synchronous Cassandra queries from asynchronous callbacks?";
        }
        local $self->{in_wait}= 1;
 
        $in_run= 1;
        $self->{ev}->run unless $done;
        return @output;
    };
 
    return $callback;
}

lib/Cassandra/Client/Pool.pm  view on Meta::CPAN

222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
$connection->connect(sub {
    my ($error)= @_;
 
    delete $self->{connecting}{$host};
    if ($error) {
        $self->{policy}->set_disconnected($host);
 
        if (my $waiters= delete $self->{wait_connect}) {
            if ($self->{count} && @$waiters) {
                warn 'We have callbacks waiting for a connection while we\'re connected';
            }
 
            my $max_conn= $self->{max_connections};
            my $known_node_count= $self->{policy}->known_node_count;
            my $max_attempts = ($max_conn < $known_node_count ? $max_conn : $known_node_count) + 1;
 
            for my $waiter (@$waiters) {
                if ((++$waiter->{attempts}) >= $max_attempts || !%{$self->{connecting}}) {
                    $waiter->{callback}->("Failed to connect to server: $error");
                } else {



( run in 0.300 second using v1.01-cache-2.11-cpan-496ff517765 )