AnyEvent-Pg
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/AnyEvent/Pg.pm view on Meta::CPAN
on_notify => $on_notify,
queries => [],
timeout => $timeout,
seq => $seq,
call_on_empty_queue => 1,
};
bless $self, $class;
&AE::postpone(weak_method_callback($self, '_connectPoll'));
$self;
}
sub dbc { shift->{dbc} }
sub _connectPoll {
my $self = shift;
my $dbc = $self->{dbc};
my $fd = $self->{fd};
$debug and $debug & 1 and $self->_debug("enter");
my ($r, $goto, $rw, $ww);
if (defined $fd) {
$r = $dbc->connectPoll;
}
else {
$fd = $self->{fd} = $dbc->socket;
if ($fd < 0) {
$debug and $debug & 1 and $self->_debug("error");
$self->_on_connect_error;
return;
}
$r = PGRES_POLLING_WRITING;
}
$debug and $debug & 1 and $self->_debug("wants to: $r");
if ($r == PGRES_POLLING_READING) {
$rw = $self->{read_watcher} // AE::io $fd, 0, weak_method_callback_cached($self, '_connectPoll');
# say "fd: $fd, read_watcher: $rw";
}
elsif ($r == PGRES_POLLING_WRITING) {
$ww = $self->{write_watcher} // AE::io $fd, 1, weak_method_callback_cached($self, '_connectPoll');
# say "fd: $fd, write_watcher: $ww";
}
elsif ($r == PGRES_POLLING_FAILED) {
$goto = '_on_connect_error';
}
elsif ($r == PGRES_POLLING_OK or
$r == PGRES_POLLING_ACTIVE) {
$goto = '_on_connect';
}
$self->{read_watcher} = $rw;
$self->{write_watcher} = $ww;
# warn "read_watcher: $rw, write_watcher: $ww";
if ($goto) {
delete $self->{timeout_watcher};
$debug and $debug & 1 and $self->_debug("goto $goto");
$self->$goto;
}
elsif ($self->{timeout}) {
$self->{timeout_watcher} = AE::timer $self->{timeout}, 0, weak_method_callback_cached($self, '_connectPollTimeout');
}
}
sub _connectPollTimeout {
my $self = shift;
$debug and $debug & 2 and $self->_debug("_connectPoll timed out");
delete @{$self}{qw(timeout_watcher read_watcher write_watcher)};
$self->{timedout} = 1;
$self->_on_connect_error;
}
sub _maybe_callback {
my $self = shift;
my $obj = (ref $_[0] ? shift : $self);
my $cb = shift;
my $sub = $obj->{$cb};
if (defined $sub and not $obj->{canceled}) {
if ($debug & 2) {
local ($@, $ENV{__DIE__});
my $name = eval {
require Devel::Peek;
Devel::Peek::CvGV($sub)
} // 'unknown';
$self->_debug("calling $cb as $sub ($name)");
}
$sub->($self, @_);
}
else {
$debug and $debug & 1 and $self->_debug("no callback for $cb");
}
}
sub _on_connect {
my $self = shift;
my $dbc = $self->{dbc};
$dbc->nonBlocking(1);
$self->{state} = 'connected';
$debug and $debug & 2 and $self->_debug('connected to database');
$self->{read_watcher} = AE::io $self->{fd}, 0, weak_method_callback_cached($self, '_on_consume_input');
$self->_maybe_callback('on_connect');
delete @{$self}{qw(on_connect on_connect_error)};
$self->_on_push_query;
}
sub _on_connect_error {
my $self = shift;
$debug and $debug & 2 and $self->_debug("connection failed");
$self->_maybe_callback('on_connect_error');
delete @{$self}{qw(on_connect on_connect_error)};
$self->_on_fatal_error;
}
sub abort_all { shift->_on_fatal_error }
sub finish {
my $self = shift;
$self->_on_fatal_error;
}
sub _on_fatal_error {
my $self = shift;
$self->{state} = 'failed';
delete @{$self}{qw(write_watcher read_watcher timeout_watcher
on_connect on_connect_error on_empty_query)};
view all matches for this distributionview release on metacpan - search on metacpan
( run in 0.664 second using v1.00-cache-2.02-grep-82fe00e-cpan-cec75d87357c )