AnyMQ-Pg

 view release on metacpan or  search on metacpan

lib/AnyMQ/Trait/Pg.pm  view on Meta::CPAN


# Optional user-supplied callback: a code reference or undef, read-only
# once the object has been constructed.
has on_connect => (
    isa => 'Maybe[CodeRef]',
    is  => 'ro',
);

# Optional user-supplied error callback: a code reference or undef,
# read-only once the object has been constructed.
has on_error => (
    isa => 'Maybe[CodeRef]',
    is  => 'ro',
);

# Channel names registered through listen(). Starts out empty; the native
# Array trait supplies add_channel (append) and all_channels (flat list).
has channels => (
    traits  => ['Array'],
    is      => 'ro',
    isa     => 'ArrayRef',
    default => sub { return [] },
    handles => {
        all_channels => 'elements',
        add_channel  => 'push',
    },
);

# Pending publish requests. notify() appends its argument list here (as an
# array reference) whenever the connection is not up.
has publish_queue => (
    traits  => ['Array'],
    is      => 'ro',
    isa     => 'ArrayRef',
    default => sub { return [] },
    handles => {
        publish_queue_unshift => 'unshift',
        publish_queue_push    => 'push',
    },
);

# Read-write boolean connection flag. listen() and notify() consult it
# before sending anything to the server. No default is declared, so it is
# unset until something assigns it.
has is_connected => (
    isa => 'Bool',
    is  => 'rw',
);

# JSON codec object; lazy_build defers construction to _build__json on
# first access.
has _json => (
    is         => 'rw',
    isa        => 'JSON',
    lazy_build => 1,
);

# Values returned by the client's push_query; notify() stores each one here
# via _pg_query_watcher_push.
# NOTE(review): presumably kept so the watcher objects stay referenced while
# their queries are in flight -- confirm; nothing visible here removes them.
has _pg_query_watchers => (
    traits  => ['Array'],
    is      => 'ro',
    isa     => 'ArrayRef',
    default => sub { return [] },
    handles => {
        _pg_query_watcher_push => 'push',
    },
);

# Construction hook: touching the _client accessor here forces the client
# object to be constructed (and connected) as soon as the object exists,
# instead of on first use.
sub BUILD {
    my $self = shift;

    return $self->_client;
}

# Builder for the _json attribute: a fresh JSON codec with the utf8 option
# switched on.
sub _build__json {
    my $self = shift;

    my $codec = JSON->new;
    return $codec->utf8;
}

# Creates the AnyEvent::Pg handle for this object's DSN and routes each of
# its event callbacks to the private method of the same name.
# (Presumably the builder behind the _client accessor used in BUILD.)
#
# NOTE(review): every closure below captures $self strongly; if the returned
# handle is stored on $self this forms a reference cycle -- confirm that the
# resulting lifetime is intended before changing it.
sub _build_client {
    my $self = shift;

    # Thin forwarding closures: each hands its arguments through untouched.
    my @event_handlers = (
        on_connect       => sub { $self->_on_connect(@_) },
        on_connect_error => sub { $self->_on_connect_error(@_) },
        on_error         => sub { $self->_on_error(@_) },
        on_notify        => sub { $self->_on_notify(@_) },
    );

    return AnyEvent::Pg->new($self->dsn, @event_handlers);
}

# Registers interest in $channel.
#
# The channel is always recorded in the channels list. The LISTEN command
# itself is only issued right away when is_connected is true; otherwise
# nothing is sent and the call returns empty.
#
# Arguments:
#   $channel    - channel name
#   %query_opts - extra named arguments handed on to _push_listen
sub listen {
    my $self = shift;
    my ($channel, %query_opts) = @_;

    $self->add_channel($channel);

    if ($self->is_connected) {
        return $self->_push_listen($channel, %query_opts);
    }

    return;
}

# Sends a LISTEN for $channel through the shared LISTEN/UNLISTEN helper,
# forwarding any extra query options, and returns whatever the helper returns.
sub _push_listen {
    my $self = shift;
    my ($channel, %query_opts) = @_;

    return $self->_push_notif_command(LISTEN => $channel, %query_opts);
}

# Stops listening on $channel.
#
# Arguments:
#   $channel    - channel name
#   %query_opts - extra named arguments forwarded to _push_notif_command
#
# Returns whatever _push_notif_command returns for the UNLISTEN command.
sub unlisten {
    my ($self, $channel, %query_opts) = @_;

    # listen() records every channel in the channels list, so drop it again
    # here; otherwise the channel stays registered after it was explicitly
    # unlistened. Edited in place because the attribute is read-only.
    # NOTE(review): presumably the list is replayed on (re)connect -- the
    # connect handler is not visible from here, confirm.
    if (defined $channel) {
        my $channels = $self->channels;
        @$channels = grep { !defined $_ || $_ ne $channel } @$channels;
    }

    return $self->_push_notif_command('UNLISTEN', $channel, %query_opts);
}

# Publishes a notification on $channel, optionally carrying $payload.
#
# Arguments:
#   $channel    - channel name, embedded in the query as a double-quoted
#                 identifier
#   $payload    - optional payload string; undef or '' sends a bare NOTIFY
#   %query_opts - extra named arguments forwarded to the client's push_query
#
# While is_connected is false, the argument list is stored on publish_queue
# and nothing is sent (presumably replayed once connected -- not visible
# from here).
sub notify {
    my ($self, @rest) = @_;
    my ($channel, $payload, %query_opts) = @rest;

    unless ($self->is_connected) {
        $self->publish_queue_push(\@rest);
        return;
    }

    my $dbc = $self->_client->dbc;

    # NOTE(review): escapeString targets string literals, yet the result is
    # placed inside double quotes; a channel name containing '"' may break
    # the statement. Left unchanged so that NOTIFY keeps matching the
    # escaping used by the LISTEN/UNLISTEN helper -- fix both together.
    my $query = 'NOTIFY "' . $dbc->escapeString($channel) . '"';

    # Test definedness and length rather than truth, so that a payload of
    # "0" is delivered instead of being silently dropped.
    $query = join(',', $query, $dbc->escapeLiteral($payload))
        if defined $payload && length $payload;

    warn $query if $self->debug;
    my $qw = $self->_client->push_query(query => $query, %query_opts);
    $self->_pg_query_watcher_push($qw);
}

# handles LISTEN/UNLISTEN
sub _push_notif_command {
    my ($self, $cmd, $channel, %opts) = @_;

    my $query = $cmd . ' "' . $self->_client->dbc->escapeString($channel) . '"';
    my $qw = $self->_client->push_query(



( run in 2.449 seconds using v1.01-cache-2.11-cpan-e93a5daba3e )