AnyEvent-Pg
view release on metacpan or search on metacpan
lib/AnyEvent/Pg.pm view on Meta::CPAN
on_notify => $on_notify,
queries => [],
timeout => $timeout,
seq => $seq,
call_on_empty_queue => 1,
};
bless $self, $class;
&AE::postpone(weak_method_callback($self, '_connectPoll'));
$self;
}
# Accessor: returns whatever is stored in the object's 'dbc' slot (the
# connection handle the other methods in this file call connectPoll, flush,
# consumeInput, ... on).
sub dbc {
    my ($self) = @_;
    return $self->{dbc};
}
# Advances the asynchronous connection handshake by one step.
#
# First call (no 'fd' recorded yet): the socket descriptor is fetched from
# the connection handle and stored in $self->{fd}; a negative descriptor is
# reported through _on_connect_error. Otherwise the step is treated as
# "wants to write".
#
# Later calls: the handle's connectPoll result decides what happens next:
#   PGRES_POLLING_READING -> keep (or create) a read watcher on the fd
#   PGRES_POLLING_WRITING -> keep (or create) a write watcher on the fd
#   PGRES_POLLING_FAILED  -> hand over to _on_connect_error
#   PGRES_POLLING_OK / PGRES_POLLING_ACTIVE -> hand over to _on_connect
#
# Both watcher slots are always rewritten, so a watcher that is not needed
# for the next step is dropped. While the handshake is still in progress and
# a timeout is configured, the timeout timer is (re)armed.
sub _connectPoll {
    my ($self) = @_;
    my $dbc = $self->{dbc};
    my $fd  = $self->{fd};

    $debug and $debug & 1 and $self->_debug("enter");

    my $r;
    if (not defined $fd) {
        $fd = $self->{fd} = $dbc->socket;
        if ($fd < 0) {
            $debug and $debug & 1 and $self->_debug("error");
            $self->_on_connect_error;
            return;
        }
        $r = PGRES_POLLING_WRITING;
    }
    else {
        $r = $dbc->connectPoll;
    }

    $debug and $debug & 1 and $self->_debug("wants to: $r");

    # Watchers to keep for the next step; anything left unset is dropped.
    my %next_watcher;
    my $goto;
    if ($r == PGRES_POLLING_READING) {
        $next_watcher{read_watcher} = $self->{read_watcher}
            // AE::io($fd, 0, weak_method_callback_cached($self, '_connectPoll'));
    }
    elsif ($r == PGRES_POLLING_WRITING) {
        $next_watcher{write_watcher} = $self->{write_watcher}
            // AE::io($fd, 1, weak_method_callback_cached($self, '_connectPoll'));
    }
    elsif ($r == PGRES_POLLING_FAILED) {
        $goto = '_on_connect_error';
    }
    elsif ($r == PGRES_POLLING_OK or $r == PGRES_POLLING_ACTIVE) {
        $goto = '_on_connect';
    }

    $self->{read_watcher}  = $next_watcher{read_watcher};
    $self->{write_watcher} = $next_watcher{write_watcher};

    if ($goto) {
        # The handshake is over (one way or the other): stop the timer first.
        delete $self->{timeout_watcher};
        $debug and $debug & 1 and $self->_debug("goto $goto");
        $self->$goto;
    }
    elsif ($self->{timeout}) {
        $self->{timeout_watcher} =
            AE::timer($self->{timeout}, 0,
                      weak_method_callback_cached($self, '_connectPollTimeout'));
    }
}
# Timer callback armed by _connectPoll: the handshake took too long.
# Drops the timeout, read and write watchers, flags the object as timed out
# and reports the failure through _on_connect_error.
sub _connectPollTimeout {
    my ($self) = @_;
    $debug and $debug & 2 and $self->_debug("_connectPoll timed out");
    delete $self->{$_} for qw(timeout_watcher read_watcher write_watcher);
    $self->{timedout} = 1;
    $self->_on_connect_error;
}
# Invokes a named callback if it is set and its owner has not been canceled.
#
# Usage:
#   $self->_maybe_callback($name, @args);        # callback stored on $self
#   $self->_maybe_callback($obj, $name, @args);  # callback stored on $obj,
#                                                # e.g. a queued query hash
#
# The callback is looked up as $obj->{$name} and called as
# $sub->($self, @args): the connection object is always the first argument,
# even when the callback belongs to another object. The callback's return
# value is passed through. When there is no callback, or $obj->{canceled} is
# set, nothing is called (only a debug message is emitted).
sub _maybe_callback {
    my $self = shift;
    my $obj  = (ref $_[0] ? shift : $self);
    my $cb   = shift;
    my $sub  = $obj->{$cb};
    if (defined $sub and not $obj->{canceled}) {
        # Guard with "$debug and" like the rest of the file so an undefined
        # $debug does not reach the bitwise operator.
        if ($debug and $debug & 2) {
            # Resolving the sub name is best effort. Localize $@ and the
            # __DIE__ hook (which lives in %SIG, not %ENV) so that a failure
            # inside the eval neither leaks into the caller's $@ nor fires a
            # user-installed die handler.
            local ($@, $SIG{__DIE__});
            my $name = eval {
                require Devel::Peek;
                Devel::Peek::CvGV($sub)
            } // 'unknown';
            $self->_debug("calling $cb as $sub ($name)");
        }
        $sub->($self, @_);
    }
    else {
        $debug and $debug & 1 and $self->_debug("no callback for $cb");
    }
}
# Called once the connection handshake has succeeded.
# Asks the connection handle for non-blocking mode, marks the object as
# 'connected', installs the permanent read watcher (feeding
# _on_consume_input), fires the one-shot 'on_connect' callback, discards both
# connect callbacks and finally starts processing any queued queries.
sub _on_connect {
    my ($self) = @_;

    $self->{dbc}->nonBlocking(1);
    $self->{state} = 'connected';
    $debug and $debug & 2 and $self->_debug('connected to database');

    $self->{read_watcher} =
        AE::io($self->{fd}, 0, weak_method_callback_cached($self, '_on_consume_input'));

    $self->_maybe_callback('on_connect');
    # The connect callbacks are one-shot: drop them once used.
    delete $self->{$_} for qw(on_connect on_connect_error);

    $self->_on_push_query;
}
# Called when the connection attempt fails (or times out).
# Fires the one-shot 'on_connect_error' callback, discards both connect
# callbacks and then escalates to _on_fatal_error, whose result is returned.
sub _on_connect_error {
    my ($self) = @_;
    $debug and $debug & 2 and $self->_debug("connection failed");
    $self->_maybe_callback('on_connect_error');
    delete $self->{$_} for qw(on_connect on_connect_error);
    $self->_on_fatal_error;
}
# Public entry point that simply delegates to _on_fatal_error and returns
# its result.
sub abort_all {
    my ($self) = @_;
    return $self->_on_fatal_error;
}
# Public entry point that shuts the object down by delegating to
# _on_fatal_error; the handler's result is returned.
sub finish {
    my ($self) = @_;
    return $self->_on_fatal_error;
}
sub _on_fatal_error {
lib/AnyEvent/Pg.pm view on Meta::CPAN
}
%opts and croak "unsupported option(s) ".join(", ", keys %opts);
my $query = \%query;
if ($unshift) {
unshift @{$self->{queries}}, $query;
}
else {
push @{$self->{queries}}, $query;
}
$self->{call_on_empty_queue} = 1;
$self->{current_query} or &AE::postpone(weak_method_callback_cached($self, '_on_postponed_push_query'));
AnyEvent::Pg::Watcher->_new($query);
}
# Deferred trampoline into _on_push_query: logs (debug level 4) that the
# call was postponed and then runs the queue processing, returning its result.
sub _on_postponed_push_query {
    my ($self) = @_;
    $debug and $debug & 4 and $self->_debug("postponed call to _on_push_query");
    return $self->_on_push_query;
}
# Returns the number of outstanding queries: everything still waiting in the
# queue plus one if a query is currently being processed.
sub queue_size {
    my ($self) = @_;
    my $waiting = scalar @{ $self->{queries} };
    return $self->{current_query} ? $waiting + 1 : $waiting;
}
# Public queueing API. Every method forwards its arguments to _push_query,
# prefixed with the internal '_type' of the request; the unshift_* variants
# additionally pass '_unshift => 1'. Whatever _push_query returns is handed
# back to the caller in the caller's own context.

# Queue a plain query.
sub push_query {
    my $self = shift;
    return $self->_push_query(_type => 'query', @_);
}

# Queue the execution of a previously prepared query.
sub push_query_prepared {
    my $self = shift;
    return $self->_push_query(_type => 'query_prepared', @_);
}

# Queue the preparation of a query.
sub push_prepare {
    my $self = shift;
    return $self->_push_query(_type => 'prepare', @_);
}

# Same as push_query, but flagged with '_unshift'.
sub unshift_query {
    my $self = shift;
    return $self->_push_query(_type => 'query', _unshift => 1, @_);
}

# Same as push_query_prepared, but flagged with '_unshift'.
sub unshift_query_prepared {
    my $self = shift;
    return $self->_push_query(_type => 'query_prepared', _unshift => 1, @_);
}
# Accessor: returns the 'query_start_time' slot, i.e. the timestamp recorded
# the last time a query was handed to the connection.
sub last_query_start_time {
    my ($self) = @_;
    return $self->{query_start_time};
}
# Decides what to do with the query queue.
#
# - A query is already in flight: nothing to do.
# - State 'connected': canceled queries at the head of the queue are
#   discarded; if a live query remains, a write watcher (and, when a timeout
#   is configured, a timer) is armed so that _on_push_query_writable sends
#   it. If the queue is empty, 'on_empty_queue' is fired, but only once per
#   'call_on_empty_queue' flag.
# - State 'failed': every queued query gets its 'on_error' callback and the
#   queue is emptied.
# - Any other state: queued queries are left alone for now.
sub _on_push_query {
    my ($self) = @_;
    $debug and $debug & 4 and $self->_debug("_on_push_query");

    if ($self->{current_query}) {
        $debug and $debug & 2 and $self->_debug("there is already a query being processed ($self->{current_query})");
        return;
    }

    my $queries = $self->{queries};

    if ($self->{state} eq 'connected') {
        # Throw away queries that were canceled while waiting in the queue.
        while (@$queries and $queries->[0]{canceled}) {
            $debug and $debug & 2 and $self->_debug("the query at the head of the queue was canceled, looking again!");
            shift @$queries;
        }

        if (@$queries) {
            $debug and $debug & 1 and $self->_debug("want to write query");
            $self->{write_watcher} =
                AE::io($self->{fd}, 1, weak_method_callback_cached($self, '_on_push_query_writable'));
            if ($self->{timeout}) {
                $self->{timeout_watcher} =
                    AE::timer($self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout'));
            }
            return;
        }

        if (delete $self->{call_on_empty_queue}) {
            # This sub may be called repeatedly from calls stacked by
            # AE::postpone, so the 'on_empty_queue' callback only runs
            # when this (ugly) flag is set.
            $self->_maybe_callback('on_empty_queue');
        }
        else {
            $debug and $debug & 1 and $self->_debug("skipping on_empty_queue callback");
        }
    }
    elsif ($self->{state} eq 'failed') {
        $debug and $debug & 1 and $self->_debug("calling on_error queries because we are in state failed");
        for my $query (@$queries) {
            $self->_maybe_callback($query, 'on_error');
        }
        @$queries = ();
    }
    else {
        # Neither connected nor failed: keep the queue untouched.
        $debug and $debug & 1 and $self->_debug("not processing queued queries because we are in state $self->{state}");
    }
    return;
}
# Maps the internal type stored in each queued query to the connection
# method used to send it.
my %send_type2method = (
    query          => 'sendQuery',
    query_prepared => 'sendQueryPrepared',
    prepare        => 'sendPrepare',
);

# Write-watcher callback armed by _on_push_query: the socket is writable, so
# take the next query off the queue and hand it to the connection.
#
# - Queries that were canceled after the watcher was armed are discarded
#   instead of being sent; if nothing is left, control goes back to
#   _on_push_query.
# - On success the query becomes the current query and flushing starts.
# - On failure the query has already left the queue, so its own 'on_error'
#   callback is fired (otherwise it would vanish without any notification),
#   followed by the connection-level 'on_error', and the next queued query
#   is tried.
#
# Dies on internal inconsistencies (a query already in flight, or an unknown
# query type).
sub _on_push_query_writable {
    my $self = shift;
    $debug and $debug & 1 and $self->_debug("can write");
    undef $self->{write_watcher};
    undef $self->{timeout_watcher};
    $self->{current_query} and die "Internal error: _on_push_query_writable called when there is already a current query";

    my $dbc     = $self->{dbc};
    my $queries = $self->{queries};

    # A query may have been canceled between arming the watcher and this
    # callback; never send such a query to the server.
    my $query;
    while (defined($query = shift @$queries)) {
        last unless $query->{canceled};
        $debug and $debug & 2 and $self->_debug("the query at the head of the queue was canceled, looking again!");
    }
    unless (defined $query) {
        # Nothing left to send; let _on_push_query handle the empty queue.
        return $self->_on_push_query;
    }

    my $method = $send_type2method{$query->{type}} //
        die "internal error: no method defined for push type $query->{type}";
    if ($debug and $debug & 1) {
        my $args = "'" . join("', '", @{$query->{args}}) . "'";
        $self->_debug("calling $method($args)");
    }
    $self->{query_start_time} = AE::now;
    if ($dbc->$method(@{$query->{args}})) {
        $self->{current_query} = $query;
        $self->_on_push_query_flushable;
    }
    else {
        $debug and $debug & 1 and $self->_debug("$method failed: ". $dbc->errorMessage);
        # The query is neither queued nor current any more, so this is the
        # only place where its owner can be told that it failed.
        $self->_maybe_callback($query, 'on_error');
        $self->_maybe_callback('on_error');
        # FIXME: this is broken in some way, sanitize it!
        # FIXME: check if the error is recoverable or fatal before continuing...
        $self->_on_push_query
    }
}
# Pushes buffered outgoing data to the server.
#
# The connection's flush result drives the next step:
#   -1 -> fatal error
#    0 -> everything was sent; go and look for input
#    1 -> more data is pending; wait until the socket is writable again
#         (reusing the previous write watcher when there was one) and re-arm
#         the timeout if one is configured
# Any other value is an internal error and dies.
sub _on_push_query_flushable {
    my ($self) = @_;
    my $dbc = $self->{dbc};

    # Take the write watcher out of the object; it is only put back when
    # more writing turns out to be needed.
    my $old_watcher = delete $self->{write_watcher};
    $self->{timeout_watcher} = undef;

    $debug and $debug & 1 and $self->_debug("flushing");
    my $flush = $dbc->flush;

    return $self->_on_fatal_error if $flush == -1;

    if ($flush == 0) {
        $debug and $debug & 1 and $self->_debug("flushed");
        return $self->_on_consume_input;
    }

    die "internal error: flush returned $flush" unless $flush == 1;

    $debug and $debug & 1 and $self->_debug("wants to write");
    $self->{write_watcher} = $old_watcher
        // AE::io($self->{fd}, 1, weak_method_callback_cached($self, '_on_push_query_flushable'));
    if ($self->{timeout}) {
        $self->{timeout_watcher} =
            AE::timer($self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout'));
    }
}
# Read-side workhorse: called when the socket is readable (and directly
# after a successful flush).
#
# 1. Consumes pending input; a failure there is fatal.
# 2. Delivers every pending notification through 'on_notify'.
# 3. If a query is in flight, hands each available result to the query's
#    'on_result' callback. When no further result is available the query's
#    'on_done' callback runs, the query stops being current and the queue is
#    processed again. If the connection is still busy, or outgoing data is
#    still waiting for a writable socket, the method re-arms the timeout
#    (when configured) and returns to wait for more events.
sub _on_consume_input {
    my ($self) = @_;
    my $dbc = $self->{dbc};
    $self->{timeout_watcher} = undef;

    $debug and $debug & 1 and $self->_debug("looking for data");
    if (not $dbc->consumeInput) {
        $debug and $debug & 1 and $self->_debug("consumeInput failed");
        return $self->_on_fatal_error;
    }

    $debug and $debug & 2 and $self->_debug("looking for notifications");
    while (my @notify = $dbc->notifies) {
        $debug and $debug & 2 and $self->_debug("notify recived: @notify");
        $self->_maybe_callback(on_notify => @notify);
    }

    my $cq = $self->{current_query};
    return unless defined $cq;

    # Drain results for as long as they can be read without waiting.
    until ($self->{write_watcher} or $dbc->busy) {
        $debug and $debug & 1 and $self->_debug("data available");
        my $result = $dbc->result;

        unless ($result) {
            # No more results: the current query is complete.
            $debug and $debug & 2 and $self->_debug("calling on_done");
            $self->_maybe_callback($cq, 'on_done');
            undef $self->{current_query};
            $self->_on_push_query;
            return;
        }

        if ($debug and $debug & 2) {
            my $status      = $result->status               // '<undef>';
            my $conn_status = $dbc->status                  // '<undef>';
            my $cmdRows     = $result->cmdRows              // '<undef>';
            my $rows        = $result->rows                 // '<undef>';
            my $cols        = $result->columns              // '<undef>';
            my $sqlstate    = $result->errorField('sqlstate') // '<undef>';
            $self->_debug("calling on_result status: $status, sqlstate: $sqlstate, conn status: $conn_status, cmdRows: $cmdRows, columns: $cols, rows: $rows");
        }
        $self->_maybe_callback($cq, 'on_result', $result);
    }

    # Still writing or still busy: wait for the next socket event.
    $debug and $debug & 1 and $self->_debug($self->{write_watcher}
                                            ? "wants to write and read"
                                            : "wants to read");
    if ($self->{timeout}) {
        $self->{timeout_watcher} =
            AE::timer($self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout'));
    }
    return;
}
# Timer callback for an operation that took longer than the configured
# timeout: flags the object as timed out and escalates to _on_fatal_error,
# returning its result. The watchers are not touched here; that is left to
# _on_fatal_error.
sub _on_timeout {
    my ($self) = @_;
    $debug and $debug & 2 and $self->_debug("operation timed out");
    $self->{timedout} = 1;
    return $self->_on_fatal_error;
}
# Wipes the object: every slot of the underlying hash is removed, which
# releases whatever the object was holding in them. The (now empty) object
# itself stays blessed.
sub destroy {
    my ($self) = @_;
    %{$self} = ();
}
package AnyEvent::Pg::Watcher;

# Guard object tied to a queued query. It is a blessed reference to a scalar
# holding the query hash; when the guard is destroyed the query is canceled.

# Builds the guard for the given query hash.
sub _new {
    my ($class, $query) = @_;
    return bless \$query, $class;
}

# Cancels the query: its per-query callbacks are removed and the hash is
# flagged as canceled, which the queue-processing code checks.
sub DESTROY {
    my ($self) = @_;
    my $query = $$self;
    delete $query->{$_} for qw(on_error on_result on_done);
    $query->{canceled} = 1;
}
( run in 0.770 second using v1.01-cache-2.11-cpan-39bf76dae61 )