AnyEvent-ZeroMQ
view release on metacpan or search on metacpan
lib/AnyEvent/ZeroMQ/Subscribe.pm view on Meta::CPAN
package AnyEvent::ZeroMQ::Subscribe;
BEGIN {
$AnyEvent::ZeroMQ::Subscribe::VERSION = '0.01';
}
# ABSTRACT: Non-blocking OO abstraction over ZMQ_SUB publish/subscribe sockets
use Moose;
use true;
use namespace::autoclean;
use MooseX::Types::Set::Object;
use Scalar::Util qw(weaken);
use ZeroMQ::Raw::Constants qw(ZMQ_SUB ZMQ_SUBSCRIBE ZMQ_UNSUBSCRIBE);
with 'AnyEvent::ZeroMQ::Role::WithHandle' =>
{ socket_type => ZMQ_SUB, socket_direction => '' },
'MooseX::Traits';
has '+_trait_namespace' => ( default => 'AnyEvent::ZeroMQ::Subscribe::Trait' );
has 'topics' => (
is => 'rw',
isa => 'Set::Object',
coerce => 1,
default => sub { [''] },
trigger => sub {
my ($self, $new, $old) = @_;
$self->_topics_changed($new, $old);
},
);
sub _topics_changed {
my ($self, $new, $old) = @_;
return unless $old;
# sets are excellent, let's go shopping
my $subscribe = $new - $old;
my $unsubscribe = $old - $new;
$self->_unsubscribe($_) for $unsubscribe->members;
$self->_subscribe($_) for $subscribe->members;
return $new;
}
has 'on_read' => (
is => 'rw',
isa => 'CodeRef',
predicate => 'has_on_read',
clearer => 'clear_on_read',
trigger => sub {
my ($self, $val) = @_;
weaken $self;
$self->handle->on_read(sub { $self->_receive_item(@_) });
},
);
sub _receive_item {
my ($self, $h, $item, @rest) = @_;
# if we don't has_on_read, got_item can never be called.
confess 'BUG: receive_item called but there is no on_read'
unless $self->has_on_read; # but check anyway.
$self->_call_callback( $self->on_read, $item, @rest );
}
sub _call_callback { # i wonder what this does
my ($self, $cb, $item, @rest) = @_;
return $cb->($self, $item, @rest); # who would have guessed!
}
sub push_read {
my ($self, $cb) = @_;
( run in 1.781 second using v1.01-cache-2.11-cpan-39bf76dae61 )