Database-Async-Engine-PostgreSQL
view release on metacpan or search on metacpan
lib/Database/Async/Engine/PostgreSQL.pm view on Meta::CPAN
delete $self->{fc};
$log->tracef('Completed query %s with no data', $self->active_query);
# my $query = delete $self->{active_query};
# $query->done if $query;
}),
send_request => $self->$curry::weak(sub {
my ($self, $msg) = @_;
$log->tracef('Send request for %s', $msg);
$self->stream->write($msg);
}),
ready_for_query => $self->$curry::weak(sub {
my ($self, $msg) = @_;
# Flow control is only valid for the current stream, so we discard it here
delete $self->{fc};
$log->tracef('Ready for query, state is %s', $msg->state);
$self->ready_for_query->set_string($msg->state);
$self->db->engine_ready($self) if $self->db;
}),
backend_key_data => $self->$curry::weak(sub {
my ($self, $msg) = @_;
$log->tracef('Backend key data: pid %d, key 0x%08x', $msg->pid, $msg->key);
}),
parse_complete => $self->$curry::weak(sub {
my ($self, $msg) = @_;
$log->tracef('Parsing complete for query %s', $self->active_query);
}),
bind_complete => $self->$curry::weak(sub {
my ($self, $msg) = @_;
$log->tracef('Bind complete for query %s', $self->active_query);
}),
close_complete => $self->$curry::weak(sub {
my ($self, $msg) = @_;
# Flow control is only valid for the current stream, so we discard it here
delete $self->{fc};
$log->tracef('Close complete for query %s', $self->active_query);
}),
empty_query_response => $self->$curry::weak(sub {
my ($self, $msg) = @_;
# Flow control is only valid for the current stream, so we discard it here
delete $self->{fc};
$log->tracef('Query returned no results for %s', $self->active_query);
}),
error_response => $self->$curry::weak(sub {
my ($self, $msg) = @_;
# Flow control is only valid for the current stream, so we discard it here
delete $self->{fc};
if(my $query = delete $self->{active_query}) {
$log->warnf('Query returned error %s for %s', $msg->error, $self->active_query);
my $f = $query->completed;
$f->fail($msg->error) unless $f->is_ready;
} else {
$log->errorf('Received error %s with no active query', $msg->error);
}
}),
copy_in_response => $self->$curry::weak(sub {
my ($self, $msg) = @_;
my $query = $self->active_query;
$log->tracef('Ready to copy data for %s', $query);
my $proto = $self->protocol;
{
my $src = $query->streaming_input;
$src->completed
->on_ready(sub {
my ($f) = @_;
$log->tracef('Sending copy done notification, stream status was %s', $f->state);
$proto->send_message(
'CopyDone',
data => '',
);
$proto->send_message(
'Close',
portal => '',
statement => '',
);
$proto->send_message(
'Sync',
portal => '',
statement => '',
);
});
$src->each(sub {
$log->tracef('Sending %s', $_);
$proto->send_copy_data($_);
});
}
$query->ready_to_stream->done unless $query->ready_to_stream->is_ready;
}),
copy_out_response => $self->$curry::weak(sub {
my ($self, $msg) = @_;
$log->tracef('copy out starts %s', $msg);
# $self->active_query->row([ $msg->fields ]);
}),
copy_data => $self->$curry::weak(sub {
my ($self, $msg) = @_;
$log->tracef('Have copy data %s', $msg);
my $query = $self->active_query or do {
$log->warnf('No active query for copy data');
return;
};
$query->row([ map $self->decode_text($_), @$_ ]) for $msg->rows;
}),
copy_done => $self->$curry::weak(sub {
my ($self, $msg) = @_;
delete $self->{fc};
$log->tracef('Copy done - %s', $msg);
}),
notification_response => $self->$curry::weak(sub {
my ($self, $msg) = @_;
my ($chan, $data) = @{$msg}{qw(channel data)};
$log->tracef('Notification on channel %s containing %s', $chan, $data);
$self->db->notification($self, map $self->decode_text($_), $chan, $data);
}),
sub { $log->errorf('Unknown message %s (type %s)', $_, $_->type) }
);
$pg
}
}
sub stream_from {
my ($self, $src) = @_;
my $proto = $self->proto;
( run in 0.695 second using v1.01-cache-2.11-cpan-39bf76dae61 )