Database-Async
view release on metacpan or search on metacpan
lib/Database/Async/Query.pm view on Meta::CPAN
my $query = $db->query('select * from some_table');
my $engine = await $query->engine;
$engine->source('notification')
->map('payload')
->say;
=head2 Cancelling queries
In cases where you want to terminate a query early, use the L</cancel> method.
This will ask the engine to stop query execution if already scheduled. For a query
which has not yet been assigned to an engine, the L</cancel> method will cancel
the schedule request.
=head2 Cursors
Cursors are handled as normal SQL queries.
$db->txn(async sub {
my ($txn) = @_;
await $txn->query(q{declare c cursor for select id from xyz})->void;
say while await $txn->query(q{fetch next from c})->single;
await $txn->query(q{close c})->void;
});
=cut
no indirect;
use Database::Async::Row;
use Future;
use Syntax::Keyword::Try;
use Ryu::Async;
use Scalar::Util qw(blessed);
use Log::Any qw($log);
# Operator overloading for query objects:
#  - stringification yields "<class>[<sql>]", which is what shows up when a
#    query is interpolated into a log message;
#  - boolean context is always true, regardless of what the stringified form
#    would otherwise evaluate to;
#  - fallback => 1 lets Perl derive any other operators from the ones above.
use overload
'""' => sub { my ($self) = @_; sprintf '%s[%s]', ref($self), $self->sql },
bool => sub { 1 },
fallback => 1;
# Constructor. Takes the class name followed by key/value pairs which become
# the object's fields as-is. The `db` entry is held as a weak reference so
# that the query does not keep the database object alive.
sub new {
    my ($class, %params) = @_;
    my $self = \%params;
    # Weaken before blessing: the stored hash slot itself must be the weak ref.
    Scalar::Util::weaken($self->{db});
    return bless $self, $class;
}
=head2 in
This is a L<Ryu::Sink> used for queries which stream data to the server.
It's buffered internally.
=cut
# Returns the sink used to stream data to the server, creating and caching it
# in ->{in} on first call. On creation, the sink's source is buffered and
# paused, stored as ->{streaming_input}, and only resumed once the
# ->ready_to_stream future is done.
# Dies if ->{streaming_input} is already set while ->{in} is not.
sub in {
    my ($self) = @_;
    $self->{in} //= do {
        my $input_sink = $self->db->new_sink;
        if($self->{streaming_input}) {
            die 'already have streaming input but no original ->{in} sink';
        }

        # Purely diagnostic: record how the sink ended up (done/failed/cancelled).
        my $log_completion = sub {
            my ($f) = @_;
            $log->debugf('Sink for %s completed with %s', $self, $f->state);
        };
        $input_sink->source->completed->on_ready($log_completion);

        # Hold incoming items in a paused buffer until streaming may begin.
        $self->{streaming_input} = $input_sink->source->buffer->pause;

        my $start_streaming = sub {
            $log->debugf('Ready to stream, resuming streaming input');
            $self->streaming_input->resume;
        };
        $self->ready_to_stream->on_done($start_streaming);

        $input_sink
    }
}
#sub {
# my ($self) = @_;
# my $engine = $self->{engine} or die 'needs a valid ::Engine instance';
# my $sink = $self->in or die 'had no valid sink for streaming input';
#
# my $src = $sink->buffer;
# $src->pause;
# $engine->stream_from($src);
#
# $self->ready_to_stream
# ->on_done(sub {
# $log->tracef('Ready to stream for %s', $sink);
# $src->resume;
# })->on_fail(sub {
# $src->completed->fail(@_) unless $sink->completed->is_ready;
# })->on_cancel(sub {
# $src->completed->cancel unless $sink->completed->is_ready;
# });
#}
# Accessor for the buffered source stored in ->{streaming_input}.
# Dies if that field is undefined, i.e. ->in has not populated it yet.
sub streaming_input {
    my ($self) = @_;
    my $input = $self->{streaming_input};
    return $input // die '->in has not yet been called';
}
# Signals that no more input will be provided for this query.
# If an ->{in} sink was created, ->input_stream is marked as done; if no sink
# was ever requested, ->input_stream is cancelled instead.
# NOTE(review): ->input_stream is defined outside this excerpt - presumably a
# Future; confirm.
sub finish {
    my ($self) = @_;
    return $self->input_stream->cancel unless $self->{in};
    return $self->input_stream->done;
}
=head2 db
Accessor for the L<Database::Async> instance.
=cut
# Returns whatever was passed to the constructor as `db`.
sub db {
    my ($self) = @_;
    return $self->{db};
}
=head2 sql
The SQL string that this query would be running.
=cut
# Returns the SQL text stored in the `sql` field.
sub sql {
    my ($self) = @_;
    return $self->{sql};
}
=head2 bind
A list of bind parameters for this query. The list may be empty.
=cut
# Returns the bind parameters as a list (or their count in scalar context).
# A query constructed without a `bind` entry yields an empty list rather than
# dying on an undefined array reference under `strict refs`.
sub bind {
    my ($self) = @_;
    return @{ $self->{bind} // [] };
}
# Records the column layout for the result set.
# $desc is an arrayref of hashrefs, each with a `name` key. Stores the ordered
# names in ->{field_names} and a name => column index lookup in
# ->{field_by_name}. Returns $self.
sub row_description {
    my ($self, $desc) = @_;
    $log->tracef('Have row description %s', $desc);

    my @column_names = map { $_->{name} } @$desc;

    # Populate right-to-left: for a duplicated name, the assignment for the
    # leftmost column happens last, so the first column wins.
    my %index_of;
    @index_of{reverse @column_names} = reverse 0 .. $#column_names;

    $self->{field_names} = \@column_names;
    $self->{field_by_name} = \%index_of;
    return $self;
}
# Called with a single row of data: logs it at trace level and emits it on
# the ->row_data source. Returns whatever ->emit returns.
sub row {
    my ($self, $data) = @_;
    $log->tracef('Have row %s', $data);
    return $self->row_data->emit($data);
}
sub row_hashrefs {
my ($self) = @_;
$self->{row_hashrefs} //= $self->row_data
->map(sub {
my ($row) = @_;
+{
map {;
$self->{field_names}[$_] => $row->[$_]
( run in 3.279 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )