Database-Async
view release on metacpan or search on metacpan
0.014 2021-06-16 00:58:19+08:00 Asia/Kuala_Lumpur
[New features]
- encoding now supported, for cases where all interaction with the database
is text-only
[Bugs fixed]
- query completion logic has been refactored to cover the different states better,
particularly regarding streaming in/out (e.g. COPY in PostgreSQL)
0.013 2020-12-03 00:13:37+08:00 Asia/Kuala_Lumpur
New features:
- support for `type` configuration parameter, allowing connection to databases
without needing to pass a specific URI
- notification channel support (currently undocumented, since the interface
may change slightly to make it compatible with non-PostgreSQL databases)
Bugs fixed:
lib/Database/Async/Query.pm view on Meta::CPAN
This is a L<Ryu::Sink> used for queries which stream data to the server.
It's buffered internally.
=cut
sub in {
    my ($self) = @_;
    # Only the first call builds the sink; later calls get the stored one back.
    return $self->{in} //= _build_input_sink($self);
}

# Private helper for ->in: obtains a new sink from the database handle and
# wires up the paused, buffered streaming input that is fed from it.
# Returns the sink; the caller is responsible for storing it in ->{in}.
sub _build_input_sink {
    my ($self) = @_;
    my $sink = $self->db->new_sink;

    # A streaming input with no stored sink is treated as a fatal
    # inconsistency in the object's state.
    if ($self->{streaming_input}) {
        die 'already have streaming input but no original ->{in} sink';
    }

    # Debug trace of the state in which the sink's source completes.
    $sink->source->completed->on_ready(sub {
        my ($completion) = @_;
        $log->debugf('Sink for %s completed with %s', $self, $completion->state);
    });

    # Keep the result of pausing the buffered source, so nothing is passed
    # on until ready_to_stream reports success.
    my $paused_input = $sink->source->buffer->pause;
    $self->{streaming_input} = $paused_input;

    $self->ready_to_stream->on_done(sub {
        $log->debugf('Ready to stream, resuming streaming input');
        $self->streaming_input->resume;
    });

    return $sink;
}
#sub {
# my ($self) = @_;
# my $engine = $self->{engine} or die 'needs a valid ::Engine instance';
# my $sink = $self->in or die 'had no valid sink for streaming input';
#
# my $src = $sink->buffer;
# $src->pause;
# $engine->stream_from($src);
#
# $self->ready_to_stream
# ->on_done(sub {
# $log->tracef('Ready to stream for %s', $sink);
# $src->resume;
# })->on_fail(sub {
# $src->completed->fail(@_) unless $sink->completed->is_ready;
# })->on_cancel(sub {
# $src->completed->cancel unless $sink->completed->is_ready;
# });
#}
=head2 streaming_input

Returns whatever is stored under C<streaming_input> on this instance; the
C<in> method is what populates it.

Dies if that slot is still undefined.

=cut

sub streaming_input {
    my ($self) = @_;
    my $input = $self->{streaming_input};
    # Any defined value counts as present, including false ones such as 0.
    die '->in has not yet been called' unless defined $input;
    return $input;
}
=head2 finish

Settles the object returned by C<input_stream>: calls C<done> on it when
C<< ->{in} >> holds a true value (i.e. a sink has been set up), and C<cancel>
on it otherwise.

Returns whatever that C<done> or C<cancel> call returns.

=cut

sub finish {
    my ($self) = @_;
    # Decide on the outcome first, then fetch the stream and apply it.
    my $outcome = $self->{in} ? 'done' : 'cancel';
    return $self->input_stream->$outcome;
}
( run in 0.228 second using v1.01-cache-2.11-cpan-4d50c553e7e )