AnyEvent-Pg
view release on metacpan or search on metacpan
lib/AnyEvent/Pg.pm view on Meta::CPAN
$self->{query_start_time} = AE::now;
if ($dbc->$method(@{$query->{args}})) {
$self->{current_query} = $query;
$self->_on_push_query_flushable;
}
else {
$debug and $debug & 1 and $self->_debug("$method failed: ". $dbc->errorMessage);
$self->_maybe_callback('on_error');
# FIXME: this is broken in some way, sanitize it!
# FIXME: check if the error is recoverable or fatal before continuing...
$self->_on_push_query
}
}
# Push pending output for the current query towards the server and pick the
# next step from the value returned by $dbc->flush:
#
#   -1  the flush failed; control is handed to _on_fatal_error
#    0  everything was flushed; continue with _on_consume_input
#    1  more data remains; keep (or create) a write watcher on $self->{fd}
#       that re-enters this method, and arm the timeout timer when
#       $self->{timeout} is set
#
# Any other value is treated as an internal error and dies.
sub _on_push_query_flushable {
    my ($self) = @_;

    # Take the write watcher out of the object while flushing; it is put
    # back only if the flush reports that more writing is needed.
    my $ww  = delete $self->{write_watcher};
    my $dbc = $self->{dbc};
    $self->{timeout_watcher} = undef;

    $self->_debug("flushing") if $debug and $debug & 1;
    my $flush = $dbc->flush;

    return $self->_on_fatal_error if $flush == -1;

    if ($flush == 0) {
        $self->_debug("flushed") if $debug and $debug & 1;
        return $self->_on_consume_input;
    }

    die "internal error: flush returned $flush" unless $flush == 1;

    $self->_debug("wants to write") if $debug and $debug & 1;

    # Reuse the watcher that was active on entry; build a new one only when
    # there was none.
    $ww //= AE::io($self->{fd}, 1,
                   weak_method_callback_cached($self, '_on_push_query_flushable'));
    $self->{write_watcher} = $ww;

    $self->{timeout_watcher} = AE::timer($self->{timeout}, 0,
                                         weak_method_callback_cached($self, '_on_timeout'))
        if $self->{timeout};
}
# Read whatever the connection has available and dispatch it.
#
# Steps, in order:
#   1. Clear the timeout watcher and call $dbc->consumeInput; a false return
#      is handed to _on_fatal_error.
#   2. Deliver every pending notification through the 'on_notify' callback.
#   3. If a query is in flight ($self->{current_query}), hand each available
#      result to its 'on_result' callback. When $dbc->result returns a false
#      value the query is finished: 'on_done' is invoked, current_query is
#      cleared and _on_push_query is called.
#   4. If a write watcher is active or $dbc->busy is true, stop draining,
#      re-arm the timeout timer (when $self->{timeout} is set) and return.
sub _on_consume_input {
    my ($self) = @_;
    my $dbc = $self->{dbc};
    $self->{timeout_watcher} = undef;

    $self->_debug("looking for data") if $debug and $debug & 1;
    if (not $dbc->consumeInput) {
        $self->_debug("consumeInput failed") if $debug and $debug & 1;
        return $self->_on_fatal_error;
    }

    $self->_debug("looking for notifications") if $debug and $debug & 2;
    while (my @notify = $dbc->notifies) {
        $self->_debug("notify recived: @notify") if $debug and $debug & 2;
        $self->_maybe_callback(on_notify => @notify);
    }

    if (defined(my $query = $self->{current_query})) {
        # Keep pulling results for as long as nothing is left to write and
        # the connection does not report itself busy.
        until ($self->{write_watcher} or $dbc->busy) {
            $self->_debug("data available") if $debug and $debug & 1;

            my $result = $dbc->result;
            unless ($result) {
                # No further result objects: the current query is complete.
                $self->_debug("calling on_done") if $debug and $debug & 2;
                $self->_maybe_callback($query, 'on_done');
                $self->{current_query} = undef;
                $self->_on_push_query;
                return;
            }

            if ($debug and $debug & 2) {
                my $status      = $result->status // '<undef>';
                my $conn_status = $dbc->status // '<undef>';
                my $cmdRows     = $result->cmdRows // '<undef>';
                my $rows        = $result->rows // '<undef>';
                my $cols        = $result->columns // '<undef>';
                my $sqlstate    = $result->errorField('sqlstate') // '<undef>';
                $self->_debug("calling on_result status: $status, sqlstate: $sqlstate, conn status: $conn_status, cmdRows: $cmdRows, columns: $cols, rows: $rows");
            }
            $self->_maybe_callback($query, 'on_result', $result);
        }

        # More I/O is needed before the next result can be fetched.
        if ($debug and $debug & 1) {
            $self->_debug($self->{write_watcher}
                          ? "wants to write and read"
                          : "wants to read");
        }
        $self->{timeout_watcher} = AE::timer($self->{timeout}, 0,
                                             weak_method_callback_cached($self, '_on_timeout'))
            if $self->{timeout};
        return;
    }
}
# Timer callback used when an operation exceeds the configured timeout.
# Flags the object as timed out ($self->{timedout} = 1) and hands control to
# _on_fatal_error, whose result is returned.
sub _on_timeout {
    my ($self) = @_;

    $self->_debug("operation timed out") if $debug and $debug & 2;

    # The watchers are not removed here; _on_fatal_error already deletes them.
    $self->{timedout} = 1;
    return $self->_on_fatal_error;
}
# Wipe the object: every key stored in the underlying hash is removed, which
# drops the references the object held. The (now empty) hash reference itself
# stays valid for the caller.
sub destroy {
    my ($self) = @_;
    %{$self} = ();
}
package AnyEvent::Pg::Watcher;

# Build a watcher object for a query.
#
#   $class - class to bless the new object into
#   $query - the query the watcher stands for
#
# The object is a blessed reference to a scalar that holds $query, so the
# query is reachable as $$watcher.
sub _new {
    my ($class, $query) = @_;
    return bless \$query, $class;
}
sub DESTROY {
# cancel query
my $query = ${shift()};
( run in 0.704 second using v1.01-cache-2.11-cpan-39bf76dae61 )