AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg.pm  view on Meta::CPAN

                 on_notify => $on_notify,
                 queries => [],
                 timeout => $timeout,
                 seq => $seq,
                 call_on_empty_queue => 1,
               };
    bless $self, $class;
    &AE::postpone(weak_method_callback($self, '_connectPoll'));
    $self;
}

# Accessor: returns whatever is stored in the object's 'dbc' slot (the
# handle the other methods in this file call connectPoll, flush, etc. on).
sub dbc {
    my ($self) = @_;
    return $self->{dbc};
}

# Advances the connection handshake by one step.
#
# On the first call (no 'fd' stored yet) the socket is taken from the dbc
# handle and the step is treated as "wants to write"; a negative descriptor
# is reported through _on_connect_error. On later calls the step is whatever
# $dbc->connectPoll returns. Depending on the step, a read or a write watcher
# that re-enters this method is kept/created, or control is handed to
# _on_connect / _on_connect_error. While the handshake is still pending and a
# timeout is configured, the timeout timer is (re)armed.
sub _connectPoll {
    my ($self) = @_;
    my $dbc = $self->{dbc};
    my $fd  = $self->{fd};

    $debug and $debug & 1 and $self->_debug("enter");

    my $r;
    if (defined $fd) {
        $r = $dbc->connectPoll;
    }
    else {
        # First pass: remember the socket descriptor for later calls.
        $fd = $self->{fd} = $dbc->socket;
        if ($fd < 0) {
            $debug and $debug & 1 and $self->_debug("error");
            $self->_on_connect_error;
            return;
        }
        $r = PGRES_POLLING_WRITING;
    }

    $debug and $debug & 1 and $self->_debug("wants to: $r");

    my ($goto, $reader, $writer);
    if ($r == PGRES_POLLING_READING) {
        # Reuse the existing read watcher when there is one.
        $reader = $self->{read_watcher}
            // AE::io($fd, 0, weak_method_callback_cached($self, '_connectPoll'));
    }
    elsif ($r == PGRES_POLLING_WRITING) {
        # Reuse the existing write watcher when there is one.
        $writer = $self->{write_watcher}
            // AE::io($fd, 1, weak_method_callback_cached($self, '_connectPoll'));
    }
    elsif ($r == PGRES_POLLING_FAILED) {
        $goto = '_on_connect_error';
    }
    elsif ($r == PGRES_POLLING_OK or $r == PGRES_POLLING_ACTIVE) {
        $goto = '_on_connect';
    }

    # Only the watcher selected above survives; the other slot is reset to
    # undef, which drops any watcher previously stored there.
    @{$self}{qw(read_watcher write_watcher)} = ($reader, $writer);

    if ($goto) {
        # Handshake finished (one way or the other): no timeout needed.
        delete $self->{timeout_watcher};
        $debug and $debug & 1 and $self->_debug("goto $goto");
        return $self->$goto;
    }

    $self->{timeout_watcher} =
        AE::timer($self->{timeout}, 0, weak_method_callback_cached($self, '_connectPollTimeout'))
            if $self->{timeout};
}

# Timer callback for a connection attempt that took too long: drops the
# timeout/read/write watchers, flags the object as timed out and reports
# the failure through _on_connect_error.
sub _connectPollTimeout {
    my ($self) = @_;
    $debug and $debug & 2 and $self->_debug("_connectPoll timed out");
    for my $slot (qw(timeout_watcher read_watcher write_watcher)) {
        delete $self->{$slot};
    }
    $self->{timedout} = 1;
    return $self->_on_connect_error;
}

# Invokes a named callback if it is set and its owner is not canceled.
#
# Call forms:
#   $self->_maybe_callback($name, @args)          # callback stored on $self
#   $self->_maybe_callback($owner, $name, @args)  # callback stored on $owner
#                                                 # (any reference, e.g. a
#                                                 # query hash)
#
# The callback is called as $sub->($self, @args). Nothing is called when the
# slot is undefined or when the owner has a true 'canceled' entry.
sub _maybe_callback {
    my $self = shift;
    my $obj = (ref $_[0] ? shift : $self);
    my $cb = shift;
    my $sub = $obj->{$cb};
    if (defined $sub and not $obj->{canceled}) {
        if ($debug and $debug & 2) {
            # Resolving the sub name is best effort only: keep both $@ and
            # any user-installed die hook untouched if Devel::Peek can not
            # be loaded. (The hook lives in %SIG, not in %ENV.)
            local ($@, $SIG{__DIE__});
            my $name = eval {
                require Devel::Peek;
                Devel::Peek::CvGV($sub)
                } // 'unknown';
            $self->_debug("calling $cb as $sub ($name)");
        }
        $sub->($self, @_);
    }
    else {
        $debug and $debug & 1 and $self->_debug("no callback for $cb");
    }
}

# Called once the handshake has succeeded: switches the handle to
# non-blocking mode, marks the object as 'connected', installs the read
# watcher that feeds _on_consume_input, fires the 'on_connect' callback,
# forgets both connection callbacks and starts processing the queue.
sub _on_connect {
    my ($self) = @_;
    $self->{dbc}->nonBlocking(1);
    $self->{state} = 'connected';
    $debug and $debug & 2 and $self->_debug('connected to database');
    $self->{read_watcher} =
        AE::io($self->{fd}, 0, weak_method_callback_cached($self, '_on_consume_input'));
    $self->_maybe_callback('on_connect');
    # The connection callbacks are one-shot.
    for my $slot (qw(on_connect on_connect_error)) {
        delete $self->{$slot};
    }
    return $self->_on_push_query;
}

# Called when the connection attempt fails: fires the 'on_connect_error'
# callback, forgets both connection callbacks and then escalates to
# _on_fatal_error.
sub _on_connect_error {
    my ($self) = @_;
    $debug and $debug & 2 and $self->_debug("connection failed");
    $self->_maybe_callback('on_connect_error');
    # The connection callbacks are one-shot.
    for my $slot (qw(on_connect on_connect_error)) {
        delete $self->{$slot};
    }
    return $self->_on_fatal_error;
}

# Public entry point that simply delegates to _on_fatal_error.
sub abort_all {
    my ($self) = @_;
    return $self->_on_fatal_error;
}

# Public entry point that delegates to _on_fatal_error, exactly like
# abort_all does.
sub finish {
    my ($self) = @_;
    return $self->_on_fatal_error;
}

sub _on_fatal_error {

lib/AnyEvent/Pg.pm  view on Meta::CPAN

    }
    %opts and croak "unsupported option(s) ".join(", ", keys %opts);

    my $query = \%query;
    if ($unshift) {
        unshift @{$self->{queries}}, $query;
    }
    else {
        push @{$self->{queries}}, $query;
    }

    $self->{call_on_empty_queue} = 1;

    $self->{current_query} or &AE::postpone(weak_method_callback_cached($self, '_on_postponed_push_query'));

    AnyEvent::Pg::Watcher->_new($query);
}

# Trampoline used for postponed invocations: logs (debug bit 4) and then
# runs _on_push_query.
sub _on_postponed_push_query {
    my ($self) = @_;
    $debug and $debug & 4 and $self->_debug("postponed call to _on_push_query");
    return $self->_on_push_query;
}

# Returns the number of queries still waiting in the queue, plus one when
# there is a query currently being processed.
sub queue_size {
    my ($self) = @_;
    my $waiting = scalar @{ $self->{queries} };
    return $self->{current_query} ? $waiting + 1 : $waiting;
}

# Queues a query of type 'query'; all arguments are forwarded to
# _push_query, whose result is returned.
sub push_query {
    my $self = shift;
    return $self->_push_query(_type => 'query', @_);
}

# Queues a query of type 'query_prepared'; all arguments are forwarded to
# _push_query, whose result is returned.
sub push_query_prepared {
    my $self = shift;
    return $self->_push_query(_type => 'query_prepared', @_);
}

# Queues a request of type 'prepare'; all arguments are forwarded to
# _push_query, whose result is returned.
sub push_prepare {
    my $self = shift;
    return $self->_push_query(_type => 'prepare', @_);
}

# Same as push_query, but additionally passes _unshift => 1 to _push_query
# (which puts the query at the head of the queue instead of the tail).
sub unshift_query {
    my $self = shift;
    return $self->_push_query(_type => 'query', _unshift => 1, @_);
}

# Same as push_query_prepared, but additionally passes _unshift => 1 to
# _push_query (which puts the query at the head of the queue instead of
# the tail).
sub unshift_query_prepared {
    my $self = shift;
    return $self->_push_query(_type => 'query_prepared', _unshift => 1, @_);
}

# Accessor: returns the 'query_start_time' slot, which is stamped right
# before a query is handed to the database handle.
sub last_query_start_time {
    my ($self) = @_;
    return $self->{query_start_time};
}

# Schedules the next queued query, if any.
#
# Nothing happens while another query is being processed. Otherwise the
# behavior depends on the connection state:
#   'connected' - canceled queries at the head of the queue are discarded;
#                 if a query remains, a write watcher (and the timeout
#                 timer, when a timeout is configured) is armed so that
#                 _on_push_query_writable sends it. If the queue is empty
#                 the 'on_empty_queue' callback fires, at most once per
#                 'call_on_empty_queue' flag.
#   'failed'    - every queued query gets its 'on_error' callback and the
#                 queue is emptied.
#   otherwise   - the queue is left untouched.
sub _on_push_query {
    my ($self) = @_;
    $debug and $debug & 4 and $self->_debug("_on_push_query");

    if ($self->{current_query}) {
        $debug and $debug & 2 and $self->_debug("there is already a query being processed ($self->{current_query})");
        return;
    }

    my $queue = $self->{queries};

    if ($self->{state} eq 'connected') {
        # Discard canceled queries sitting at the head of the queue.
        while (@$queue and $queue->[0]{canceled}) {
            $debug and $debug & 2 and $self->_debug("the query at the head of the queue was canceled, looking again!");
            shift @$queue;
        }

        if (@$queue) {
            $debug and $debug & 1 and $self->_debug("want to write query");
            $self->{write_watcher} =
                AE::io($self->{fd}, 1, weak_method_callback_cached($self, '_on_push_query_writable'));
            $self->{timeout_watcher} =
                AE::timer($self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout'))
                    if $self->{timeout};
            return;
        }

        # This sub may be called repeatedly from calls stacked by
        # AE::postpone, so the 'on_empty_queue' callback is only fired
        # when this (ugly) flag is set.
        if (delete $self->{call_on_empty_queue}) {
            $self->_maybe_callback('on_empty_queue');
        }
        else {
            $debug and $debug & 1 and $self->_debug("skipping on_empty_queue callback");
        }
    }
    elsif ($self->{state} eq 'failed') {
        $debug and $debug & 1 and $self->_debug("calling on_error queries because we are in state failed");
        for my $query (@$queue) {
            $self->_maybe_callback($query, 'on_error');
        }
        @$queue = ();
    }
    else {
        # Any other state: keep the queue as it is.
        $debug and $debug & 1 and $self->_debug("not processing queued queries because we are in state $self->{state}");
    }
}

# Maps the 'type' of a queued query to the name of the database handle
# method used to send it.
my %send_type2method = (query => 'sendQuery',
                        query_prepared => 'sendQueryPrepared',
                        prepare => 'sendPrepare' );

# Write-watcher callback: takes the next query off the queue and sends it.
#
# The write and timeout watchers are dropped first. Queries that were
# canceled after the watcher was armed are discarded instead of being sent;
# if nothing is left to send, control goes back to _on_push_query (which
# takes care of the empty-queue handling). On a successful send the query
# becomes the current query and flushing starts. On a failed send the
# query's own 'on_error' callback is fired (otherwise the dropped query
# would never be reported to its owner), followed by the connection-level
# 'on_error' callback, and the next query is scheduled.
#
# Dies on internal inconsistencies: a current query already in flight, or
# a query type with no associated send method.
sub _on_push_query_writable {
    my $self = shift;
    $debug and $debug & 1 and $self->_debug("can write");
    # warn "_on_push_query_writable";
    undef $self->{write_watcher};
    undef $self->{timeout_watcher};
    $self->{current_query} and die "Internal error: _on_push_query_writable called when there is already a current query";
    my $dbc = $self->{dbc};

    # The head of the queue may have been canceled between the moment the
    # write watcher was armed and now: never send such queries.
    my $queries = $self->{queries};
    my $query;
    while (defined($query = shift @$queries)) {
        last unless $query->{canceled};
        $debug and $debug & 2 and $self->_debug("discarding canceled query instead of sending it");
    }
    unless (defined $query) {
        $debug and $debug & 1 and $self->_debug("no query left to send");
        return $self->_on_push_query;
    }

    # warn "sendQuery('" . join("', '", @query) . "')";
    my $method = $send_type2method{$query->{type}} //
        die "internal error: no method defined for push type $query->{type}";
    if ($debug and $debug & 1) {
        my $args = "'" . join("', '", @{$query->{args}}) . "'";
        $self->_debug("calling $method($args)");
    }
    $self->{query_start_time} = AE::now();
    if ($dbc->$method(@{$query->{args}})) {
        $self->{current_query} = $query;
        $self->_on_push_query_flushable;
    }
    else {
        $debug and $debug & 1 and $self->_debug("$method failed: ". $dbc->errorMessage);
        # Tell the owner of the query first, then the connection.
        $self->_maybe_callback($query, 'on_error');
        $self->_maybe_callback('on_error');
        # FIXME: this is broken in some way, sanitize it!
        # FIXME: check if the error is recoverable or fatal before continuing...
        $self->_on_push_query
    }
}

# Pushes pending output for the current query to the server.
#
# The result of $dbc->flush decides what happens next:
#   -1 - escalate to _on_fatal_error
#    0 - everything was flushed; go look for input
#    1 - more to write; (re)install the write watcher pointing back here
#        and arm the timeout timer when a timeout is configured
# Any other value is an internal error and dies.
sub _on_push_query_flushable {
    my ($self) = @_;
    my $dbc = $self->{dbc};

    # Take the write watcher out of the object; it is put back below only
    # if more writing is needed.
    my $pending_watcher = delete $self->{write_watcher};
    undef $self->{timeout_watcher};

    $debug and $debug & 1 and $self->_debug("flushing");
    my $flush = $dbc->flush;

    return $self->_on_fatal_error if $flush == -1;

    if ($flush == 0) {
        $debug and $debug & 1 and $self->_debug("flushed");
        return $self->_on_consume_input;
    }

    $flush == 1
        or die "internal error: flush returned $flush";

    $debug and $debug & 1 and $self->_debug("wants to write");
    $self->{write_watcher} = $pending_watcher
        // AE::io($self->{fd}, 1, weak_method_callback_cached($self, '_on_push_query_flushable'));
    $self->{timeout_watcher} =
        AE::timer($self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout'))
            if $self->{timeout};
}

# Read-side workhorse: consumes input from the server, dispatches
# notifications and delivers results of the current query.
#
# Steps:
#   1. Drop the timeout timer and call $dbc->consumeInput; a false return
#      escalates to _on_fatal_error.
#   2. Hand every pending notification to the 'on_notify' callback.
#   3. If there is a current query, deliver results one by one through its
#      'on_result' callback for as long as no write is pending and the
#      handle is not busy. A false value from $dbc->result ends the query:
#      'on_done' fires, the current query is cleared and the next one is
#      scheduled. If a write is pending or the handle is busy, the timeout
#      timer is re-armed (when a timeout is configured) and the method
#      returns to wait for more events.
sub _on_consume_input {
    my ($self) = @_;
    my $dbc = $self->{dbc};

    undef $self->{timeout_watcher};

    $debug and $debug & 1 and $self->_debug("looking for data");
    if (not $dbc->consumeInput) {
        $debug and $debug & 1 and $self->_debug("consumeInput failed");
        return $self->_on_fatal_error;
    }

    $debug and $debug & 2 and $self->_debug("looking for notifications");
    while (my @notify = $dbc->notifies) {
        $debug and $debug & 2 and $self->_debug("notify recived: @notify");
        $self->_maybe_callback(on_notify => @notify);
    }

    my $cq = $self->{current_query};
    return unless defined $cq;

    # Deliver results until we would have to wait for the socket again.
    until ($self->{write_watcher} or $dbc->busy) {
        $debug and $debug & 1 and $self->_debug("data available");

        my $result = $dbc->result;
        unless ($result) {
            # No more results: the current query is complete.
            $debug and $debug & 2 and $self->_debug("calling on_done");
            $self->_maybe_callback($cq, 'on_done');
            undef $self->{current_query};
            $self->_on_push_query;
            return;
        }

        if ($debug and $debug & 2) {
            my $status = $result->status // '<undef>';
            my $conn_status = $dbc->status // '<undef>';
            my $cmdRows = $result->cmdRows // '<undef>';
            my $rows = $result->rows // '<undef>';
            my $cols = $result->columns // '<undef>';
            my $sqlstate = $result->errorField('sqlstate') // '<undef>';
            $self->_debug("calling on_result status: $status, sqlstate: $sqlstate, conn status: $conn_status, cmdRows: $cmdRows, columns: $cols, rows: $rows");
        }
        $self->_maybe_callback($cq, 'on_result', $result);
    }

    # Still waiting on the socket: log what for and re-arm the timeout.
    $debug and $debug & 1 and $self->_debug($self->{write_watcher}
                                            ? "wants to write and read"
                                            : "wants to read");
    $self->{timeout_watcher} =
        AE::timer($self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout'))
            if $self->{timeout};
    return;
}

# Timer callback for an operation that took too long: flags the object as
# timed out and escalates to _on_fatal_error. The watchers are not touched
# here; per the original author's note, _on_fatal_error deletes them.
sub _on_timeout {
    my ($self) = @_;
    $debug and $debug & 2 and $self->_debug("operation timed out");
    $self->{timedout} = 1;
    return $self->_on_fatal_error;
}

# Empties the object in place: every slot (handle, watchers, queue,
# callbacks, ...) is removed from the underlying hash.
sub destroy {
    my ($self) = @_;
    %{$self} = ();
}

# Guard object for a queued query: a blessed reference to a scalar holding
# the query hash. When the guard is destroyed the query is marked as
# canceled.
package AnyEvent::Pg::Watcher;

# Wraps the given query hash reference in a new guard object.
sub _new {
    my ($class, $query) = @_;
    return bless \$query, $class;
}

# Cancels the query: its result/done/error callbacks are removed and its
# 'canceled' flag is set.
sub DESTROY {
    my ($self) = @_;
    my $query = ${$self};
    delete @$query{qw(on_error on_result on_done)};
    $query->{canceled} = 1;
}



( run in 0.770 second using v1.01-cache-2.11-cpan-39bf76dae61 )