view release on metacpan or search on metacpan
lib/AnyEvent/ZeroMQ/Handle.pm view on Meta::CPAN
use Scalar::Util qw(weaken);
use Try::Tiny;
use POSIX qw(EAGAIN EWOULDBLOCK);
use true;
use namespace::autoclean;
has 'socket' => (
is => 'ro',
isa => 'ZeroMQ::Raw::Socket',
handles => [qw/bind connect/],
required => 1,
);
before qw/bind connect/ => sub {
$_[0]->identity;
};
after qw/bind connect/ => sub {
my $self = shift;
# this can change readability/writability status, so do the checks
# again
$self->read;
$self->write;
};
has 'identity' => (
is => 'rw', # note: you can change this, but it has
# no effect until a new bind/connect.
isa => IdentityStr,
lazy_build => 1,
trigger => sub { shift->_change_identity(@_) },
);
has 'on_read' => (
is => 'rw',
isa => 'CodeRef',
predicate => 'has_on_read',
clearer => 'clear_on_read',
lib/AnyEvent/ZeroMQ/Handle/Role/Generic.pm view on Meta::CPAN
use Moose::Role;
use true;
use namespace::autoclean;
requires 'on_error';
requires 'clear_on_error';
requires 'has_on_error';
requires 'identity';
requires 'has_identity';
requires 'socket';
requires 'bind';
requires 'connect';
__END__
=pod
=head1 NAME
AnyEvent::ZeroMQ::Handle::Role::Generic - stuff both readable and wrtiable handles do
=head1 VERSION
lib/AnyEvent/ZeroMQ/Role/WithHandle.pm view on Meta::CPAN
isa => Endpoints,
default => sub { [] },
coerce => 1,
traits => ['Array'],
handles => {
connected_to => 'elements',
_connect => 'push',
},
);
has 'bind' => (
init_arg => 'bind',
isa => Endpoints,
default => sub { [] },
coerce => 1,
traits => ['Array'],
handles => {
bound_to => 'elements',
_bind => 'push',
},
);
my @roles = 'AnyEvent::ZeroMQ::Handle::Role::Generic';
push @roles, 'AnyEvent::ZeroMQ::Handle::Role::Readable' if $dir =~ /r/;
push @roles, 'AnyEvent::ZeroMQ::Handle::Role::Writable' if $dir =~ /w/;
# XXX: we want to apply @roles, but not until after the
# parameterized role has been applied. this poses a problem, so
# each consumer must do it manually. wtf.
lib/AnyEvent/ZeroMQ/Role/WithHandle.pm view on Meta::CPAN
# works for these three roles, do not cut-n-paste!
my @methods = map { "$_" } map { $_->meta->get_required_method_list } @roles;
has 'handle' => (
reader => 'handle',
isa => 'AnyEvent::ZeroMQ::Handle',
lazy_build => 1,
handles => [@methods],
);
after 'bind' => sub {
my ($self, $bind_to) = @_;
$self->_bind($bind_to);
};
after 'connect' => sub {
my ($self, $connect_to) = @_;
$self->_connect($connect_to);
};
has '_extra_initargs' => (
is => 'ro',
isa => 'HashRef',
lib/AnyEvent/ZeroMQ/Role/WithHandle.pm view on Meta::CPAN
method '_build_handle' => sub {
my $self = shift;
my $socket = ZeroMQ::Raw::Socket->new($self->context, $type);
my $h = AnyEvent::ZeroMQ::Handle->new(
socket => $socket,
%{$self->_extra_initargs || {}},
);
for my $bind ($self->bound_to){
$h->bind($bind);
}
for my $connect ($self->connected_to){
$h->connect($connect);
}
return $h;
};
# this does a few things:
#
# * allow multiple bind/connect pairs to be passed in
#
# * gather initargs delegated from Handle and save those as
# _extra_initargs. in _build_handle, these get passed to the
# Handle's constructor, allowing on_read/on_drain/etc. to work
# correctly.
#
# BUG: the only issue is that the on_read and on_drain get $h,
# the handle, instead of $self.
method 'BUILDARGS' => sub {
my ($class, @in) = @_;
my %in;
while(@in) {
my $key = shift @in;
my $value = shift @in;
if($key eq 'bind' || $key eq 'connect'){
$in{$key} ||= [];
push @{$in{$key}}, ref $value ? @$value : $value;
}
else {
$in{$key} = $value;
}
}
my %extra;
for my $m (grep { !/bind|connect/ } @methods) {
$extra{$m} = delete $in{$m} if exists $in{$m};
}
return { %in, _extra_initargs => \%extra };
};
method 'BUILD' => sub {
my $self = shift;
$self->handle; # make sure the handle is ready immediately
};
};
t/basic-tcp.t view on Meta::CPAN
use warnings;
use Test::More;
use AnyEvent::ZeroMQ;
use ZeroMQ::Raw;
use ZeroMQ::Raw::Constants qw(ZMQ_NOBLOCK ZMQ_PUB ZMQ_SUB ZMQ_SUBSCRIBE);
my $c = ZeroMQ::Raw::Context->new( threads => 10 );
my $pub = ZeroMQ::Raw::Socket->new($c, ZMQ_PUB);
my $sub = ZeroMQ::Raw::Socket->new($c, ZMQ_SUB);
$pub->bind('tcp://127.0.0.1:1234');
$sub->connect('tcp://127.0.0.1:1234');
$sub->setsockopt(ZMQ_SUBSCRIBE, '');
my $cv = AnyEvent->condvar;
$cv->begin; # wait for writability
$cv->begin; # wait for readability
my $got;
my $r = AnyEvent::ZeroMQ->io( poll => 'r', socket => $sub, cb => sub {
t/handle-basic-tcp.t view on Meta::CPAN
ok $pub_h, 'got publish handle';
ok $sub_h, 'got subscribe handle';
is $pub_h->identity, 'pub_h', 'got cached id';
is $pub_h->socket->getsockopt( ZMQ_IDENTITY ), 'pub_h', 'got real id';
$sub_h->identity('sub_h');
is $sub_h->socket->getsockopt( ZMQ_IDENTITY ), 'sub_h',
'got real id after using accessor';
$pub->bind('tcp://127.0.0.1:1235');
$sub->connect('tcp://127.0.0.1:1235');
$sub->setsockopt(ZMQ_SUBSCRIBE, '');
my $cv = AnyEvent->condvar;
$cv->begin for 1..2; # read x2
my ($a, $b);
$sub_h->push_read(sub {
my ($h, $data) = @_;
$a = $data;
t/publish-subscribe-multiconnect.t view on Meta::CPAN
use ok 'AnyEvent::ZeroMQ::Publish';
use ok 'AnyEvent::ZeroMQ::Subscribe';
my $ENDPOINT1 = 'inproc://#1';
my $ENDPOINT2 = 'inproc://#2';
my $c = ZeroMQ::Raw::Context->new( threads => 0 );
my $sub = AnyEvent::ZeroMQ::Subscribe->new(
context => $c,
bind => $ENDPOINT1,
);
my $pub = AnyEvent::ZeroMQ::Publish->new(
context => $c,
connect => $ENDPOINT1,
);
# $pub pushes messages to $sub
my $cv = AnyEvent->condvar;
t/publish-subscribe-multiconnect.t view on Meta::CPAN
my @got;
$cv->begin for 1..2;
my $cb = sub { push @got, $_[1], $cv->end };
$sub->push_read($cb) for 1..2;
$pub->push_write('foo');
$pub2->push_write('bar');
$cv->recv;
is_deeply [sort @got], [qw/bar foo/], 'got messages from two publishers';
# now $pub2 is also accepting other subscribers
$pub2->bind($ENDPOINT2);
# and $sub2 is accepting $pub2's messages
my $sub2 = AnyEvent::ZeroMQ::Subscribe->new(
context => $c,
connect => $ENDPOINT2,
);
# $pub2 publishes a message, and both subscribers get it
@got = ();
$cv = AnyEvent->condvar;
t/publish-subscribe-topics.t view on Meta::CPAN
use Test::More;
use AnyEvent::ZeroMQ::Publish;
use AnyEvent::ZeroMQ::Subscribe;
my $ENDPOINT = 'inproc://#1';
my $c = ZeroMQ::Raw::Context->new( threads => 0 );
my $pub = AnyEvent::ZeroMQ::Publish->with_traits('Topics')->new(
context => $c,
bind => $ENDPOINT,
);
my $sub = AnyEvent::ZeroMQ::Subscribe->with_traits('Topics')->new(
context => $c,
connect => $ENDPOINT,
topics => [qw/foo: bar:/],
);
my $cv = AE::cv;
$cv->begin for 1..2;
t/publish-subscribe.t view on Meta::CPAN
use ok 'AnyEvent::ZeroMQ::Role::WithHandle';
use ok 'AnyEvent::ZeroMQ::Publish';
use ok 'AnyEvent::ZeroMQ::Subscribe';
my $ENDPOINT = 'inproc://#1';
my $c = ZeroMQ::Raw::Context->new( threads => 0 );
my $pub = AnyEvent::ZeroMQ::Publish->new(
context => $c,
bind => $ENDPOINT,
);
my $all = AnyEvent::ZeroMQ::Subscribe->new(
context => $c,
connect => $ENDPOINT,
);
my $foo = AnyEvent::ZeroMQ::Subscribe->new(
context => $c,
connect => $ENDPOINT,
t/push-pull-multi.t view on Meta::CPAN
# \ \ / |
# \->[A]<-----X /
# ------------- \ -------------
# | manager a | \-| manager b |
# ------------- -------------
# PUSH PUSH
#
my $manager_a = AnyEvent::ZeroMQ::Push->new(
context => $c,
bind => $ENDPOINT_A,
);
my $worker_a = AnyEvent::ZeroMQ::Pull->new(
context => $c,
bind => $ENDPOINT_B,
connect => $ENDPOINT_A,
);
my $worker_b = AnyEvent::ZeroMQ::Pull->new(
context => $c,
bind => $ENDPOINT_C,
connect => $ENDPOINT_A,
);
my $manager_b = AnyEvent::ZeroMQ::Push->new(
context => $c,
connect => $ENDPOINT_B,
connect => $ENDPOINT_C,
);
my %results;
t/push-pull.t view on Meta::CPAN
use warnings;
use Test::More;
use ok 'AnyEvent::ZeroMQ::Push';
use ok 'AnyEvent::ZeroMQ::Pull';
my $ENDPOINT = 'inproc://#1';
my @c = (context => ZeroMQ::Raw::Context->new( threads => 0 ));
my $server = AnyEvent::ZeroMQ::Push->new( @c, bind => $ENDPOINT );
my $client_a = AnyEvent::ZeroMQ::Pull->new( @c, connect => $ENDPOINT );
my $cv = AnyEvent->condvar;
my @to_write = qw/a b/;
my ($a, $b, $drain) = (0, 0, 0);
$cv->begin for 1..8;
$client_a->on_read(sub { $a++; $cv->end });
my $client_b = AnyEvent::ZeroMQ::Pull->new(
t/request-reply.t view on Meta::CPAN
use ok 'AnyEvent::ZeroMQ::Reply';
my $on_request;
my $_on_request = sub { eval { $on_request->() } };
my $ENDPOINT = 'inproc://#1';
my $c = ZeroMQ::Raw::Context->new( threads => 0 );
my $req = AnyEvent::ZeroMQ::Request->new(
context => $c,
bind => $ENDPOINT,
);
my $rep = AnyEvent::ZeroMQ::Reply->new(
context => $c,
connect => $ENDPOINT,
on_request => sub {
my ($h, $req) = @_;
$_on_request->();
$req++;
return $req;