Database-Async

 view release on metacpan or  search on metacpan

lib/Database/Async/Query.pm  view on Meta::CPAN

   my $q = $self->db->query(q{select name from "site"."user" where id = ?});
   $q->prepare(
    statement => 'name_by_user_id'
   )
  };
  my ($name) = await $q->execute([ $id ])->single;
  return $name;
 }

=head2 Custom engine features

Different engines support additional features or events.

Once a query is scheduled onto an engine, it will resolve the L</engine> L<Future>
instance:

 my $query = $db->query('select * from some_table');
 my $engine = await $query->engine;
 $engine->source('notification')
  ->map('payload')
  ->say;

=head2 Cancelling queries

In cases where you want to terminate a query early, use the L</cancel> method.
This will ask the engine to stop query execution if already scheduled. For a query
which has not yet been assigned to an engine, the L</cancel> method will cancel
the schedule request.

=head2 Cursors

Cursors are handled as normal SQL queries.

 $db->txn(async sub {
  my ($txn) = @_;
  await $txn->query(q{declare c cursor for select id from xyz})->void;
  say while await $txn->query(q{fetch next from c})->single;
  await $txn->query(q{close c})->void;
 });

=cut

no indirect;

use Database::Async::Row;

use Future;
use Syntax::Keyword::Try;
use Ryu::Async;
use Scalar::Util qw(blessed);

use Log::Any qw($log);

use overload
    '""' => sub { my ($self) = @_; sprintf '%s[%s]', ref($self), $self->sql },
    bool => sub { 1 },
    fallback => 1;

sub new {
    my ($class, %args) = @_;
    Scalar::Util::weaken($args{db});
    bless \%args, $class;
}

=head2 in

This is a L<Ryu::Sink> used for queries which stream data to the server.

It's buffered internally.

=cut

sub in {
    my ($self) = @_;
    $self->{in} //= do {
        my $sink = $self->db->new_sink;
        die 'already have streaming input but no original ->{in} sink' if $self->{streaming_input};
        $sink->source->completed->on_ready(sub { $log->debugf('Sink for %s completed with %s', $self, shift->state) });
        $self->{streaming_input} = $sink->source->buffer->pause;
        $self->ready_to_stream->on_done(sub {
            $log->debugf('Ready to stream, resuming streaming input');
            $self->streaming_input->resume;
        });
        $sink
    }
}

#sub {
#    my ($self) = @_;
#    my $engine = $self->{engine} or die 'needs a valid ::Engine instance';
#    my $sink = $self->in or die 'had no valid sink for streaming input';
#
#    my $src = $sink->buffer;
#    $src->pause;
#    $engine->stream_from($src);
#
#    $self->ready_to_stream
#        ->on_done(sub {
#            $log->tracef('Ready to stream for %s', $sink);
#            $src->resume;
#        })->on_fail(sub {
#            $src->completed->fail(@_) unless $sink->completed->is_ready;
#        })->on_cancel(sub {
#            $src->completed->cancel unless $sink->completed->is_ready;
#        });
#}

sub streaming_input { shift->{streaming_input} // die '->in has not yet been called' }

sub finish {
    my ($self) = @_;
    if($self->{in}) {
        $self->input_stream->done;
    } else {
        $self->input_stream->cancel;
    }
}

=head2 db

Accessor for the L<Database::Async> instance.



( run in 0.779 second using v1.01-cache-2.11-cpan-39bf76dae61 )