AnyEvent-Pg
view release on metacpan or search on metacpan
lib/AnyEvent/Pg.pm view on Meta::CPAN
# Sequence number given to the next object created without an explicit
# "seq" option; incremented every time it is used.
my $next_seq = 1;

# new($class, $conninfo, %opts)
#
# Creates the object and starts connecting to the database described by
# $conninfo through Pg::PQ::Conn->start.
#
# Recognised options:
#   on_connect, on_connect_error, on_empty_queue, on_notify, on_error
#               - callbacks, stored in the object under the same names
#   timeout     - stored as-is; _connectPoll uses it to arm a timer
#   seq         - sequence number for the object (defaults to the next
#                 value of the file-level counter)
#
# Returns the new object in state 'connecting'. The first call to
# _connectPoll is postponed rather than made from inside the constructor.
#
# Croaks when an unknown option is passed, or when Pg::PQ::Conn->start
# does not return a connection object.
sub new {
    my ($class, $conninfo, %opts) = @_;
    my $on_connect       = delete $opts{on_connect};
    my $on_connect_error = delete $opts{on_connect_error};
    my $on_empty_queue   = delete $opts{on_empty_queue};
    my $on_notify        = delete $opts{on_notify};
    my $on_error         = delete $opts{on_error};
    my $timeout          = delete $opts{timeout};
    my $seq              = delete($opts{seq}) // ($next_seq++);

    # Anything still left in %opts was not recognised. The names are
    # sorted so the message does not depend on hash ordering.
    %opts and croak "unknown option(s) ".join(", ", sort keys %opts)." found";

    my $dbc = Pg::PQ::Conn->start($conninfo);
    # $dbc->trace(\*STDERR);

    # Without a connection object there is nothing to poll: fail here, in
    # the caller's context, instead of storing undef and dying later on a
    # method call made from inside the event loop.
    defined $dbc
        or croak "unable to start database connection: Pg::PQ::Conn->start returned undef";

    my $self = bless { state               => 'connecting',
                       dbc                 => $dbc,
                       on_connect          => $on_connect,
                       on_connect_error    => $on_connect_error,
                       on_error            => $on_error,
                       on_empty_queue      => $on_empty_queue,
                       on_notify           => $on_notify,
                       queries             => [],
                       timeout             => $timeout,
                       seq                 => $seq,
                       call_on_empty_queue => 1,
                     }, $class;

    # NOTE(review): the &-call form bypasses any prototype on AE::postpone,
    # presumably so that the code ref returned by weak_method_callback can
    # be passed where a literal block would otherwise be required - keep it.
    &AE::postpone(weak_method_callback($self, '_connectPoll'));

    return $self;
}
# Accessor: returns whatever is stored in the object's "dbc" slot (the
# connection object saved by the constructor).
sub dbc {
    my ($self) = @_;
    return $self->{dbc};
}
# Advances the asynchronous connection handshake by one step.
#
# On the first call (no file descriptor cached in $self->{fd}) the socket
# is fetched from the connection object; a negative descriptor is reported
# through _on_connect_error, otherwise the step is handled as "wants to
# write". On later calls the status comes from $dbc->connectPoll.
#
# Depending on the status, a read or a write I/O watcher that re-enters
# this method is kept (an existing one is reused), and the watcher for the
# other direction is dropped. A failed status hands over to
# _on_connect_error; an OK (or ACTIVE) status hands over to _on_connect.
# While polling continues and a timeout is configured, the timeout timer
# is re-armed on every call.
sub _connectPoll {
    my $self = shift;
    my $dbc  = $self->{dbc};
    my $sock = $self->{fd};
    $self->_debug("enter") if $debug and $debug & 1;

    my $status;
    if (not defined $sock) {
        # First pass: remember the descriptor for the following calls.
        $sock = $self->{fd} = $dbc->socket;
        if ($sock < 0) {
            $self->_debug("error") if $debug and $debug & 1;
            $self->_on_connect_error;
            return;
        }
        $status = PGRES_POLLING_WRITING;
    }
    else {
        $status = $dbc->connectPoll;
    }
    $self->_debug("wants to: $status") if $debug and $debug & 1;

    my ($reader, $writer, $next_step);
    if ($status == PGRES_POLLING_READING) {
        $reader = $self->{read_watcher}
            // AE::io($sock, 0, weak_method_callback_cached($self, '_connectPoll'));
        # say "fd: $sock, read_watcher: $reader";
    }
    elsif ($status == PGRES_POLLING_WRITING) {
        $writer = $self->{write_watcher}
            // AE::io($sock, 1, weak_method_callback_cached($self, '_connectPoll'));
        # say "fd: $sock, write_watcher: $writer";
    }
    elsif ($status == PGRES_POLLING_FAILED) {
        $next_step = '_on_connect_error';
    }
    elsif ($status == PGRES_POLLING_OK or
           $status == PGRES_POLLING_ACTIVE) {
        $next_step = '_on_connect';
    }

    # Store both slots unconditionally: the watcher that is not wanted for
    # this step is undef here, which releases any previous one.
    @{$self}{qw(read_watcher write_watcher)} = ($reader, $writer);
    # warn "read_watcher: $reader, write_watcher: $writer";

    if ($next_step) {
        delete $self->{timeout_watcher};
        $debug and $debug & 1 and $self->_debug("goto $next_step");
        $self->$next_step;
    }
    elsif ($self->{timeout}) {
        $self->{timeout_watcher} =
            AE::timer($self->{timeout}, 0,
                      weak_method_callback_cached($self, '_connectPollTimeout'));
    }
}
# Timer callback armed by _connectPoll: the connection attempt took too
# long. Drops the timeout timer and both I/O watchers, flags the object
# as timed out and reports the failure through _on_connect_error.
sub _connectPollTimeout {
    my ($self) = @_;
    $self->_debug("_connectPoll timed out") if $debug and $debug & 2;

    for my $slot (qw(timeout_watcher read_watcher write_watcher)) {
        delete $self->{$slot};
    }

    $self->{timedout} = 1;
    $self->_on_connect_error;
}
# _maybe_callback($self, [$obj,] $cb, @args)
#
# Calls the callback stored under key $cb, if there is one.
#
# When the first argument after $self is a reference it is taken as the
# object holding the callback; otherwise the callback is looked up in
# $self. Nothing is called when the slot is undefined or when the holder
# has a true "canceled" entry. The callback is always invoked with $self
# as its first argument, followed by the remaining arguments.
sub _maybe_callback {
    my $self = shift;
    my $obj = (ref $_[0] ? shift : $self);
    my $cb = shift;
    my $sub = $obj->{$cb};
    if (defined $sub and not $obj->{canceled}) {
        if ($debug and $debug & 2) {
            # Resolving the callback's name is best effort and must not be
            # visible outside: keep $@ intact and disable any __DIE__ hook
            # (which lives in %SIG, not %ENV) while Devel::Peek is loaded.
            local ($@, $SIG{__DIE__});
            my $name = eval {
                require Devel::Peek;
                Devel::Peek::CvGV($sub)
            } // 'unknown';
            $self->_debug("calling $cb as $sub ($name)");
        }
        $sub->($self, @_);
    }
    else {
        $debug and $debug & 1 and $self->_debug("no callback for $cb");
    }
}
sub _on_connect {
my $self = shift;
my $dbc = $self->{dbc};
$dbc->nonBlocking(1);
$self->{state} = 'connected';
$debug and $debug & 2 and $self->_debug('connected to database');
$self->{read_watcher} = AE::io $self->{fd}, 0, weak_method_callback_cached($self, '_on_consume_input');
$self->_maybe_callback('on_connect');
delete @{$self}{qw(on_connect on_connect_error)};
( run in 1.799 second using v1.01-cache-2.11-cpan-d7a12ab2c7f )