AnyEvent-Handle-ZeroMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/Handle/ZeroMQ/Dealer.pm  view on Meta::CPAN

package AnyEvent::Handle::ZeroMQ::Dealer;

use 5.006;
use strict;
use warnings;

use AnyEvent::Handle::ZeroMQ qw(:constant);
use base qw(AnyEvent::Handle::ZeroMQ);

our $VERSION = $AnyEvent::Handle::ZeroMQ::Version;

=head1 NAME

AnyEvent::Handle::ZeroMQ::Dealer - use AnyEvent::Handle::ZeroMQ as concurrent request-reply pattern

=head1 SYNOPSIS

    use AnyEvent::Handle::ZeroMQ::Dealer;
    use AE;
    use ZeroMQ;

    my $ctx = ZeroMQ::Context->new;
    my $socket = $ctx->socket(ZMQ_XREQ);
    $socket->bind('tcp://0:8888');

    my $hdl = AnyEvent::Handle::ZeroMQ::Dealer->new(
	socket => $socket,
	on_drain => sub { print "the write queue is empty\n" },
    );
    # or $hdl->on_drain( sub { ... } );
    my @request = ...;
    $hdl->push_write( \@request, sub {
	my($hdl, $reply) = @_;
	...
    } );

    AE::cv->recv;

=cut

use constant {
    SLOT => 0,
};

=head1 METHODS

=head2 new( socket => ..., on_drain => ... )

Get an AnyEvent::Handle::ZeroMQ::Dealer instance

=cut

sub new {
    my $class = shift;
    my $self = $class->SUPER::new(@_);

    $self->[DEALER] = [];
    $self->[DEALER][SLOT] = [];
    return $self;
}

sub _dealer_read_cb {
    my($self, $msgs) = @_;

    my $n = unpack 'V', shift(@$msgs)->data;

    my $cb = delete $self->[DEALER][SLOT][$n];
    if( !$cb ) {
	$self->SUPER::push_read(\&_dealer_read_cb);
	return;
    }

    0 while( @$msgs && shift(@$msgs)->size );
    $cb->($self, $msgs);
}

=head2 push_write( request_data(array_ref), cb(hdl, reply_data(array_ref) )

=cut

sub push_write {
    my($self, $msgs, $cb) = @_;

    my $n = 0;
    ++$n while $self->[DEALER][SLOT][$n];
    $self->[DEALER][SLOT][$n] = $cb;

    unshift @$msgs, pack('V', $n), '';

    $self->SUPER::push_write($msgs);
    $self->SUPER::push_read(\&_dealer_read_cb);
}

=head2 push_read

Don't use this.

=cut

sub push_read {
    use Carp;
    warn __PACKAGE__."::push_read shouldn't be called.";
}

1;



( run in 1.432 second using v1.01-cache-2.11-cpan-97f6503c9c8 )