Database-Async-Engine-PostgreSQL

 view release on metacpan or  search on metacpan

lib/Database/Async/Engine/PostgreSQL.pm  view on Meta::CPAN

                    delete $self->{fc};
                    $log->tracef('Completed query %s with no data', $self->active_query);
                    # my $query = delete $self->{active_query};
                    # $query->done if $query;
                }),
                # Trace the outgoing request, then write it to our stream.
                send_request => $self->$curry::weak(sub {
                    my ($self, $request) = @_;
                    $log->tracef('Send request for %s', $request);
                    $self->stream->write($request);
                }),
                # Ready-for-query message: drop per-stream flow control, record the
                # reported state and tell the database object (when present) that
                # this engine is ready.
                ready_for_query => $self->$curry::weak(sub {
                    my ($self, $status) = @_;
                    # Flow control only applies to the current stream; drop it now.
                    delete $self->{fc};
                    $log->tracef('Ready for query, state is %s', $status->state);
                    $self->ready_for_query->set_string($status->state);
                    if($self->db) {
                        $self->db->engine_ready($self);
                    }
                }),
                # Trace-only handler: reports the pid and key carried by the message.
                backend_key_data => $self->$curry::weak(sub {
                    my (undef, $key_data) = @_;
                    $log->tracef(
                        'Backend key data: pid %d, key 0x%08x',
                        $key_data->pid,
                        $key_data->key,
                    );
                }),
                # Trace-only handler: notes which query was active when parsing completed.
                parse_complete => $self->$curry::weak(sub {
                    my ($self) = @_;
                    $log->tracef(
                        'Parsing complete for query %s',
                        $self->active_query,
                    );
                }),
                # Trace-only handler: notes which query was active when the bind completed.
                bind_complete => $self->$curry::weak(sub {
                    my ($self) = @_;
                    $log->tracef(
                        'Bind complete for query %s',
                        $self->active_query,
                    );
                }),
                # Close-complete message: discard flow control and trace the active query.
                close_complete => $self->$curry::weak(sub {
                    my ($self) = @_;
                    # Flow control only applies to the current stream; drop it now.
                    delete $self->{fc};
                    $log->tracef(
                        'Close complete for query %s',
                        $self->active_query,
                    );
                }),
                # Empty-query response: discard flow control and trace the active query.
                empty_query_response => $self->$curry::weak(sub {
                    my ($self) = @_;
                    # Flow control only applies to the current stream; drop it now.
                    delete $self->{fc};
                    $log->tracef(
                        'Query returned no results for %s',
                        $self->active_query,
                    );
                }),
                # Error response from the server: detach the active query (if any) and
                # fail its completion future with the reported error, unless that future
                # is already ready. With no active query the error is only logged.
                error_response => $self->$curry::weak(sub {
                    my ($self, $msg) = @_;
                    # Flow control is only valid for the current stream, so we discard it here
                    delete $self->{fc};
                    if(my $query = delete $self->{active_query}) {
                        # Log the query we just detached: the active_query slot was
                        # cleared by the delete above, so it can no longer be used to
                        # identify the query that failed.
                        $log->warnf('Query returned error %s for %s', $msg->error, $query);
                        my $f = $query->completed;
                        $f->fail($msg->error) unless $f->is_ready;
                    } else {
                        $log->errorf('Received error %s with no active query', $msg->error);
                    }
                }),
                # Copy-in response: forward every item from the active query's streaming
                # input as copy data, and send CopyDone / Close / Sync once that input's
                # completion future becomes ready.
                copy_in_response => $self->$curry::weak(sub {
                    my ($self) = @_;
                    my $query = $self->active_query;
                    $log->tracef('Ready to copy data for %s', $query);
                    my $proto = $self->protocol;
                    my $input = $query->streaming_input;

                    # Register the completion handler before attaching the per-item
                    # callback below, keeping the original ordering.
                    $input->completed->on_ready(sub {
                        my ($completion) = @_;
                        $log->tracef('Sending copy done notification, stream status was %s', $completion->state);
                        $proto->send_message('CopyDone', data => '');
                        for my $type ('Close', 'Sync') {
                            $proto->send_message(
                                $type,
                                portal    => '',
                                statement => '',
                            );
                        }
                    });

                    # Each item emitted by the input is passed on as copy data.
                    $input->each(sub {
                        $log->tracef('Sending %s', $_);
                        $proto->send_copy_data($_);
                    });

                    unless($query->ready_to_stream->is_ready) {
                        $query->ready_to_stream->done;
                    }
                }),
                # Trace-only handler: nothing is passed to the active query here.
                copy_out_response => $self->$curry::weak(sub {
                    my ($self, $response) = @_;
                    $log->tracef('copy out starts %s', $response);
                    # $self->active_query->row([ $response->fields ]);
                }),
                # Incoming copy data: decode every field of every row in the message and
                # hand each decoded row to the active query. Without an active query the
                # data is dropped with a warning.
                copy_data => $self->$curry::weak(sub {
                    my ($self, $copy) = @_;
                    $log->tracef('Have copy data %s', $copy);
                    my $query = $self->active_query;
                    unless($query) {
                        $log->warnf('No active query for copy data');
                        return;
                    }
                    for my $row ($copy->rows) {
                        $query->row([ map { $self->decode_text($_) } @$row ]);
                    }
                }),
                # Copy-done message: discard flow control and trace the message.
                copy_done => $self->$curry::weak(sub {
                    my ($self, $done) = @_;
                    delete $self->{fc};
                    $log->tracef('Copy done - %s', $done);
                }),
                # Notification from the server: trace the channel and payload, then pass
                # the decoded channel and payload to the database object.
                notification_response => $self->$curry::weak(sub {
                    my ($self, $msg) = @_;
                    my ($chan, $data) = @{$msg}{qw(channel data)};
                    $log->tracef('Notification on channel %s containing %s', $chan, $data);
                    # The ready_for_query handler treats the database object as possibly
                    # absent; do the same here so a missing database object does not
                    # raise an exception from inside the message handler.
                    my $db = $self->db or return;
                    $db->notification($self, map $self->decode_text($_), $chan, $data);
                }),
                # Trailing callback with no handler name: logs the message and its type as
                # unknown. Presumably invoked for message types not listed above - verify
                # against the dispatcher.
                # NOTE(review): reads the message from $_ rather than @_ - confirm the
                # caller sets $_ before invoking this.
                sub { $log->errorf('Unknown message %s (type %s)', $_, $_->type) }
            );
        $pg
    }
}

sub stream_from {
    my ($self, $src) = @_;
    my $proto = $self->proto;



( run in 0.695 second using v1.01-cache-2.11-cpan-39bf76dae61 )