AnyEvent-ZeroMQ

 view release on metacpan or  search on metacpan

t/handle-basic-tcp.t  view on Meta::CPAN

use strict;
use warnings;
use Test::More;

use EV;
use AnyEvent::ZeroMQ::Handle;
use ZeroMQ::Raw;
use ZeroMQ::Raw::Constants qw(ZMQ_SUBSCRIBE ZMQ_PUB ZMQ_SUB ZMQ_NOBLOCK ZMQ_IDENTITY);

my $c   = ZeroMQ::Raw::Context->new( threads => 1 );
my $pub = ZeroMQ::Raw::Socket->new($c, ZMQ_PUB);
my $sub = ZeroMQ::Raw::Socket->new($c, ZMQ_SUB);

my $pub_h = AnyEvent::ZeroMQ::Handle->new( socket => $pub, identity => 'pub_h' );
my $sub_h = AnyEvent::ZeroMQ::Handle->new( socket => $sub );

ok $pub_h, 'got publish handle';
ok $sub_h, 'got subscribe handle';

is $pub_h->identity, 'pub_h', 'got cached id';
is $pub_h->socket->getsockopt( ZMQ_IDENTITY ), 'pub_h', 'got real id';

$sub_h->identity('sub_h');
is $sub_h->socket->getsockopt( ZMQ_IDENTITY ), 'sub_h',
    'got real id after using accessor';

$pub->bind('tcp://127.0.0.1:1235');
$sub->connect('tcp://127.0.0.1:1235');
$sub->setsockopt(ZMQ_SUBSCRIBE, '');

my $cv = AnyEvent->condvar;
$cv->begin for 1..2; # read x2

my ($a, $b);
$sub_h->push_read(sub {
    my ($h, $data) = @_;
    $a = $data;
    pass 'got first piece of data';
    $cv->end;
});

$sub_h->push_read(sub {
    my ($h, $data) = @_;
    $b = $data;
    pass 'got second piece of data';
    $cv->end;
});

my $made_b = 0;
my $drained = 0;

$pub_h->on_drain( sub { pass 'drained'; $drained = 1 } );
$pub_h->push_write('a');
$pub_h->push_write(sub { pass 'write called'; $made_b = 1; return 'b' });

$cv->recv;

is $a, 'a', 'got a';
is $b, 'b', 'got b';
ok $made_b, 'and b was generated by code';
ok $drained, 'the on_drain handler ran';
$pub_h->clear_on_drain;

# test the on_read callback
my @r;
$cv = AnyEvent->condvar;
$cv->begin for 1..2;
$sub_h->on_read(sub { push @r, $_[1]; $cv->end });
$pub_h->push_write(ZeroMQ::Raw::Message->new_from_scalar('c'));
$pub_h->push_write(sub { ZeroMQ::Raw::Message->new_from_scalar('d') });



( run in 1.178 second using v1.01-cache-2.11-cpan-39bf76dae61 )