Database-Async
view release on metacpan or search on metacpan
lib/Database/Async/Query.pm view on Meta::CPAN
my $q = $self->db->query(q{select name from "site"."user" where id = ?});
$q->prepare(
statement => 'name_by_user_id'
)
};
my ($name) = await $q->execute([ $id ])->single;
return $name;
}
=head2 Custom engine features
Different engines support additional features or events.
Once a query is scheduled onto an engine, the L<Future> returned by L</engine>
will resolve with that engine instance:
my $query = $db->query('select * from some_table');
my $engine = await $query->engine;
$engine->source('notification')
->map('payload')
->say;
=head2 Cancelling queries
In cases where you want to terminate a query early, use the L</cancel> method.
This will ask the engine to stop query execution if already scheduled. For a query
which has not yet been assigned to an engine, the L</cancel> method will cancel
the schedule request.
=head2 Cursors
Cursors are handled as normal SQL queries.
$db->txn(async sub {
my ($txn) = @_;
await $txn->query(q{declare c cursor for select id from xyz})->void;
say while await $txn->query(q{fetch next from c})->single;
await $txn->query(q{close c})->void;
});
=cut
no indirect;
use Database::Async::Row;
use Future;
use Syntax::Keyword::Try;
use Ryu::Async;
use Scalar::Util qw(blessed);
use Log::Any qw($log);
# Operator overloading for query objects:
#  - string context yields "<class>[<value of ->sql>]", e.g. for log output
#  - boolean context is always true, without going through stringification
#  - fallback => 1 lets Perl autogenerate any operator not listed here
use overload
    q{""} => sub {
        my $self = shift;
        return sprintf('%s[%s]', ref($self), $self->sql);
    },
    bool => sub { return 1 },
    fallback => 1;
# Constructor.
#
# Takes the class name followed by a list of key/value pairs, which are
# stored unchanged as the instance's fields. The C<db> field is weakened,
# so this object does not hold a strong reference to whatever was passed
# in under that key.
#
# Returns the newly blessed instance.
sub new {
    my $class = shift;
    my %fields = @_;
    my $self = bless \%fields, $class;
    Scalar::Util::weaken($self->{db});
    return $self;
}
=head2 in
This is a L<Ryu::Sink> used for queries which stream data to the server.
It's buffered internally.
=cut
# Returns the sink stored in ->{in}, creating it on first use.
#
# On first call this:
#  - requests a new sink from the database object,
#  - dies if ->{streaming_input} is already populated (there should be no
#    streaming input without the sink that feeds it),
#  - logs at debug level when the sink's source completes,
#  - stores the paused, buffered source as ->{streaming_input},
#  - resumes that streaming input once ->ready_to_stream is done.
#
# Later calls return the same cached sink.
sub in {
    my $self = shift;

    return $self->{in} //= do {
        my $input_sink = $self->db->new_sink;
        if($self->{streaming_input}) {
            die 'already have streaming input but no original ->{in} sink';
        }

        # Report the final state of the sink's source for debugging.
        my $completion = $input_sink->source->completed;
        $completion->on_ready(sub {
            my ($f) = @_;
            $log->debugf('Sink for %s completed with %s', $self, $f->state);
        });

        # Keep the buffered source paused until the query is ready for data.
        my $paused_input = $input_sink->source->buffer->pause;
        $self->{streaming_input} = $paused_input;

        $self->ready_to_stream->on_done(sub {
            $log->debugf('Ready to stream, resuming streaming input');
            $self->streaming_input->resume;
        });

        $input_sink;
    };
}
#sub {
# my ($self) = @_;
# my $engine = $self->{engine} or die 'needs a valid ::Engine instance';
# my $sink = $self->in or die 'had no valid sink for streaming input';
#
# my $src = $sink->buffer;
# $src->pause;
# $engine->stream_from($src);
#
# $self->ready_to_stream
# ->on_done(sub {
# $log->tracef('Ready to stream for %s', $sink);
# $src->resume;
# })->on_fail(sub {
# $src->completed->fail(@_) unless $sink->completed->is_ready;
# })->on_cancel(sub {
# $src->completed->cancel unless $sink->completed->is_ready;
# });
#}
# Accessor for the value stored in ->{streaming_input} (populated by ->in).
# Dies if that slot is undefined, i.e. ->in has not been called yet.
sub streaming_input {
    my ($self) = @_;
    my $input = $self->{streaming_input};
    die '->in has not yet been called' unless defined $input;
    return $input;
}
# Resolves the object returned by ->input_stream: calls ->done on it when
# ->{in} holds a true value (a sink was created via ->in), and ->cancel on
# it otherwise. Returns whatever that call returns.
sub finish {
    my $self = shift;
    my $outcome = $self->{in} ? 'done' : 'cancel';
    return $self->input_stream->$outcome;
}
=head2 db
Accessor for the L<Database::Async> instance.
( run in 0.779 second using v1.01-cache-2.11-cpan-39bf76dae61 )