AnyMQ-Pg

 view release on metacpan or  search on metacpan

lib/AnyMQ/Trait/Pg.pm  view on Meta::CPAN

);

# Post-construction hook. Touches the _client accessor so that the client
# object is constructed (and, per the builder, connected) as soon as the
# rest of the object has been set up.
sub BUILD {
    my $self = shift;

    return $self->_client;
}

# Builder for the JSON codec: a fresh JSON object with the utf8 option
# switched on.
sub _build__json {
    my $self = shift;

    my $codec = JSON->new;
    return $codec->utf8;
}

# Builder for the client: creates an AnyEvent::Pg object for $self->dsn and
# routes its on_connect / on_connect_error / on_error / on_notify callbacks
# to the same-named private methods (_on_connect, ...) of this object.
#
# Returns the new AnyEvent::Pg object.
sub _build_client {
    my ($self) = @_;

    # The callbacks must not hold a strong reference to $self: $self keeps
    # the client, the client keeps the callbacks, and a strong capture would
    # close a reference cycle that is never freed.
    require Scalar::Util;
    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);

    # Builds a callback that forwards its arguments to $method on the owner,
    # or does nothing once the owner has been destroyed.
    my $forward_to = sub {
        my ($method) = @_;
        return sub {
            return unless $weak_self;
            return $weak_self->$method(@_);
        };
    };

    my $dsn = $self->dsn;
    my $pg = AnyEvent::Pg->new(
        $dsn,
        on_connect       => $forward_to->('_on_connect'),
        on_connect_error => $forward_to->('_on_connect_error'),
        on_error         => $forward_to->('_on_error'),
        on_notify        => $forward_to->('_on_notify'),
    );

    return $pg;
}

# Registers interest in $channel.
#
# The channel is always recorded via add_channel. The LISTEN command itself
# is only pushed (with %query_opts) when the object is currently connected;
# otherwise nothing is returned and nothing is sent.
sub listen {
    my $self = shift;
    my ($channel, %query_opts) = @_;

    $self->add_channel($channel);

    if ($self->is_connected) {
        return $self->_push_listen($channel, %query_opts);
    }

    return;
}

# Sends a LISTEN for $channel through _push_notif_command, passing any
# extra %opts along, and returns whatever that call returns.
sub _push_listen {
    my $self = shift;
    my ($channel, %opts) = @_;

    my @command = (LISTEN => $channel, %opts);
    return $self->_push_notif_command(@command);
}

# Sends an UNLISTEN for $channel through _push_notif_command, passing any
# extra %opts along, and returns whatever that call returns.
#
# NOTE(review): unlike listen(), this does not consult is_connected and does
# not remove the channel from the set recorded by add_channel - confirm
# whether a later _on_connect re-subscribing to it is intended.
sub unlisten {
    my $self = shift;
    my ($channel, %opts) = @_;

    return $self->_push_notif_command(UNLISTEN => $channel, %opts);
}

# Publishes a notification with $payload on $channel.
#
# Arguments: $channel, $payload (optional), then %query_opts which are
# passed through unchanged to the client's push_query.
#
# While not connected the argument list is stored with publish_queue_push
# and nothing is sent; _on_connect replays the stored calls.
sub notify {
    my ($self, @rest) = @_;
    my ($channel, $payload, %query_opts) = @rest;

    unless ($self->is_connected) {
        $self->publish_queue_push(\@rest);
        return;
    }

    my $dbc = $self->_client->dbc;

    # The channel is an SQL identifier, so it needs identifier quoting
    # (escapeIdentifier adds the surrounding double quotes itself).
    # String-literal escaping would leave an embedded '"' unescaped and
    # would double any "'" inside the name.
    my $query = 'NOTIFY ' . $dbc->escapeIdentifier($channel);

    # Test for defined/non-empty rather than truthiness so that a payload
    # of "0" is still delivered.
    if (defined $payload && length $payload) {
        $query = join(',', $query, $dbc->escapeLiteral($payload));
    }

    warn $query if $self->debug;

    my $qw = $self->_client->push_query(query => $query, %query_opts);
    return $self->_pg_query_watcher_push($qw);
}

# Handles LISTEN/UNLISTEN: builds "$cmd <channel>" and pushes it onto the
# client's query queue.
#
# Arguments: $cmd (the SQL keyword, e.g. 'LISTEN' or 'UNLISTEN'), $channel,
# then %opts which are passed through unchanged to push_query.
#
# Returns the query watcher returned by push_query; the watcher is also
# recorded with _pg_query_watcher_push.
sub _push_notif_command {
    my ($self, $cmd, $channel, %opts) = @_;

    my $client = $self->_client;

    # The channel is an SQL identifier, so it needs identifier quoting
    # (escapeIdentifier adds the surrounding double quotes itself).
    # String-literal escaping would leave an embedded '"' unescaped and
    # would double any "'" inside the name.
    my $query = $cmd . ' ' . $client->dbc->escapeIdentifier($channel);

    my $qw = $client->push_query(
        query => $query,
        %opts
    );
    warn $query if $self->debug;
    $self->_pg_query_watcher_push($qw);
    return $qw;
}

# Serialises an event for transport: references are encoded with the JSON
# codec from _json, anything that is not a reference is returned untouched.
sub encode_event {
    my $self  = shift;
    my ($evt) = @_;

    return ref($evt)
        ? $self->_json->encode($evt)
        : $evt;
}

# Connect handler for the client. Marks the object as connected,
# subscribes to every channel reported by all_channels, replays the
# notifications stored in publish_queue, and finally invokes the
# user-supplied on_connect callback (if any) with $self followed by the
# arguments this handler received.
sub _on_connect {
    my $self = shift;

    $self->is_connected(1);

    # subscribe to channels
    for my $channel ($self->all_channels) {
        $self->_push_listen($channel);
    }

    # publish outstanding notifs
    if (my $pub_queue = $self->publish_queue) {
        # Empty the queue before replaying it, so entries are neither kept
        # in memory forever nor sent again by a later connect.
        # Assumes publish_queue returns the underlying array reference -
        # TODO confirm; with a copy this only drains the copy.
        my @pending = splice @$pub_queue;
        for my $evt (@pending) {
            $self->notify(@$evt);
        }
    }

    # time to call our connect callback
    $self->on_connect->($self, @_) if $self->on_connect;
}

# Connection-failure handler: flags the object as disconnected, then hands
# the remaining arguments to the generic _on_error handler.
sub _on_connect_error {
    my $self = shift;

    $self->is_connected(0);

    return $self->_on_error(@_);
}

# Generic error handler. Reads the error message from the connection of the
# client object passed as first argument ($pg->dbc->errorMessage) and
# reports it: to the user-supplied on_error callback as ($self, $message)
# when one is set, otherwise as a warning.
sub _on_error {
    my ($self, $pg) = @_;

    my $message = $pg->dbc->errorMessage;

    return $self->on_error->($self, $message)
        if $self->on_error;

    warn "AnyMQ::Pg error: $message";
}



( run in 1.096 second using v1.01-cache-2.11-cpan-5735350b133 )