AnyEvent-Handle-ZeroMQ
view release on metacpan or search on metacpan
lib/AnyEvent/Handle/ZeroMQ.pm view on Meta::CPAN
use 5.006;
use strict;
use warnings;
=head1 NAME
AnyEvent::Handle::ZeroMQ - Integrate AnyEvent and ZeroMQ with AnyEvent::Handle like ways.
=head1 VERSION
Version 0.09
=cut
our $VERSION = '0.09';
=head1 SYNOPSIS
use AnyEvent::Handle::ZeroMQ;
use AE;
use ZeroMQ;
my $ctx = ZeroMQ::Context->new;
my $socket = $ctx->socket(ZMQ_XREP);
$socket->bind('tcp://0:8888');
my $hdl = AnyEvent::Handle::ZeroMQ->new(
socket => $socket,
on_drain => sub { print "the write queue is empty\n" },
on_error => sub { my($error_msg) = @_; ... },
# catch errors when occured in the reading callback
);
# or $hdl->on_drain( sub { ... } );
# or $hdl->on_error( sub { ... } );
$hdl->push_read( sub {
my($hdl, $data) = @_;
my @out;
while( defined( my $msg = shift @$data ) ) {
push @out, $msg;
last if $msg->size == 0;
}
while( my $msg = shift @$data ) {
print "get: ",$msg->data,$/;
}
push @out, "get!";
$hdl->push_write(\@out);
} );
AE::cv->recv;
=cut
use strict;
use warnings;
use AE;
use ZeroMQ qw(:all);
use Scalar::Util qw(weaken);
use base qw(Exporter);
our %EXPORT_TAGS = ( constant => [qw(SOCKET RQUEUE WQUEUE RWATCHER WWATCHER ON_DRAIN DEALER ROUTER)] );
our @EXPORT_OK = map { @$_ } values %EXPORT_TAGS;
use constant {
SOCKET => 0,
RQUEUE => 1,
WQUEUE => 2,
WATCHER => 3,
ON_DRAIN => 4,
DEALER => 5,
ROUTER => 6,
ON_ERROR => 7,
};
=head1 METHODS
=head2 new( socket => $zmq_socket, on_drain(optional) => cb(hdl) )
=cut
sub new {
my $class = shift;
my %args = @_;
my $socket = $args{socket};
my $fd = $socket->getsockopt(ZMQ_FD);
my($self, $wself);
$self = $wself = bless [
$socket,
[],
[],
AE::io($fd, 0, sub { _consume_read_write($wself) }),
undef,
], $class;
weaken $wself;
if( exists $args{on_drain} ) {
on_drain($self, $args{on_drain});
}
if( exists $args{on_error} ) {
on_error($self, $args{on_error});
}
return $self;
}
sub _consume_read_write {
_consume_write(@_);
_consume_read(@_);
}
=head2 push_read( cb(hdl, data (array_ref) ) )
=cut
sub _consume_read {
my $self = shift;
my $socket = $self->[SOCKET];
my $rqueue = $self->[RQUEUE];
while( $socket->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN && @$rqueue ) {
my @msgs;
{
push @msgs, $socket->recv;
redo if $socket->getsockopt(ZMQ_RCVMORE);
}
my $cb = shift @$rqueue;
eval { $cb->($self, \@msgs) };
if( $@ ) {
if( $self->[ON_ERROR] ) {
$self->[ON_ERROR]($@);
}
else {
warn $@;
}
}
}
}
sub push_read {
my $self = shift;
push @{$self->[RQUEUE]}, pop;
_consume_read($self);
}
=head2 push_write( data (array_ref) )
=cut
sub _consume_write {
my $self = shift;
my $socket = $self->[SOCKET];
my $wqueue = $self->[WQUEUE];
( run in 1.946 second using v1.01-cache-2.11-cpan-39bf76dae61 )