AnyEvent-ZeroMQ
view release on metacpan or search on metacpan
lib/AnyEvent/ZeroMQ/Handle.pm view on Meta::CPAN
package AnyEvent::ZeroMQ::Handle;
# The version is assigned inside BEGIN, i.e. while this file is still
# being compiled.
BEGIN {
    our $VERSION = '0.01';
}
# ABSTRACT: AnyEvent::Handle-like interface for 0MQ sockets
use Moose;
use AnyEvent::ZeroMQ;
use AnyEvent::ZeroMQ::Types qw(IdentityStr);
use ZeroMQ::Raw::Constants qw(ZMQ_NOBLOCK ZMQ_IDENTITY);
use Params::Util qw(_CODELIKE);
use Scalar::Util qw(weaken);
use Try::Tiny;
use POSIX qw(EAGAIN EWOULDBLOCK);
use true;
use namespace::autoclean;
# The wrapped 0MQ socket. It must be supplied at construction time, and
# the bind and connect methods are delegated straight to it.
has socket => (
    is       => 'ro',
    isa      => 'ZeroMQ::Raw::Socket',
    required => 1,
    handles  => ['bind', 'connect'],
);
# Read the identity attribute ahead of every bind/connect. The attribute
# is lazily built, so this forces it to be populated if it has not been
# already.
before 'bind', 'connect' => sub {
    my ($self) = @_;
    $self->identity;
};
# After every bind/connect, run the read and write checks again.
after 'bind', 'connect' => sub {
    my ($self) = @_;

    # A new endpoint can change whether the socket is readable or
    # writable, so both checks have to be repeated.
    $self->read;
    $self->write;
};
# Socket identity. When not set explicitly it is built lazily by
# _build_identity; every assignment is forwarded to _change_identity.
has identity => (
    # Writable, but note: a changed value has no effect until a new
    # bind/connect.
    is         => 'rw',
    isa        => IdentityStr,
    lazy_build => 1,
    trigger    => sub {
        my ($self, @values) = @_;
        $self->_change_identity(@values);
    },
);
# Optional read callback. Assigning it immediately runs read, so the new
# callback gets a chance to process anything that is already readable.
has on_read => (
    is        => 'rw',
    isa       => 'CodeRef',
    clearer   => 'clear_on_read',
    predicate => 'has_on_read',
    trigger   => sub {
        my ($self) = @_;
        $self->read;
    },
);
# Optional error callback. When set, handle_error passes the error
# string to it instead of emitting a warning.
has on_error => (
    is        => 'rw',
    isa       => 'CodeRef',
    clearer   => 'clear_on_error',
    predicate => 'has_on_error',
);
# Report an error string.
#
# Arguments:
#   $str - the error text to report.
#
# If an on_error callback is set, it is called with $str and its result
# is returned. Otherwise the error is only reported through warn.
sub handle_error {
    my ($self, $str) = @_;

    if ($self->has_on_error) {
        my $handler = $self->on_error;
        return $handler->($str);
    }

    # Nobody registered a handler: fall back to a warning.
    warn "AnyEvent::ZeroMQ::Handle: error in callback (ignoring): $str";
}
# Optional drain callback.
has on_drain => (
    is        => 'rw',
    isa       => 'CodeRef',
    clearer   => 'clear_on_drain',
    predicate => 'has_on_drain',
    # No trigger here: it is probably unnecessary, because if the socket
    # were writable we would already be drained.
);
# I/O watchers, created on first use by _build_read_watcher and
# _build_write_watcher. Neither can be passed to the constructor.
has ['read_watcher', 'write_watcher'] => (
    is         => 'ro',
    lazy_build => 1,
    init_arg   => undef,
);
# Per-instance array references, each starting out empty; they cannot be
# passed to the constructor. read shifts pending callbacks off the front
# of read_buffer.
has ['read_buffer', 'write_buffer'] => (
    is       => 'ro',
    default  => sub { return [] },
    init_arg => undef,
);
# Builder for read_watcher: asks AnyEvent::ZeroMQ->io for a read-poll
# ('r') watcher on the socket whose callback runs read.
sub _build_read_watcher {
    my ($self) = @_;

    # Weaken our reference first, so the closure stored in the watcher
    # holds only a weak reference to the object.
    weaken $self;

    my @watch_args = (
        poll   => 'r',
        socket => $self->socket,
        cb     => sub { $self->read },
    );
    return AnyEvent::ZeroMQ->io(@watch_args);
}
# Builder for write_watcher: asks AnyEvent::ZeroMQ->io for a write-poll
# ('w') watcher on the socket whose callback runs write.
sub _build_write_watcher {
    my ($self) = @_;

    # Weaken our reference first, so the closure stored in the watcher
    # holds only a weak reference to the object.
    weaken $self;

    my @watch_args = (
        poll   => 'w',
        socket => $self->socket,
        cb     => sub { $self->write },
    );
    return AnyEvent::ZeroMQ->io(@watch_args);
}
# Builder for identity: returns the socket's current ZMQ_IDENTITY option.
sub _build_identity {
    my $self   = shift;
    my $socket = $self->socket;
    return $socket->getsockopt(ZMQ_IDENTITY);
}
# Trigger target for identity: writes the new value to the socket's
# ZMQ_IDENTITY option and returns whatever setsockopt returns.
#
# Arguments:
#   $new_identity - the value just assigned to the attribute.
# Any further arguments (such as a previous value) are ignored.
sub _change_identity {
    my ($self, $new_identity) = @_;
    my $socket = $self->socket;
    return $socket->setsockopt(ZMQ_IDENTITY, $new_identity);
}
# True when read_buffer has an element in its first slot, i.e. at least
# one queued entry is waiting.
sub has_read_todo {
    my ($self) = @_;
    return exists ${ $self->read_buffer }[0];
}
# Ask AnyEvent::ZeroMQ->probe whether the socket passes a read ('r')
# poll, and return its answer.
sub readable {
    my ($self) = @_;
    my @probe_args = (
        poll   => 'r',
        socket => $self->socket,
    );
    return AnyEvent::ZeroMQ->probe(@probe_args);
}
# Attempt one non-blocking receive and pass the payload to a callback.
#
# Arguments:
#   $cb - code reference, invoked as $cb->($self, $data) after a message
#         has been received.
#
# Behaviour:
#   * If the receive fails with EAGAIN/EWOULDBLOCK there was nothing to
#     read; the failure is dropped and $cb is not called.
#   * Any other exception, including one thrown by $cb itself, is passed
#     to handle_error.
#
# This assumes recv reports failure by throwing with $! set, which is
# what the errno check in the catch block relies on.
# NOTE(review): a caller that dequeues $cb before calling this (as read
# does with read_buffer) loses that callback on EAGAIN -- confirm whether
# that is intended.
sub _read_once {
    my ($self, $cb) = @_;

    # Set once recv has returned, so the catch block can tell a failed
    # receive apart from an exception raised later by the callback.
    my $received = 0;

    local $! = 0;
    try {
        my $msg = ZeroMQ::Raw::Message->new;
        $self->socket->recv($msg, ZMQ_NOBLOCK);
        $received = 1;
        $cb->($self, $msg->data);
    }
    catch {
        # Only a failed recv may be dismissed as "nothing to read yet".
        # After a successful recv, $! holds whatever the callback left
        # behind and must not be allowed to hide the callback's own
        # exception.
        return if !$received && ($! == EWOULDBLOCK || $! == EAGAIN);
        $self->handle_error($_);
    };
}
sub read {
my $self = shift;
while($self->readable && $self->has_read_todo){
$self->_read_once(shift @{$self->read_buffer});
}
while($self->readable && $self->has_on_read){
$self->_read_once($self->on_read);
}
if($self->has_read_todo || $self->has_on_read){
# ensure we have a watcher
$self->read_watcher;
}
( run in 1.870 second using v1.01-cache-2.11-cpan-39bf76dae61 )