Database-Async

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN


0.014     2021-06-16 00:58:19+08:00 Asia/Kuala_Lumpur
    [New features]

    - encoding now supported, for cases where all interaction with the database
    is text-only

    [Bugs fixed]

    - query completion logic has been refactored to cover the different states better,
    particularly regarding streaming in/out (e.g. COPY in PostgreSQL)

0.013     2020-12-03 00:13:37+08:00 Asia/Kuala_Lumpur
    New features:

    - support for `type` configuration parameter, allowing connection to databases
    without needing to pass a specific URI
    - notification channel support (currently undocumented, since the interface
    may change slightly to make it compatible with non-PostgreSQL databases)

    Bugs fixed:

lib/Database/Async/Query.pm  view on Meta::CPAN

This is a L<Ryu::Sink> used for queries which stream data to the server.

It's buffered internally.

=cut

sub in {
    my ($self) = @_;
    $self->{in} //= do {
        my $sink = $self->db->new_sink;
        die 'already have streaming input but no original ->{in} sink' if $self->{streaming_input};
        $sink->source->completed->on_ready(sub { $log->debugf('Sink for %s completed with %s', $self, shift->state) });
        $self->{streaming_input} = $sink->source->buffer->pause;
        $self->ready_to_stream->on_done(sub {
            $log->debugf('Ready to stream, resuming streaming input');
            $self->streaming_input->resume;
        });
        $sink
    }
}

#sub {
#    my ($self) = @_;
#    my $engine = $self->{engine} or die 'needs a valid ::Engine instance';
#    my $sink = $self->in or die 'had no valid sink for streaming input';
#
#    my $src = $sink->buffer;
#    $src->pause;
#    $engine->stream_from($src);
#
#    $self->ready_to_stream
#        ->on_done(sub {
#            $log->tracef('Ready to stream for %s', $sink);
#            $src->resume;
#        })->on_fail(sub {
#            $src->completed->fail(@_) unless $sink->completed->is_ready;
#        })->on_cancel(sub {
#            $src->completed->cancel unless $sink->completed->is_ready;
#        });
#}

sub streaming_input { shift->{streaming_input} // die '->in has not yet been called' }

sub finish {
    my ($self) = @_;
    if($self->{in}) {
        $self->input_stream->done;
    } else {
        $self->input_stream->cancel;
    }
}



( run in 0.228 second using v1.01-cache-2.11-cpan-4d50c553e7e )