AnyMQ-ZeroMQ

 view release on metacpan or  search on metacpan

lib/AnyMQ/Topic/Trait/ZeroMQ.pm  view on Meta::CPAN

    # uninstall our callback
    $self->bus->unsubscribe($self->name, $self->read_callback_ref)
        if $self->read_callback_ref;
};

# send events to ZeroMQ server
after 'dispatch_messages' => sub {
    my ($self, @events) = @_;
    
    # if this bus is just listening for events, we don't need to
    # publish the event to the zeromq server, just call callbacks
    return unless $self->bus->publish_address;

    my $pub = $self->bus->_zmq_pub;

    # encode events as JSON and transmit them
    foreach my $event (@events) {
        my $json = $event;
        if (ref $json) {
            $json = $self->bus->_zmq_json->encode($event);
        }

lib/AnyMQ/Trait/ZeroMQ.pm  view on Meta::CPAN

use JSON;

has 'publish_address'   => ( is => 'rw', isa => 'Str' );
has 'subscribe_address' => ( is => 'rw', isa => 'Str' );

has '_zmq_sub' => ( is => 'rw', lazy_build => 1, isa => 'AnyEvent::ZeroMQ::Subscribe' );
has '_zmq_pub' => ( is => 'rw', lazy_build => 1, isa => 'AnyEvent::ZeroMQ::Publish' );
has '_zmq_context' => ( is => 'rw', lazy_build => 1, isa => 'ZeroMQ::Raw::Context' );
has '_zmq_json' => ( is => 'rw', lazy_build => 1, isa => 'JSON' );

# topic => [ callbacks ]
has 'subscriptions' => (
    traits     => ['Hash'],
    is         => 'ro',
    isa        => 'HashRef[ArrayRef[CodeRef]]',
    default    => sub { {} },
    handles    => {
        subscription_topics => 'keys',
    },
);        

lib/AnyMQ/Trait/ZeroMQ.pm  view on Meta::CPAN

    unless ($event) {
        warn "Got invalid JSON: $json";
        return;
    }

    my $topic = $event->{type};
    unless ($topic) {
        warn "Got event with no topic type\n";
    }

    # call event handler callbacks
    my $cbs = $self->subscriptions->{$topic};

    unless ($cbs && @$cbs) {
        #warn "Got event $topic but no callbacks found\n";
        return;
    }
    
    foreach my $cb (@$cbs) {
        $cb->($event);
    }
}

# calls $cb when we receive a $topic event
# returns ref that can be passed to unsubscribe()



( run in 0.951 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )