AnyEvent-ZeroMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/ZeroMQ/Handle.pm  view on Meta::CPAN

  $AnyEvent::ZeroMQ::Handle::VERSION = '0.01';
}
# ABSTRACT: AnyEvent::Handle-like interface for 0MQ sockets
use Moose;

use AnyEvent::ZeroMQ;
use AnyEvent::ZeroMQ::Types qw(IdentityStr);
use ZeroMQ::Raw::Constants qw(ZMQ_NOBLOCK ZMQ_IDENTITY);

use Params::Util qw(_CODELIKE);
use Scalar::Util qw(weaken);
use Try::Tiny;
use POSIX qw(EAGAIN EWOULDBLOCK);

use true;
use namespace::autoclean;

has 'socket' => (
    is       => 'ro',
    isa      => 'ZeroMQ::Raw::Socket',
    handles  => [qw/bind connect/],

lib/AnyEvent/ZeroMQ/Handle.pm  view on Meta::CPAN

);

has [qw/read_buffer write_buffer/] => (
    init_arg => undef,
    is       => 'ro',
    default  => sub { [] },
);

sub _build_read_watcher {
    my $self = shift;
    weaken $self;
    return AnyEvent::ZeroMQ->io(
        poll   => 'r',
        socket => $self->socket,
        cb     => sub { $self->read },
    );
}

sub _build_write_watcher {
    my $self = shift;
    weaken $self;
    return AnyEvent::ZeroMQ->io(
        poll   => 'w',
        socket => $self->socket,
        cb     => sub { $self->write },
    );
}

sub _build_identity {
    my ($self) = @_;
    return $self->socket->getsockopt( ZMQ_IDENTITY );

lib/AnyEvent/ZeroMQ/Reply.pm  view on Meta::CPAN

package AnyEvent::ZeroMQ::Reply;
BEGIN {
  $AnyEvent::ZeroMQ::Reply::VERSION = '0.01';
}
# ABSTRACT: Non-blocking OO abstraction over ZMQ_REP request/reply sockets
use Moose;
use true;
use namespace::autoclean;
use Scalar::Util qw(weaken);
use ZeroMQ::Raw::Constants qw(ZMQ_REP);

with 'AnyEvent::ZeroMQ::Role::WithHandle' =>
    { socket_type => ZMQ_REP, socket_direction => '' };

has 'on_request' => (
    is       => 'ro',
    isa      => 'CodeRef',
    required => 1,
);

after 'BUILD' => sub {
    my $self = shift;
    my $h = $self->handle;

    weaken $self;
    $h->on_read(sub {
        my ($h, $msg) = @_;
        my $res = $self->on_request->($self, $msg);
        $h->push_write($res);
    });
};

with 'AnyEvent::ZeroMQ::Handle::Role::Generic';

__PACKAGE__->meta->make_immutable;

lib/AnyEvent/ZeroMQ/Subscribe.pm  view on Meta::CPAN

package AnyEvent::ZeroMQ::Subscribe;
BEGIN {
  $AnyEvent::ZeroMQ::Subscribe::VERSION = '0.01';
}
# ABSTRACT: Non-blocking OO abstraction over ZMQ_SUB publish/subscribe sockets
use Moose;
use true;
use namespace::autoclean;
use MooseX::Types::Set::Object;
use Scalar::Util qw(weaken);
use ZeroMQ::Raw::Constants qw(ZMQ_SUB ZMQ_SUBSCRIBE ZMQ_UNSUBSCRIBE);

with 'AnyEvent::ZeroMQ::Role::WithHandle' =>
    { socket_type => ZMQ_SUB, socket_direction => '' },
    'MooseX::Traits';

has '+_trait_namespace' => ( default => 'AnyEvent::ZeroMQ::Subscribe::Trait' );

has 'topics' => (
    is      => 'rw',

lib/AnyEvent/ZeroMQ/Subscribe.pm  view on Meta::CPAN

    return $new;
}

has 'on_read' => (
    is        => 'rw',
    isa       => 'CodeRef',
    predicate => 'has_on_read',
    clearer   => 'clear_on_read',
    trigger   => sub {
        my ($self, $val) = @_;
        weaken $self;
        $self->handle->on_read(sub { $self->_receive_item(@_) });
    },
);

sub _receive_item {
    my ($self, $h, $item, @rest) = @_;
    # if we don't has_on_read, got_item can never be called.
    confess 'BUG: receive_item called but there is no on_read'
        unless $self->has_on_read; # but check anyway.

    $self->_call_callback( $self->on_read, $item, @rest );
}

sub _call_callback { # i wonder what this does
    my ($self, $cb, $item, @rest) = @_;
    return $cb->($self, $item, @rest); # who would have guessed!
}

sub push_read {
    my ($self, $cb) = @_;
    weaken $self;
    $self->handle->push_read(sub {
        my ($h, $item, @rest) = @_;
        $self->_call_callback($cb, $item, @rest);
    });
}

sub _subscribe {
    my ($self, $topic) = @_;
    $self->handle->socket->setsockopt(ZMQ_SUBSCRIBE, $topic);
}



( run in 0.424 second using v1.01-cache-2.11-cpan-a9ef4e587e4 )