AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg.pm  view on Meta::CPAN

    $self->{query_start_time} = AE::now;
    if ($dbc->$method(@{$query->{args}})) {
        $self->{current_query} = $query;
        $self->_on_push_query_flushable;
    }
    else {
        $debug and $debug & 1 and $self->_debug("$method failed: ". $dbc->errorMessage);
        $self->_maybe_callback('on_error');
        # FIXME: this is broken in some way, sanitize it!
        # FIXME: check if the error is recoverable or fatal before continuing...
        $self->_on_push_query
    }
}

# Pushes buffered outgoing data for the query in flight to the server.
#
# Called directly after a query has been handed to the connection and
# again from the write watcher installed below. The outcome of
# $dbc->flush selects the next step:
#   -1  the connection is treated as broken -> _on_fatal_error
#    0  nothing left to send               -> go on reading (_on_consume_input)
#    1  more data pending                  -> wait until the socket is writable
# Any other value is considered an internal error and dies.
sub _on_push_query_flushable {
    my ($self) = @_;
    my $conn = $self->{dbc};

    # Detach both watchers up front. The write watcher is only put back
    # (or created) when flush reports that data is still pending.
    my $saved_write_watcher = delete $self->{write_watcher};
    undef $self->{timeout_watcher};

    $self->_debug("flushing") if $debug and $debug & 1;
    my $status = $conn->flush;

    return $self->_on_fatal_error if $status == -1;

    if ($status == 0) {
        $self->_debug("flushed") if $debug and $debug & 1;
        return $self->_on_consume_input;
    }

    die "internal error: flush returned $status" unless $status == 1;

    $self->_debug("wants to write") if $debug and $debug & 1;

    # Reuse the watcher that was active on entry when there was one.
    $self->{write_watcher} = $saved_write_watcher
        // AE::io $self->{fd}, 1, weak_method_callback_cached($self, '_on_push_query_flushable');

    # Re-arm the inactivity timer only when a timeout is configured.
    $self->{timeout_watcher} = AE::timer $self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout')
        if $self->{timeout};
}

# Reads whatever the server has sent and dispatches it.
#
# Steps, in order:
#   1. cancel the pending timeout watcher;
#   2. pull input into the connection (a failure is fatal);
#   3. hand every queued notification to the 'on_notify' callback;
#   4. when a query is in flight, deliver its results one by one via
#      'on_result' until the connection reports busy (or a write is still
#      pending), or until no result is left, in which case 'on_done' is
#      fired, the current query is cleared and the next one is pushed.
sub _on_consume_input {
    my ($self) = @_;
    my $conn = $self->{dbc};

    undef $self->{timeout_watcher};

    $self->_debug("looking for data") if $debug and $debug & 1;
    if (not $conn->consumeInput) {
        $self->_debug("consumeInput failed") if $debug and $debug & 1;
        return $self->_on_fatal_error;
    }

    $self->_debug("looking for notifications") if $debug and $debug & 2;
    while (my @event = $conn->notifies) {
        $self->_debug("notify recived: @event") if $debug and $debug & 2;
        $self->_maybe_callback(on_notify => @event);
    }

    if (defined(my $query = $self->{current_query})) {
        # Drain results for as long as nothing forces us back to the
        # event loop.
        until ($self->{write_watcher} or $conn->busy) {
            $self->_debug("data available") if $debug and $debug & 1;

            my $res = $conn->result;
            unless ($res) {
                # No further result object: the query is complete.
                $self->_debug("calling on_done") if $debug and $debug & 2;
                $self->_maybe_callback($query, 'on_done');
                undef $self->{current_query};
                $self->_on_push_query;
                return;
            }

            if ($debug and $debug & 2) {
                my $res_status  = $res->status // '<undef>';
                my $conn_status = $conn->status // '<undef>';
                my $cmd_rows    = $res->cmdRows // '<undef>';
                my $n_rows      = $res->rows // '<undef>';
                my $n_cols      = $res->columns // '<undef>';
                my $sql_state   = $res->errorField('sqlstate') // '<undef>';
                $self->_debug("calling on_result status: $res_status, sqlstate: $sql_state, conn status: $conn_status, cmdRows: $cmd_rows, columns: $n_cols, rows: $n_rows");
            }
            $self->_maybe_callback($query, 'on_result', $res);
        }

        # Either output is still pending or the server has not finished
        # answering: re-arm the timeout (when configured) and wait.
        $self->_debug($self->{write_watcher}
                      ? "wants to write and read"
                      : "wants to read")
            if $debug and $debug & 1;
        $self->{timeout_watcher} = AE::timer $self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout')
            if $self->{timeout};
        return;
    }
}

# Timer callback fired when an operation exceeded the configured timeout.
#
# Flags the object as timed out and then goes through the fatal-error path.
# The watchers are not removed here; per the original author's note,
# _on_fatal_error already takes care of deleting them.
sub _on_timeout {
    my ($self) = @_;
    $self->_debug("operation timed out") if $debug and $debug & 2;
    $self->{timedout} = 1;
    return $self->_on_fatal_error;
}

# Empties the object's hash in place, dropping every reference it holds.
# The (still blessed) hash itself stays valid, just without any content.
sub destroy {
    %{ $_[0] } = ();
}

package AnyEvent::Pg::Watcher;

# Constructor: wraps $query in a scalar reference blessed into $class.
# The query is reachable again by dereferencing the returned object.
sub _new {
    my ($class, $query) = @_;
    return bless \$query, $class;
}

sub DESTROY {
    # cancel query
    my $query = ${shift()};



( run in 0.704 second using v1.01-cache-2.11-cpan-39bf76dae61 )