AnyEvent-Handle-ZeroMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/Handle/ZeroMQ/Router.pm  view on Meta::CPAN

package AnyEvent::Handle::ZeroMQ::Router;

use 5.006;
use strict;
use warnings;

# Inherit from the generic ZeroMQ handle; this class overrides push_read and
# push_write further down in the file.
use base qw(AnyEvent::Handle::ZeroMQ);

# The version number is copied from the parent module's $Version variable.
# NOTE(review): Perl modules conventionally keep this in $VERSION (all caps);
# if the parent only defines $VERSION, this assignment yields undef -- confirm
# against AnyEvent::Handle::ZeroMQ.
our $VERSION = $AnyEvent::Handle::ZeroMQ::Version;

=head1 NAME

AnyEvent::Handle::ZeroMQ::Router - AnyEvent::Handle::ZeroMQ subclass for the concurrent request-reply pattern

=head1 SYNOPSIS

    use AnyEvent::Handle::ZeroMQ::Router;
    use AE;
    use ZeroMQ;

    my $ctx = ZeroMQ::Context->new;
    my $socket = $ctx->socket(ZMQ_XREP);
    $socket->bind('tcp://0:8888');

    my $hdl = AnyEvent::Handle::ZeroMQ::Router->new(
	socket => $socket,
	on_drain => sub { print "the write queue is empty\n" },
    );
    # or $hdl->on_drain( sub { ... } );
    $hdl->push_read( sub {
	my($hdl, $request, $cb) = @_;
	my @reply = ...
	$cb->(\@reply);
    } );

    AE::cv->recv;

=head1 METHODS

=head2 new( socket => ..., on_drain => ... )

Returns an AnyEvent::Handle::ZeroMQ::Router instance.

=cut

#sub new {
#    my $class = shift;
#    my $self = $class->SUPER::new(@_);
#}

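=head2 push_read( cb(hdl, request_data(array_ref), reply_cb( reply_data(array_ref) ) ) )

Queues C<cb> for an incoming request. The callback receives the handle, an
array ref holding the request's body frames (the leading envelope frames and
the empty delimiter frame are removed), and a reply callback. Call the reply
callback with an array ref of reply frames to send the response; the saved
envelope and an empty delimiter frame are prepended automatically.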

=cut

# Queue a read callback for one incoming multipart request.
#
# Arguments:
#   $cb - code ref invoked as $cb->($hdl, $body_frames, $reply_cb)
#
# The parent class' push_read passes the wrapper below the handle and an array
# ref of message frames (objects that respond to ->size). The wrapper
#   1. collects the leading non-empty frames as the routing envelope,
#   2. drops the empty delimiter frame that follows the envelope,
#   3. hands the remaining frames (same array ref) to $cb.
#
# $reply_cb takes an array ref of reply frames and forwards them to the parent
# class' push_write, prefixed with the saved envelope and an empty delimiter.
# The caller's reply array is never modified, so it can be reused safely.
#
# A message that contains no empty delimiter frame is treated as consisting of
# envelope frames only: $cb then receives an empty body instead of the scan
# dying on an undefined frame.
#
# Returns whatever the parent class' push_read returns.
sub push_read {
    my ($self, $cb) = @_;

    $self->SUPER::push_read(sub {
        my ($hdl, $frames) = @_;

        # Count envelope frames, stopping at the first empty frame or at the
        # end of the message -- never index past the last frame.
        my $envelope_len = 0;
        ++$envelope_len
            while $envelope_len < @$frames
               && $frames->[$envelope_len]->size;

        my @envelope = splice @$frames, 0, $envelope_len;
        shift @$frames;    # discard the delimiter (no-op when it is absent)

        $cb->($hdl, $frames, sub {
            my ($reply) = @_;

            # Build a fresh frame list rather than unshifting onto the
            # caller's array, which would alter the caller's data.
            $hdl->SUPER::push_write([ @envelope, '', @$reply ]);
        });
    });
}

=head2 push_write

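Don't use this. Send replies through the reply callback that is passed to your
C<push_read> callback instead; calling this method only emits a warning.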

=cut

# Direct writes are not supported on this class: the method ignores its
# arguments and only emits a warning naming this package. Replies are meant to
# go out through the reply callback that push_read hands to its callback.
# Returns the value of warn().
sub push_write {
    my $complaint = sprintf "%s::push_write shouldn't be called.", __PACKAGE__;
    return warn $complaint;
}
1;



( run in 0.931 second using v1.01-cache-2.11-cpan-98e64b0badf )