AnyMQ-Pg
view release on metacpan or search on metacpan
lib/AnyMQ/Trait/Pg.pm view on Meta::CPAN
);
sub BUILD {
my ($self) = @_;
# once everything is set up, we can construct and connect our client object
$self->_client;
}
# JSON codec pack
sub _build__json {
my ($self) = @_;
return JSON->new->utf8;
}
sub _build_client {
my ($self) = @_;
my $dsn = $self->dsn;
my $pg = AnyEvent::Pg->new(
$dsn,
on_connect => sub { $self->_on_connect(@_) },
on_connect_error => sub { $self->_on_connect_error(@_) },
on_error => sub { $self->_on_error(@_) },
on_notify => sub { $self->_on_notify(@_) },
);
return $pg;
}
sub listen {
my ($self, $channel, %query_opts) = @_;
$self->add_channel($channel);
return unless $self->is_connected;
$self->_push_listen($channel, %query_opts);
}
sub _push_listen {
my ($self, $channel, %query_opts) = @_;
return $self->_push_notif_command('LISTEN', $channel, %query_opts);
}
sub unlisten {
my ($self, $channel, %query_opts) = @_;
return $self->_push_notif_command('UNLISTEN', $channel, %query_opts);
}
# publishes notification with $payload on channel
sub notify {
my ($self, @rest) = @_;
my ($channel, $payload, %query_opts) = @rest;
unless ($self->is_connected) {
$self->publish_queue_push(\@rest);
return;
}
my $query = 'NOTIFY "' . $self->_client->dbc->escapeString($channel) . '"';
$query = join(',', $query, $self->_client->dbc->escapeLiteral($payload)) if $payload;
warn $query if $self->debug;
my $qw = $self->_client->push_query(query => $query, %query_opts);
$self->_pg_query_watcher_push($qw);
}
# handles LISTEN/UNLISTEN
sub _push_notif_command {
my ($self, $cmd, $channel, %opts) = @_;
my $query = $cmd . ' "' . $self->_client->dbc->escapeString($channel) . '"';
my $qw = $self->_client->push_query(
query => $query,
%opts
);
warn $query if $self->debug;
$self->_pg_query_watcher_push($qw);
return $qw;
}
sub encode_event {
my ($self, $evt) = @_;
return $evt unless ref $evt;
# encode refs with JSON
return $self->_json->encode($evt);
}
sub _on_connect {
my $self = shift;
$self->is_connected(1);
# subscribe to channels
if ($self->all_channels) {
$self->_push_listen($_) for $self->all_channels;
}
# publish outstanding notifs
my $pub_queue = $self->publish_queue;
if ($pub_queue) {
foreach my $evt (@$pub_queue) {
$self->notify(@$evt);
}
}
# time to call our connect callback
$self->on_connect->($self, @_) if $self->on_connect;
}
sub _on_connect_error {
my ($self, @rest) = @_;
$self->is_connected(0);
$self->_on_error(@rest);
}
sub _on_error {
my $self = shift;
my ($pg) = @_;
my $err = $pg->dbc->errorMessage;
if ($self->on_error) {
$self->on_error->($self, $err);
} else {
warn "AnyMQ::Pg error: $err";
}
}
( run in 1.096 second using v1.01-cache-2.11-cpan-5735350b133 )