AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg.pm  view on Meta::CPAN

                 on_notify => $on_notify,
                 queries => [],
                 timeout => $timeout,
                 seq => $seq,
                 call_on_empty_queue => 1,
               };
    bless $self, $class;
    &AE::postpone(weak_method_callback($self, '_connectPoll'));
    $self;
}

sub dbc { shift->{dbc} }

sub _connectPoll {
    my $self = shift;
    my $dbc = $self->{dbc};
    my $fd = $self->{fd};

    $debug and $debug & 1 and $self->_debug("enter");

    my ($r, $goto, $rw, $ww);
    if (defined $fd) {
        $r = $dbc->connectPoll;
    }
    else {
        $fd = $self->{fd} = $dbc->socket;
        if ($fd < 0) {
            $debug and $debug & 1 and $self->_debug("error");
            $self->_on_connect_error;
            return;
        }
        $r = PGRES_POLLING_WRITING;
    }

    $debug and $debug & 1 and $self->_debug("wants to: $r");
    if    ($r == PGRES_POLLING_READING) {
        $rw = $self->{read_watcher} // AE::io $fd, 0, weak_method_callback_cached($self, '_connectPoll');
        # say "fd: $fd, read_watcher: $rw";
    }
    elsif ($r == PGRES_POLLING_WRITING) {
        $ww = $self->{write_watcher} // AE::io $fd, 1, weak_method_callback_cached($self, '_connectPoll');
        # say "fd: $fd, write_watcher: $ww";
    }
    elsif ($r == PGRES_POLLING_FAILED) {
        $goto = '_on_connect_error';
    }
    elsif ($r == PGRES_POLLING_OK or
           $r == PGRES_POLLING_ACTIVE) {
        $goto = '_on_connect';
    }
    $self->{read_watcher} = $rw;
    $self->{write_watcher} = $ww;
    # warn "read_watcher: $rw, write_watcher: $ww";

    if ($goto) {
        delete $self->{timeout_watcher};
        $debug and $debug & 1 and $self->_debug("goto $goto");
        $self->$goto;
    }
    elsif ($self->{timeout}) {
        $self->{timeout_watcher} = AE::timer $self->{timeout}, 0, weak_method_callback_cached($self, '_connectPollTimeout');
    }
}

sub _connectPollTimeout {
    my $self = shift;
    $debug and $debug & 2 and $self->_debug("_connectPoll timed out");
    delete @{$self}{qw(timeout_watcher read_watcher write_watcher)};
    $self->{timedout} = 1;
    $self->_on_connect_error;
}

sub _maybe_callback {
    my $self = shift;
    my $obj = (ref $_[0] ? shift : $self);
    my $cb = shift;
    my $sub = $obj->{$cb};
    if (defined $sub and not $obj->{canceled}) {
        if ($debug & 2) {
            local ($@, $ENV{__DIE__});
            my $name = eval {
                require Devel::Peek;
                Devel::Peek::CvGV($sub)
                } // 'unknown';
            $self->_debug("calling $cb as $sub ($name)");
        }
        $sub->($self, @_);
    }
    else {
        $debug and $debug & 1 and $self->_debug("no callback for $cb");
    }
}

sub _on_connect {
    my $self = shift;
    my $dbc = $self->{dbc};
    $dbc->nonBlocking(1);
    $self->{state} = 'connected';
    $debug and $debug & 2 and $self->_debug('connected to database');
    $self->{read_watcher} = AE::io $self->{fd}, 0, weak_method_callback_cached($self, '_on_consume_input');
    $self->_maybe_callback('on_connect');
    delete @{$self}{qw(on_connect on_connect_error)};
    $self->_on_push_query;
}

sub _on_connect_error {
    my $self = shift;
    $debug and $debug & 2 and $self->_debug("connection failed");
    $self->_maybe_callback('on_connect_error');
    delete @{$self}{qw(on_connect on_connect_error)};
    $self->_on_fatal_error;
}

sub abort_all { shift->_on_fatal_error }

sub finish {
    my $self = shift;
    $self->_on_fatal_error;
}

sub _on_fatal_error {
    my $self = shift;
    $self->{state} = 'failed';
    delete @{$self}{qw(write_watcher read_watcher timeout_watcher
                       on_connect on_connect_error on_empty_query)};

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 0.664 second using v1.00-cache-2.02-grep-82fe00e-cpan-cec75d87357c )