AnyEvent-ZeroMQ
view release on metacpan or search on metacpan
lib/AnyEvent/ZeroMQ/Handle.pm view on Meta::CPAN
$AnyEvent::ZeroMQ::Handle::VERSION = '0.01';
}
# ABSTRACT: AnyEvent::Handle-like interface for 0MQ sockets
use Moose;
use AnyEvent::ZeroMQ;
use AnyEvent::ZeroMQ::Types qw(IdentityStr);
use ZeroMQ::Raw::Constants qw(ZMQ_NOBLOCK ZMQ_IDENTITY);
use Params::Util qw(_CODELIKE);
use Scalar::Util qw(weaken);
use Try::Tiny;
use POSIX qw(EAGAIN EWOULDBLOCK);
use true;
use namespace::autoclean;
has 'socket' => (
is => 'ro',
isa => 'ZeroMQ::Raw::Socket',
handles => [qw/bind connect/],
lib/AnyEvent/ZeroMQ/Handle.pm view on Meta::CPAN
);
# Two read-only buffer attributes. Each instance gets its own fresh empty
# array reference, and neither can be supplied through the constructor
# (init_arg => undef).
has [ qw(read_buffer write_buffer) ] => (
    is       => 'ro',
    init_arg => undef,
    default  => sub { return [] },
);
# Builder for the read watcher: asks AnyEvent::ZeroMQ->io for a watcher that
# polls this handle's socket for readability ('r') and calls ->read on this
# object each time it fires.
sub _build_read_watcher {
    my ($self) = @_;

    # The callback closes over a weakened $self, so the closure itself holds
    # no strong reference back to this object.
    weaken($self);

    my $on_readable = sub { $self->read };

    return AnyEvent::ZeroMQ->io(
        poll   => 'r',
        socket => $self->socket,
        cb     => $on_readable,
    );
}
# Builder for the write watcher: asks AnyEvent::ZeroMQ->io for a watcher that
# polls this handle's socket for writability ('w') and calls ->write on this
# object each time it fires.
sub _build_write_watcher {
    my ($self) = @_;

    # The callback closes over a weakened $self, so the closure itself holds
    # no strong reference back to this object.
    weaken($self);

    my $on_writable = sub { $self->write };

    return AnyEvent::ZeroMQ->io(
        poll   => 'w',
        socket => $self->socket,
        cb     => $on_writable,
    );
}
sub _build_identity {
my ($self) = @_;
return $self->socket->getsockopt( ZMQ_IDENTITY );
lib/AnyEvent/ZeroMQ/Reply.pm view on Meta::CPAN
package AnyEvent::ZeroMQ::Reply;
BEGIN {
$AnyEvent::ZeroMQ::Reply::VERSION = '0.01';
}
# ABSTRACT: Non-blocking OO abstraction over ZMQ_REP request/reply sockets
use Moose;
use true;
use namespace::autoclean;
use Scalar::Util qw(weaken);
use ZeroMQ::Raw::Constants qw(ZMQ_REP);
# Compose the parameterized WithHandle role, asking for a ZMQ_REP socket type
# and an empty socket_direction. The rest of this class reads the resulting
# object through $self->handle.
with 'AnyEvent::ZeroMQ::Role::WithHandle' =>
{ socket_type => ZMQ_REP, socket_direction => '' };
# Required, read-only code reference. The BUILD hook in this class calls it
# as $cb->($self, $msg) for every message read and writes its return value
# back to the handle.
has on_request => (
    isa      => 'CodeRef',
    is       => 'ro',
    required => 1,
);
# Once the object is built, install an on_read callback on the handle that
# passes each incoming message to on_request and pushes the result back out
# through the handle the message arrived on.
after 'BUILD' => sub {
    my ($self) = @_;

    # Fetch the handle while $self is still a strong reference.
    my $handle = $self->handle;

    # Weaken before creating the closure so it captures a weak $self.
    weaken($self);

    my $answer_request = sub {
        my ($reply_handle, $request) = @_;
        my $reply = $self->on_request->($self, $request);
        return $reply_handle->push_write($reply);
    };

    $handle->on_read($answer_request);
};
# Compose the Generic handle role.
# NOTE(review): this is applied after the attribute and BUILD modifier above,
# presumably so anything the role requires already exists -- confirm before
# moving it.
with 'AnyEvent::ZeroMQ::Handle::Role::Generic';
# Finalize the class by making its metaclass immutable (must stay last, after
# all attributes, modifiers and roles have been declared).
__PACKAGE__->meta->make_immutable;
lib/AnyEvent/ZeroMQ/Subscribe.pm view on Meta::CPAN
package AnyEvent::ZeroMQ::Subscribe;
BEGIN {
$AnyEvent::ZeroMQ::Subscribe::VERSION = '0.01';
}
# ABSTRACT: Non-blocking OO abstraction over ZMQ_SUB publish/subscribe sockets
use Moose;
use true;
use namespace::autoclean;
use MooseX::Types::Set::Object;
use Scalar::Util qw(weaken);
use ZeroMQ::Raw::Constants qw(ZMQ_SUB ZMQ_SUBSCRIBE ZMQ_UNSUBSCRIBE);
# Compose two roles in one statement: the parameterized WithHandle role
# (ZMQ_SUB socket type, empty socket_direction) and MooseX::Traits.
with 'AnyEvent::ZeroMQ::Role::WithHandle' =>
{ socket_type => ZMQ_SUB, socket_direction => '' },
'MooseX::Traits';
# Override the default of the inherited _trait_namespace attribute so it
# points at this distribution's AnyEvent::ZeroMQ::Subscribe::Trait namespace.
has '+_trait_namespace' => (
    default => 'AnyEvent::ZeroMQ::Subscribe::Trait',
);
has 'topics' => (
is => 'rw',
lib/AnyEvent/ZeroMQ/Subscribe.pm view on Meta::CPAN
return $new;
}
# Read/write callback attribute with a predicate (has_on_read) and a clearer
# (clear_on_read). Whenever a value is set, the trigger installs a forwarder
# on the underlying handle that relays the handle's callback arguments to
# _receive_item.
# NOTE(review): the clearer does not visibly remove that forwarder from the
# handle, and _receive_item confesses when on_read is unset -- confirm whether
# the handle callback is cleared elsewhere.
has on_read => (
    is        => 'rw',
    isa       => 'CodeRef',
    predicate => 'has_on_read',
    clearer   => 'clear_on_read',
    trigger   => sub {
        my ($self) = @_;

        # The forwarder captures a weakened $self.
        weaken($self);

        my $forward = sub { $self->_receive_item(@_) };
        $self->handle->on_read($forward);
    },
);
# Handle-level read callback target: takes ($self, $handle, $item, @rest),
# ignores the handle, and hands $item and @rest to the current on_read
# callback via _call_callback, returning whatever that returns.
sub _receive_item {
    my ($self, undef, $item, @rest) = @_;

    # The forwarder that calls this is only installed when on_read is set,
    # so reaching here without one indicates a bug; check anyway.
    if ( !$self->has_on_read ) {
        confess('BUG: receive_item called but there is no on_read');
    }

    my $on_read = $self->on_read;
    return $self->_call_callback( $on_read, $item, @rest );
}
# Invoke $cb as $cb->($self, $item, @rest) and return its result. Kept as a
# separate method so every callback invocation goes through one place.
sub _call_callback {
    my $self = shift;
    my ( $cb, $item, @rest ) = @_;

    # $item stays an explicit positional: it is passed even when undefined.
    return $cb->( $self, $item, @rest );
}
# Queue a one-off read callback on the underlying handle. When the handle
# delivers ($handle, $item, @rest), $cb is invoked through _call_callback
# with $item and @rest; the handle argument is dropped. Returns whatever the
# handle's push_read returns.
sub push_read {
    my ( $self, $cb ) = @_;

    # The wrapper captures a weakened $self.
    weaken($self);

    my $deliver = sub {
        my ( undef, $item, @rest ) = @_;
        return $self->_call_callback( $cb, $item, @rest );
    };

    return $self->handle->push_read($deliver);
}
# Subscribe the underlying socket to $topic by setting the ZMQ_SUBSCRIBE
# socket option; returns whatever setsockopt returns.
sub _subscribe {
    my ( $self, $topic ) = @_;

    my $socket = $self->handle->socket;
    return $socket->setsockopt(ZMQ_SUBSCRIBE, $topic);
}
( run in 0.424 second using v1.01-cache-2.11-cpan-a9ef4e587e4 )