Database-Async

 view release on metacpan or  search on metacpan

lib/Database/Async/Query.pm  view on Meta::CPAN

 my $query = $db->query('select * from some_table');
 my $engine = await $query->engine;
 $engine->source('notification')
  ->map('payload')
  ->say;

=head2 Cancelling queries

In cases where you want to terminate a query early, use the L</cancel> method.
This will ask the engine to stop query execution if already scheduled. For a query
which has not yet been assigned to an engine, the L</cancel> method will cancel
the schedule request.

=head2 Cursors

Cursors are handled as normal SQL queries.

 $db->txn(async sub {
  my ($txn) = @_;
  await $txn->query(q{declare c cursor for select id from xyz})->void;
  say while await $txn->query(q{fetch next from c})->single;
  await $txn->query(q{close c})->void;
 });

=cut

no indirect;

use Database::Async::Row;

use Future;
use Syntax::Keyword::Try;
use Ryu::Async;
use Scalar::Util qw(blessed);

use Log::Any qw($log);

use overload
    '""' => sub { my ($self) = @_; sprintf '%s[%s]', ref($self), $self->sql },
    bool => sub { 1 },
    fallback => 1;

sub new {
    my ($class, %args) = @_;
    Scalar::Util::weaken($args{db});
    bless \%args, $class;
}

=head2 in

This is a L<Ryu::Sink> used for queries which stream data to the server.

It's buffered internally.

=cut

sub in {
    my ($self) = @_;
    $self->{in} //= do {
        my $sink = $self->db->new_sink;
        die 'already have streaming input but no original ->{in} sink' if $self->{streaming_input};
        $sink->source->completed->on_ready(sub { $log->debugf('Sink for %s completed with %s', $self, shift->state) });
        $self->{streaming_input} = $sink->source->buffer->pause;
        $self->ready_to_stream->on_done(sub {
            $log->debugf('Ready to stream, resuming streaming input');
            $self->streaming_input->resume;
        });
        $sink
    }
}

#sub {
#    my ($self) = @_;
#    my $engine = $self->{engine} or die 'needs a valid ::Engine instance';
#    my $sink = $self->in or die 'had no valid sink for streaming input';
#
#    my $src = $sink->buffer;
#    $src->pause;
#    $engine->stream_from($src);
#
#    $self->ready_to_stream
#        ->on_done(sub {
#            $log->tracef('Ready to stream for %s', $sink);
#            $src->resume;
#        })->on_fail(sub {
#            $src->completed->fail(@_) unless $sink->completed->is_ready;
#        })->on_cancel(sub {
#            $src->completed->cancel unless $sink->completed->is_ready;
#        });
#}

sub streaming_input { shift->{streaming_input} // die '->in has not yet been called' }

sub finish {
    my ($self) = @_;
    if($self->{in}) {
        $self->input_stream->done;
    } else {
        $self->input_stream->cancel;
    }
}

=head2 db

Accessor for the L<Database::Async> instance.

=cut

sub db { shift->{db} }

=head2 sql

The SQL string that this query would be running.

=cut

sub sql { shift->{sql} }

=head2 bind

A list of bind parameters for this query, can be empty.

=cut

sub bind { @{shift->{bind}} }

sub row_description {
    my ($self, $desc) = @_;
    $log->tracef('Have row description %s', $desc);
    my @names = map { $_->{name} } $desc->@*;
    $self->{field_names} = \@names;
    $self->{field_by_name} = {
        # First column wins by default if we have multiple hits
        map { $names[$_] => $_ } reverse 0..$#names
    };
    $self
}

sub row {
    my ($self, $row) = @_;
    $log->tracef('Have row %s', $row);
    $self->row_data->emit($row);
}

sub row_hashrefs {
    my ($self) = @_;
    $self->{row_hashrefs} //= $self->row_data
        ->map(sub {
            my ($row) = @_;
            +{
                map {;
                    $self->{field_names}[$_] => $row->[$_]



( run in 3.279 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )