AnyEvent-ZeroMQ
view release on metacpan or search on metacpan
t/handle-basic-tcp.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
use EV;
use AnyEvent::ZeroMQ::Handle;
use ZeroMQ::Raw;
use ZeroMQ::Raw::Constants qw(ZMQ_SUBSCRIBE ZMQ_PUB ZMQ_SUB ZMQ_NOBLOCK ZMQ_IDENTITY);
my $c = ZeroMQ::Raw::Context->new( threads => 1 );
my $pub = ZeroMQ::Raw::Socket->new($c, ZMQ_PUB);
my $sub = ZeroMQ::Raw::Socket->new($c, ZMQ_SUB);
my $pub_h = AnyEvent::ZeroMQ::Handle->new( socket => $pub, identity => 'pub_h' );
my $sub_h = AnyEvent::ZeroMQ::Handle->new( socket => $sub );
ok $pub_h, 'got publish handle';
ok $sub_h, 'got subscribe handle';
is $pub_h->identity, 'pub_h', 'got cached id';
is $pub_h->socket->getsockopt( ZMQ_IDENTITY ), 'pub_h', 'got real id';
$sub_h->identity('sub_h');
is $sub_h->socket->getsockopt( ZMQ_IDENTITY ), 'sub_h',
'got real id after using accessor';
$pub->bind('tcp://127.0.0.1:1235');
$sub->connect('tcp://127.0.0.1:1235');
$sub->setsockopt(ZMQ_SUBSCRIBE, '');
my $cv = AnyEvent->condvar;
$cv->begin for 1..2; # read x2
my ($a, $b);
$sub_h->push_read(sub {
my ($h, $data) = @_;
$a = $data;
pass 'got first piece of data';
$cv->end;
});
$sub_h->push_read(sub {
my ($h, $data) = @_;
$b = $data;
pass 'got second piece of data';
$cv->end;
});
my $made_b = 0;
my $drained = 0;
$pub_h->on_drain( sub { pass 'drained'; $drained = 1 } );
$pub_h->push_write('a');
$pub_h->push_write(sub { pass 'write called'; $made_b = 1; return 'b' });
$cv->recv;
is $a, 'a', 'got a';
is $b, 'b', 'got b';
ok $made_b, 'and b was generated by code';
ok $drained, 'the on_drain handler ran';
$pub_h->clear_on_drain;
# test the on_read callback
my @r;
$cv = AnyEvent->condvar;
$cv->begin for 1..2;
$sub_h->on_read(sub { push @r, $_[1]; $cv->end });
$pub_h->push_write(ZeroMQ::Raw::Message->new_from_scalar('c'));
$pub_h->push_write(sub { ZeroMQ::Raw::Message->new_from_scalar('d') });
( run in 1.178 second using v1.01-cache-2.11-cpan-39bf76dae61 )