Cassandra-Client

Changes  view on Meta::CPAN

      * Deduplicate parallel prepare() calls for the same query
      * Deal with some of Perl's special variables like $"
      * Avoid leaking file descriptors in some cases
      * Support for User-Defined Types (UDT)
      * Support for the 'Tuple' data type

0.10    2017/01/30

      * Add a retry policy implementation for generic request failures
      * Throttler: finally fix the feedback mechanism
      * Rework shutdown() to not need asynchronous code
      * Rework the pool management logic
      * Support for UTF-8 authentication data
      * Convenience method row_hashes() on a ResultSet
      * Basic TLS support
      * Make sure file handles always get closed when shutdown() is called

0.09    2016/11/15

      * Fix support for Perl 5.20 and later
      * Fix a class of memory leaks caused by recursion using closures
      * Add support for TIME/DATE/TINYINT/SMALLINT data types added in CQL 3.3
      * Avoid SIGPIPE when we connect to localhost but Cassandra is not present
      * Add support for named parameters

0.08    2016/10/25

      * Change event loop implementation to EV
      * Redesign wait_for_schema_agreement, to actually wait for the schema agreement
      * Support for proper retry settings
      * Handle request backlogs by queueing them or failing them early
      * Add stack traces to asynchronous queries with errors
      * Fix handling of nested types, like list<frozen<map<int,boolean>>>

0.07    2016/10/10

      * Experimental support for client-side throttling
      * No more Perl 5.8 support -- 5.10 is now required
      * Addresses an error that can occur when a query is retried against a node that doesn't know that query
      * Fix a rare issue causing random "Request timed out" when utf8-flagged data is passed to non-utf8 types (ascii, blob)

0.06    2016/09/26

      * Hopefully fixes support for Perl 5.8
      * Addresses a documentation hole about the 'timestamp' type
      * Support for the 'varint' type
      * Support for the 'decimal' type
      * Addresses an issue that occurred when both the AnyEvent and synchronous options were used
      * Some refactoring to allow for future changes

0.05    2016/09/06

      * Updates Makefile.PL to list the correct requirements

0.04    2016/09/05

      * First version, released on an unsuspecting world.

lib/Cassandra/Client.pm  view on Meta::CPAN

    eval {
        &$cb; 1
    } or do {
        my $error= $@ || "unknown error";
        warn "Ignoring unhandled exception in callback: $error";
    };

    return;
}

sub _mksync { # Translates an asynchronous call into something that looks like Perl
    my ($sub)= @_;
    return sub {
        my $self= shift;
        $sub->($self, $self->{async_io}->wait(my $w), @_);
        my ($err, @output)= $w->();
        if ($err) { die $err; }
        return @output;
    };
}
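
The wrapper above can be illustrated without a Cassandra cluster. The following is a minimal sketch of the same idea, not the library's implementation: C<make_waiter>, C<async_double>, C<mksync> and the C<@pending> task queue are hypothetical names standing in for the event loop and the callback-style methods. It assumes the callback convention seen in C<_mksync>, where the first value passed to the callback is the error.

```perl
use strict;
use warnings;

# Hypothetical stand-in for an event loop: a plain queue of pending tasks.
my @pending;

# Returns a callback and stores a "waiter" coderef in the caller's variable,
# mirroring the shape of the wait(my $w) call used by _mksync.
sub make_waiter {
    my ($done, @result);
    my $callback= sub { $done= 1; @result= @_; };
    $_[0]= sub {
        # Run queued tasks until the callback has fired.
        while (!$done) {
            my $task= shift @pending or die "deadlock: nothing left to run";
            $task->();
        }
        return @result;
    };
    return $callback;
}

# Callback-style function: delivers ($error, @values) to its callback later.
sub async_double {
    my ($callback, $n)= @_;
    push @pending, sub {
        return $callback->("not a number: $n") unless $n =~ /^-?\d+$/;
        $callback->(undef, $n * 2);
    };
    return;
}

# Wrap a callback-style function so that it blocks, and dies on error.
sub mksync {
    my ($sub)= @_;
    return sub {
        $sub->(make_waiter(my $w), @_);
        my ($err, @output)= $w->();
        die $err if $err;
        return @output;
    };
}

my $double= mksync(\&async_double);
my ($result)= $double->(21);
print "result: $result\n";

# Errors surface as exceptions, so callers guard with eval.
my $ok= eval { $double->("abc"); 1 };
print "error: $@" unless $ok;
```

The caller sees an ordinary blocking function, while the wrapped function only ever deals with a callback.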

lib/Cassandra/Client.pm  view on Meta::CPAN

=head1 NAME

Cassandra::Client - Perl library for accessing Cassandra using its binary network protocol

=head1 VERSION

version 0.21

=head1 DESCRIPTION

C<Cassandra::Client> is a Perl library giving its users access to the Cassandra database through the native protocol. Both synchronous and asynchronous querying are supported, through various common calling styles.

=head1 EXAMPLE

    use Cassandra::Client;
    my $client= Cassandra::Client->new(
        contact_points => [ '127.0.0.1', '192.168.0.1' ],
        username => "my_user",
        password => "my_password",
        keyspace => "my_keyspace",
    );

lib/Cassandra/Client.pm  view on Meta::CPAN

Disconnect all connections and abort all current queries. After this, the C<Cassandra::Client> object considers itself shut down and must be reconstructed with C<new()>.

=item $client->wait_for_schema_agreement()

Wait until all nodes agree on the schema version. Useful after changing table or keyspace definitions.

=back

=head1 (A?)SYNCHRONOUS

It's up to the user to choose which calling style to use: synchronous, asynchronous with promises, or through returned coderefs.

=head2 Synchronous

All C<Cassandra::Client> methods are available as synchronous methods by using their normal names. For example, C<< $client->connect(); >> will block until the client has connected. Similarly, C<< $client->execute($query) >> will wait for the query r...

    my $client= Cassandra::Client->new( ... );
    $client->connect;
    $client->execute("INSERT INTO my_table (id, value) VALUES (?, ?) USING TTL ?",
        [ 1, "test", 86400 ],
        { consistency => "quorum" });

=head2 Promises

C<Cassandra::Client> methods are also available as promises (see perldoc L<AnyEvent::XSPromises>). This integrates well with other libraries that deal with promises or asynchronous callbacks. Note that for promises to work, C<AnyEvent> is required, a...

Promise variants are available by prefixing method names with C<async_>, e.g. C<async_connect>, C<async_execute>, etc. On success, the usual result of the method is passed to the promise's success handler; on error, the error is passed to the failure handler instead.

    # Asynchronously pages through the result set, processing data as it comes in.
    my $promise= $client->async_each_page("SELECT id, column FROM my_table WHERE id=?", [ 5 ], undef, sub {
        for my $row (@{shift->rows}) {
            my ($id, $column)= @$row;
            say "Row: $id $column";
        }
    })->then(sub {
        say "We finished paging through all the rows";
    }, sub {
        my $error= shift;
    });

lib/Cassandra/Client.pm  view on Meta::CPAN

        my $error= shift;
        warn "Unhandled error! $error";
        $condvar->send;
    });
    $condvar->recv; # Wait for the promise to resolve or fail

How you integrate this into your infrastructure is of course up to you, and beyond the scope of the C<Cassandra::Client> documentation.

=head2 Coderefs

These are the simplest form of asynchronous querying in C<Cassandra::Client>. Instead of dealing with complex callback resolution, the client simply returns a coderef that, once invoked, returns what the original method would have returned.

The variants are available by prefixing method names with C<future_>, e.g. C<future_connect>, C<future_execute>, etc. These methods return a coderef.

    my $coderef= $client->future_execute("INSERT INTO table (id, value) VALUES (?, ?)", [ $id, $value ]);

    # Do other things
    ...

    # Wait for the query to finish
    $coderef->();

Upon errors, the coderef will die, just like the synchronous methods would. Because of this, invoking the coderef immediately after getting it is equivalent to using the synchronous methods:

    # This:
    $client->connect;

    # Is the same as:
    $client->future_connect->();

When used properly, coderefs can give a modest performance boost, but their real value is in the ease of use compared to promises.
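
One way to use coderefs "properly" is to start several queries before waiting on any of them, and to guard each coderef separately, since a failed query dies when its coderef is invoked. The sketch below shows that pattern under stated assumptions: C<My::FakeClient> is a hypothetical stand-in so the code runs without a cluster, and its return values and error message are invented for illustration. Only the C<future_execute> calling convention is taken from the documentation above.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a connected client, so the sketch runs without
# a Cassandra cluster. Only the future_execute calling convention matters.
package My::FakeClient {
    sub new { return bless {}, shift }
    sub future_execute {
        my ($self, $query, $bind)= @_;
        # Return a coderef that yields the result, or dies, when invoked.
        return sub {
            die "id must be positive\n" if $bind->[0] <= 0;
            return "applied id=$bind->[0]";
        };
    }
}

my $client= My::FakeClient->new;

# Start every query first...
my @futures= map {
    $client->future_execute("INSERT INTO my_table (id, value) VALUES (?, ?)", [ $_, "v$_" ])
} (1, 2, -3);

# ...then wait for each one. Guarding each call with eval keeps a single
# failure from hiding the outcome of the remaining queries.
my (@results, @errors);
for my $future (@futures) {
    my $result;
    if (eval { ($result)= $future->(); 1 }) {
        push @results, $result;
    } else {
        push @errors, $@;
    }
}

printf "%d succeeded, %d failed\n", scalar @results, scalar @errors;
```

With a real client the queries would be in flight while the loop waits on the first coderef, which is where any performance gain over purely synchronous calls would come from.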

=head1 CAVEATS, BUGS, TODO

lib/Cassandra/Client/AsyncEV.pm  view on Meta::CPAN

    my ($done, $in_run);
    my @output;
    my $callback= sub {
        $done= 1;
        @output= @_;
        $self->{ev}->break() if $in_run;
    };

    $$output= sub {
        if ($self->{in_wait}) {
            die "Unable to recursively wait for callbacks; are you doing synchronous Cassandra queries from asynchronous callbacks?";
        }
        local $self->{in_wait}= 1;

        $in_run= 1;
        $self->{ev}->run unless $done;
        return @output;
    };

    return $callback;
}
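
The C<in_wait> check above relies on C<local> to reset the flag automatically, even when the wait dies. A minimal sketch of that guard in isolation follows; C<$state> and C<guarded_wait> are hypothetical names and no event loop is involved.

```perl
use strict;
use warnings;

my $state= { in_wait => 0 };

# Runs $work, refusing to be entered again while a call is in progress.
sub guarded_wait {
    my ($work)= @_;
    die "recursive wait detected\n" if $state->{in_wait};
    local $state->{in_wait}= 1;   # restored automatically on return or die
    return $work->();
}

# A plain call works.
my $outer= guarded_wait(sub { "ok" });

# A nested call is rejected by the guard.
my $nested_ok= eval { guarded_wait(sub { guarded_wait(sub { "inner" }) }); 1 };
my $nested_error= $@;

# The flag was restored by local, so later calls still work.
my $after= guarded_wait(sub { "still usable" });

print "outer: $outer\n";
print "nested call rejected: $nested_error" unless $nested_ok;
print "after: $after\n";
```

Because C<local> undoes the assignment when the enclosing scope exits, no explicit cleanup is needed on the error path.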

t/50-bench.t  view on Meta::CPAN

my $multiply= $ENV{BENCH_MULTIPLY} || 1;
for (1..$rounds) {
    SYNC_INS: {
        next if $ENV{BENCH_SKIP_SYNC};
        my $num= 1000 * $multiply;
        my $t0= -time();
        for (1..$num) {
            $client->execute($insert_query, [ $_, $_ * 2 ]);
        }
        my $diff= time() + $t0;
        ok(1, sprintf "$num synchronous inserts: %.1f seconds", $diff);
    }
    SYNC_SEL: {
        next if $ENV{BENCH_SKIP_SYNC};
        my $num= 1000 * $multiply;
        my $t0= -time();
        for (1..$num) {
            $client->execute($select_query, [ $_ ]);
        }
        my $diff= time() + $t0;
        ok(1, sprintf "$num synchronous selects: %.1f seconds", $diff);
    }
    PROMISES_INS: {
        my $num= 1000 * $multiply;
        my $t0= -time();
        my @promises;
        for (1..$num) {
            push @promises, $client->async_execute($insert_query, [ $_, $_ * 2 ]);
        }
        my $cv= AE::cv;
        my $fail;
        collect(@promises)->then(sub { $cv->send; }, sub { $fail= 1; $cv->send; });
        $cv->recv;
        my $diff= time() + $t0;
        ok(!$fail, sprintf "$num asynchronous inserts: %.1f seconds", $diff);
    }
    PROMISES_SEL: {
        my $num= 1000 * $multiply;
        my $t0= -time();
        my @promises;
        for (1..$num) {
            push @promises, $client->async_execute($select_query, [ $_ ]);
        }
        my $cv= AE::cv;
        my $fail;
        collect(@promises)->then(sub { $cv->send; }, sub { $fail= 1; $cv->send; });
        $cv->recv;
        my $diff= time() + $t0;
        ok(!$fail, sprintf "$num asynchronous selects: %.1f seconds", $diff);
    }
    FUTURES_INS: {
        my $num= 1000 * $multiply;
        my $t0= -time();
        my @futures;
        for (1..$num) {
            push @futures, $client->future_execute($insert_query, [ $_, $_ * 2 ]);
        }
        $_->() for @futures;
        my $diff= time() + $t0;
        ok(1, sprintf "$num synchronous inserts via futures: %.1f seconds", $diff);
    }
    FUTURES_SEL: {
        my $num= 1000 * $multiply;
        my $t0= -time();
        my @futures;
        for (1..$num) {
            push @futures, $client->future_execute($select_query, [ $_ ]);
        }
        $_->() for @futures;
        my $diff= time() + $t0;
        ok(1, sprintf "$num synchronous selects via futures: %.1f seconds", $diff);
    }
}

done_testing;


