AnyEvent-Handle-ZeroMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/Handle/ZeroMQ.pm  view on Meta::CPAN


use 5.006;
use strict;
use warnings;

=head1 NAME

AnyEvent::Handle::ZeroMQ - Integrate AnyEvent and ZeroMQ with AnyEvent::Handle like ways.

=head1 VERSION

Version 0.09

=cut

our $VERSION = '0.09';


=head1 SYNOPSIS

    use AnyEvent::Handle::ZeroMQ;
    use AE;
    use ZeroMQ;

    my $ctx = ZeroMQ::Context->new;
    my $socket = $ctx->socket(ZMQ_XREP);
    $socket->bind('tcp://0:8888');

    my $hdl = AnyEvent::Handle::ZeroMQ->new(
	socket => $socket,
	on_drain => sub { print "the write queue is empty\n" },
	on_error => sub { my($error_msg) = @_; ... },
	    # catch errors when occured in the reading callback
    );
    # or $hdl->on_drain( sub { ... } );
    # or $hdl->on_error( sub { ... } );
    $hdl->push_read( sub {
	my($hdl, $data) = @_;

	my @out;
	while( defined( my $msg = shift @$data ) ) {
	    push @out, $msg;
	    last if $msg->size == 0;
	}
	while( my $msg = shift @$data ) {
	    print "get: ",$msg->data,$/;
	}
	push @out, "get!";
	$hdl->push_write(\@out);
    } );

    AE::cv->recv;

=cut

use strict;
use warnings;

use AE;
use ZeroMQ qw(:all);
use Scalar::Util qw(weaken);

use base qw(Exporter);
our %EXPORT_TAGS = ( constant => [qw(SOCKET RQUEUE WQUEUE RWATCHER WWATCHER ON_DRAIN DEALER ROUTER)] );
our @EXPORT_OK = map { @$_ } values %EXPORT_TAGS;

use constant {
    SOCKET => 0,
    RQUEUE => 1,
    WQUEUE => 2,
    WATCHER => 3,
    ON_DRAIN => 4,
    DEALER => 5,
    ROUTER => 6,
    ON_ERROR => 7,
};

=head1 METHODS

=head2 new( socket => $zmq_socket, on_drain(optional) => cb(hdl) )

=cut

sub new {
    my $class = shift;
    my %args = @_;
    my $socket = $args{socket};

    my $fd = $socket->getsockopt(ZMQ_FD);

    my($self, $wself);

    $self = $wself = bless [
	$socket,
	[],
	[],
	AE::io($fd, 0, sub { _consume_read_write($wself) }),
	undef,
    ], $class;

    weaken $wself;

    if( exists $args{on_drain} ) {
	on_drain($self, $args{on_drain});
    }
    if( exists $args{on_error} ) {
	on_error($self, $args{on_error});
    }

    return $self;
}

sub _consume_read_write {
    _consume_write(@_);
    _consume_read(@_);
}

=head2 push_read( cb(hdl, data (array_ref) ) )

=cut

sub _consume_read {
    my $self = shift;

    my $socket = $self->[SOCKET];
    my $rqueue = $self->[RQUEUE];

    while( $socket->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN && @$rqueue ) {
	my @msgs;
	{
	    push @msgs, $socket->recv;
	    redo if $socket->getsockopt(ZMQ_RCVMORE);
	}
	my $cb = shift @$rqueue;
	eval { $cb->($self, \@msgs) };
	if( $@ ) {
	    if( $self->[ON_ERROR] ) {
		$self->[ON_ERROR]($@);
	    }
	    else {
		warn $@;
	    }
	}
    }
}

sub push_read {
    my $self = shift;
    push @{$self->[RQUEUE]}, pop;
    _consume_read($self);
}

=head2 push_write( data (array_ref) )

=cut

sub _consume_write {
    my $self = shift;

    my $socket = $self->[SOCKET];
    my $wqueue = $self->[WQUEUE];



( run in 1.946 second using v1.01-cache-2.11-cpan-39bf76dae61 )