AnyEvent-ZeroMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/ZeroMQ/Handle.pm  view on Meta::CPAN

use Scalar::Util qw(weaken);
use Try::Tiny;
use POSIX qw(EAGAIN EWOULDBLOCK);

use true;
use namespace::autoclean;

has 'socket' => (
    is       => 'ro',
    isa      => 'ZeroMQ::Raw::Socket',
    handles  => [qw/bind connect/],
    required => 1,
);

before qw/bind connect/ => sub {
    $_[0]->identity;
};

after qw/bind connect/ => sub {
    my $self = shift;
    # this can change readability/writability status, so do the checks
    # again
    $self->read;
    $self->write;
};

has 'identity' => (
    is         => 'rw',    # note: you can change this, but it has
                           # no effect until a new bind/connect.
    isa        => IdentityStr,
    lazy_build => 1,
    trigger    => sub { shift->_change_identity(@_) },
);

has 'on_read' => (
    is        => 'rw',
    isa       => 'CodeRef',
    predicate => 'has_on_read',
    clearer   => 'clear_on_read',

lib/AnyEvent/ZeroMQ/Handle/Role/Generic.pm  view on Meta::CPAN

use Moose::Role;
use true;
use namespace::autoclean;

requires 'on_error';
requires 'clear_on_error';
requires 'has_on_error';
requires 'identity';
requires 'has_identity';
requires 'socket';
requires 'bind';
requires 'connect';

__END__
=pod

=head1 NAME

AnyEvent::ZeroMQ::Handle::Role::Generic - stuff both readable and wrtiable handles do

=head1 VERSION

lib/AnyEvent/ZeroMQ/Role/WithHandle.pm  view on Meta::CPAN

        isa      => Endpoints,
        default  => sub { [] },
        coerce   => 1,
        traits   => ['Array'],
        handles  => {
            connected_to => 'elements',
            _connect     => 'push',
        },
    );

    has 'bind' => (
        init_arg => 'bind',
        isa      => Endpoints,
        default  => sub { [] },
        coerce   => 1,
        traits   => ['Array'],
        handles  => {
            bound_to => 'elements',
            _bind    => 'push',
        },
    );

    my @roles = 'AnyEvent::ZeroMQ::Handle::Role::Generic';
    push @roles, 'AnyEvent::ZeroMQ::Handle::Role::Readable' if $dir =~ /r/;
    push @roles, 'AnyEvent::ZeroMQ::Handle::Role::Writable' if $dir =~ /w/;
    # XXX: we want to apply @roles, but not until after the
    # parameterized role has been applied.  this poses a problem, so
    # each consumer must do it manually.  wtf.

lib/AnyEvent/ZeroMQ/Role/WithHandle.pm  view on Meta::CPAN

    # works for these three roles, do not cut-n-paste!
    my @methods = map { "$_" } map { $_->meta->get_required_method_list } @roles;

    has 'handle' => (
        reader     => 'handle',
        isa        => 'AnyEvent::ZeroMQ::Handle',
        lazy_build => 1,
        handles    => [@methods],
    );

    after 'bind' => sub {
        my ($self, $bind_to) = @_;
        $self->_bind($bind_to);
    };

    after 'connect' => sub {
        my ($self, $connect_to) = @_;
        $self->_connect($connect_to);
    };

    has '_extra_initargs' => (
        is       => 'ro',
        isa      => 'HashRef',

lib/AnyEvent/ZeroMQ/Role/WithHandle.pm  view on Meta::CPAN

    method '_build_handle' => sub {
        my $self = shift;

        my $socket = ZeroMQ::Raw::Socket->new($self->context, $type);

        my $h = AnyEvent::ZeroMQ::Handle->new(
            socket => $socket,
            %{$self->_extra_initargs || {}},
        );

        for my $bind ($self->bound_to){
            $h->bind($bind);
        }

        for my $connect ($self->connected_to){
            $h->connect($connect);
        }

        return $h;
    };

    # this does a few things:
    #
    # * allow multiple bind/connect pairs to be passed in
    #
    # * gather initargs delegated from Handle and save those as
    #   _extra_initargs.  in _build_handle, these get passed to the
    #   Handle's constructor, allowing on_read/on_drain/etc. to work
    #   correctly.
    #
    #   BUG: the only issue is that the on_read and on_drain get $h,
    #   the handle, instead of $self.
    method 'BUILDARGS' => sub {
        my ($class, @in) = @_;
        my %in;
        while(@in) {
            my $key = shift @in;
            my $value = shift @in;
            if($key eq 'bind' || $key eq 'connect'){
                $in{$key} ||= [];
                push @{$in{$key}}, ref $value ? @$value : $value;
            }
            else {
                $in{$key} = $value;
            }
        }
        my %extra;
        for my $m (grep { !/bind|connect/ } @methods) {
            $extra{$m} = delete $in{$m} if exists $in{$m};
        }
        return { %in, _extra_initargs => \%extra };
    };

    method 'BUILD' => sub {
        my $self = shift;
        $self->handle; # make sure the handle is ready immediately
    };
};

t/basic-tcp.t  view on Meta::CPAN

use warnings;
use Test::More;

use AnyEvent::ZeroMQ;
use ZeroMQ::Raw;
use ZeroMQ::Raw::Constants qw(ZMQ_NOBLOCK ZMQ_PUB ZMQ_SUB ZMQ_SUBSCRIBE);

my $c   = ZeroMQ::Raw::Context->new( threads => 10 );
my $pub = ZeroMQ::Raw::Socket->new($c, ZMQ_PUB);
my $sub = ZeroMQ::Raw::Socket->new($c, ZMQ_SUB);
$pub->bind('tcp://127.0.0.1:1234');
$sub->connect('tcp://127.0.0.1:1234');
$sub->setsockopt(ZMQ_SUBSCRIBE, '');

my $cv = AnyEvent->condvar;
$cv->begin; # wait for writability
$cv->begin; # wait for readability

my $got;

my $r = AnyEvent::ZeroMQ->io( poll => 'r', socket => $sub, cb => sub {

t/handle-basic-tcp.t  view on Meta::CPAN

ok $pub_h, 'got publish handle';
ok $sub_h, 'got subscribe handle';

is $pub_h->identity, 'pub_h', 'got cached id';
is $pub_h->socket->getsockopt( ZMQ_IDENTITY ), 'pub_h', 'got real id';

$sub_h->identity('sub_h');
is $sub_h->socket->getsockopt( ZMQ_IDENTITY ), 'sub_h',
    'got real id after using accessor';

$pub->bind('tcp://127.0.0.1:1235');
$sub->connect('tcp://127.0.0.1:1235');
$sub->setsockopt(ZMQ_SUBSCRIBE, '');

my $cv = AnyEvent->condvar;
$cv->begin for 1..2; # read x2

my ($a, $b);
$sub_h->push_read(sub {
    my ($h, $data) = @_;
    $a = $data;

t/publish-subscribe-multiconnect.t  view on Meta::CPAN

use ok 'AnyEvent::ZeroMQ::Publish';
use ok 'AnyEvent::ZeroMQ::Subscribe';

my $ENDPOINT1 = 'inproc://#1';
my $ENDPOINT2 = 'inproc://#2';

my $c = ZeroMQ::Raw::Context->new( threads => 0 );

my $sub = AnyEvent::ZeroMQ::Subscribe->new(
    context => $c,
    bind    => $ENDPOINT1,
);

my $pub = AnyEvent::ZeroMQ::Publish->new(
    context => $c,
    connect => $ENDPOINT1,
);


# $pub pushes messages to $sub
my $cv = AnyEvent->condvar;

t/publish-subscribe-multiconnect.t  view on Meta::CPAN

my @got;
$cv->begin for 1..2;
my $cb = sub { push @got, $_[1], $cv->end };
$sub->push_read($cb) for 1..2;
$pub->push_write('foo');
$pub2->push_write('bar');
$cv->recv;
is_deeply [sort @got], [qw/bar foo/], 'got messages from two publishers';

# now $pub2 is also accepting other subscribers
$pub2->bind($ENDPOINT2);

# and $sub2 is accepting $pub2's messages
my $sub2 = AnyEvent::ZeroMQ::Subscribe->new(
    context => $c,
    connect => $ENDPOINT2,
);

# $pub2 publishes a message, and both subscribers get it
@got = ();
$cv = AnyEvent->condvar;

t/publish-subscribe-topics.t  view on Meta::CPAN

use Test::More;

use AnyEvent::ZeroMQ::Publish;
use AnyEvent::ZeroMQ::Subscribe;

my $ENDPOINT = 'inproc://#1';
my $c = ZeroMQ::Raw::Context->new( threads => 0 );

my $pub = AnyEvent::ZeroMQ::Publish->with_traits('Topics')->new(
    context => $c,
    bind    => $ENDPOINT,
);

my $sub = AnyEvent::ZeroMQ::Subscribe->with_traits('Topics')->new(
    context => $c,
    connect => $ENDPOINT,
    topics  => [qw/foo: bar:/],
);

my $cv = AE::cv;
$cv->begin for 1..2;

t/publish-subscribe.t  view on Meta::CPAN


use ok 'AnyEvent::ZeroMQ::Role::WithHandle';
use ok 'AnyEvent::ZeroMQ::Publish';
use ok 'AnyEvent::ZeroMQ::Subscribe';

my $ENDPOINT = 'inproc://#1';
my $c = ZeroMQ::Raw::Context->new( threads => 0 );

my $pub = AnyEvent::ZeroMQ::Publish->new(
    context => $c,
    bind    => $ENDPOINT,
);

my $all = AnyEvent::ZeroMQ::Subscribe->new(
    context => $c,
    connect => $ENDPOINT,
);

my $foo = AnyEvent::ZeroMQ::Subscribe->new(
    context => $c,
    connect => $ENDPOINT,

t/push-pull-multi.t  view on Meta::CPAN

#              \           \ /             |
#               \->[A]<-----X              /
#    -------------           \  -------------
#    | manager a |            \-| manager b |
#    -------------              -------------
#      PUSH                         PUSH
#

my $manager_a = AnyEvent::ZeroMQ::Push->new(
    context => $c,
    bind    => $ENDPOINT_A,
);

my $worker_a = AnyEvent::ZeroMQ::Pull->new(
    context => $c,
    bind    => $ENDPOINT_B,
    connect => $ENDPOINT_A,
);

my $worker_b = AnyEvent::ZeroMQ::Pull->new(
    context => $c,
    bind    => $ENDPOINT_C,
    connect => $ENDPOINT_A,
);

my $manager_b = AnyEvent::ZeroMQ::Push->new(
    context => $c,
    connect => $ENDPOINT_B,
    connect => $ENDPOINT_C,
);

my %results;

t/push-pull.t  view on Meta::CPAN

use warnings;
use Test::More;

use ok 'AnyEvent::ZeroMQ::Push';
use ok 'AnyEvent::ZeroMQ::Pull';

my $ENDPOINT = 'inproc://#1';

my @c = (context => ZeroMQ::Raw::Context->new( threads => 0 ));

my $server   = AnyEvent::ZeroMQ::Push->new( @c, bind    => $ENDPOINT );
my $client_a = AnyEvent::ZeroMQ::Pull->new( @c, connect => $ENDPOINT );

my $cv = AnyEvent->condvar;
my @to_write = qw/a b/;
my ($a, $b, $drain) = (0, 0, 0);

$cv->begin for 1..8;
$client_a->on_read(sub { $a++; $cv->end });

my $client_b = AnyEvent::ZeroMQ::Pull->new(

t/request-reply.t  view on Meta::CPAN

use ok 'AnyEvent::ZeroMQ::Reply';

my $on_request;
my $_on_request = sub { eval { $on_request->() } };

my $ENDPOINT = 'inproc://#1';
my $c = ZeroMQ::Raw::Context->new( threads => 0 );

my $req = AnyEvent::ZeroMQ::Request->new(
    context => $c,
    bind    => $ENDPOINT,
);

my $rep = AnyEvent::ZeroMQ::Reply->new(
    context    => $c,
    connect    => $ENDPOINT,
    on_request => sub {
        my ($h, $req) = @_;
        $_on_request->();
        $req++;
        return $req;



( run in 0.799 second using v1.01-cache-2.11-cpan-e1769b4cff6 )