AnyEvent-ZeroMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/ZeroMQ/Handle.pm  view on Meta::CPAN

package AnyEvent::ZeroMQ::Handle;
# The version is assigned inside a BEGIN block, so it is already set at
# compile time, before the remainder of this file is compiled.
BEGIN {
  $AnyEvent::ZeroMQ::Handle::VERSION = '0.01';
}
# ABSTRACT: AnyEvent::Handle-like interface for 0MQ sockets
use Moose;

use AnyEvent::ZeroMQ;
use AnyEvent::ZeroMQ::Types qw(IdentityStr);
use ZeroMQ::Raw::Constants qw(ZMQ_NOBLOCK ZMQ_IDENTITY);

use Params::Util qw(_CODELIKE);
use Scalar::Util qw(weaken);
use Try::Tiny;
use POSIX qw(EAGAIN EWOULDBLOCK);

use true;
use namespace::autoclean;

# The underlying 0MQ socket.  It must be supplied to the constructor and
# cannot be replaced afterwards; bind() and connect() called on the handle
# are delegated to it.
has 'socket' => (
    isa      => 'ZeroMQ::Raw::Socket',
    is       => 'ro',
    required => 1,
    handles  => [ 'bind', 'connect' ],
);

# Read the identity attribute ahead of every bind/connect, so that its
# lazy builder has run by the time the socket call is made.
before qw(bind connect) => sub {
    my ($handle) = @_;
    $handle->identity;
};

# Binding or connecting can change whether the socket is readable or
# writable, so both checks are repeated once the socket call has returned.
after qw(bind connect) => sub {
    my ($handle) = @_;
    $handle->read;
    $handle->write;
};

# The socket identity.  When not supplied it is built lazily by
# _build_identity; whenever it is set, the trigger forwards the value(s)
# it receives to _change_identity.
#
# Note: the attribute is read-write, but a changed identity has no effect
# until a new bind/connect.
has 'identity' => (
    isa        => IdentityStr,
    is         => 'rw',
    lazy_build => 1,
    trigger    => sub {
        my ($handle, @values) = @_;
        $handle->_change_identity(@values);
    },
);

# Optional persistent read callback.  Setting it triggers an immediate
# call to read().
has 'on_read' => (
    isa       => 'CodeRef',
    is        => 'rw',
    clearer   => 'clear_on_read',
    predicate => 'has_on_read',
    trigger   => sub {
        my ($handle) = @_;
        $handle->read;
    },
);

# Optional error callback.  When set, handle_error passes the error string
# to it instead of emitting a warning.
has 'on_error' => (
    isa       => 'CodeRef',
    is        => 'rw',
    clearer   => 'clear_on_error',
    predicate => 'has_on_error',
);

# Report an error.
#
#   $str - the error text.
#
# If an on_error callback is set, it is called with $str and its result is
# returned.  Otherwise the error is only emitted as a warning.
sub handle_error {
    my ($self, $str) = @_;

    if ($self->has_on_error) {
        my $handler = $self->on_error;
        return $handler->($str);
    }

    warn "AnyEvent::ZeroMQ::Handle: error in callback (ignoring): $str";
}

# Optional drain callback.
#
# Unlike on_read, setting it does not trigger anything: the reasoning is
# that this is probably unnecessary, because if the socket were writable
# we would already be drained.
has 'on_drain' => (
    isa       => 'CodeRef',
    is        => 'rw',
    clearer   => 'clear_on_drain',
    predicate => 'has_on_drain',
);

# I/O watchers for the socket.  They cannot be passed to the constructor
# and are created on first access by _build_read_watcher and
# _build_write_watcher respectively.
has [ 'read_watcher', 'write_watcher' ] => (
    is         => 'ro',
    init_arg   => undef,
    lazy_build => 1,
);

# Per-instance queues, each starting out as a fresh empty array ref.  They
# cannot be passed to the constructor.
has [ 'read_buffer', 'write_buffer' ] => (
    is       => 'ro',
    init_arg => undef,
    default  => sub { return [] },
);

# Builder for read_watcher: registers an AnyEvent::ZeroMQ I/O watcher in
# 'r' mode on the socket whose callback calls read() on this handle.
sub _build_read_watcher {
    my ($handle) = @_;

    # The callback closes over $handle, so only a weak reference is kept
    # in it.
    weaken $handle;
    my $on_ready = sub { $handle->read };

    return AnyEvent::ZeroMQ->io(
        poll   => 'r',
        socket => $handle->socket,
        cb     => $on_ready,
    );
}

# Builder for write_watcher: registers an AnyEvent::ZeroMQ I/O watcher in
# 'w' mode on the socket whose callback calls write() on this handle.
sub _build_write_watcher {
    my ($handle) = @_;

    # The callback closes over $handle, so only a weak reference is kept
    # in it.
    weaken $handle;
    my $on_ready = sub { $handle->write };

    return AnyEvent::ZeroMQ->io(
        poll   => 'w',
        socket => $handle->socket,
        cb     => $on_ready,
    );
}

# Builder for identity: returns the socket's current ZMQ_IDENTITY option.
sub _build_identity {
    my $self   = shift;
    my $socket = $self->socket;
    return $socket->getsockopt(ZMQ_IDENTITY);
}

# Trigger target for identity: sets the socket's ZMQ_IDENTITY option to the
# new value.  Any further arguments passed by the trigger are not used.
sub _change_identity {
    my ($self, $identity) = @_;
    my $socket = $self->socket;
    return $socket->setsockopt(ZMQ_IDENTITY, $identity);
}

# Returns true when at least one entry is queued in read_buffer, false
# otherwise.
sub has_read_todo {
    my $self = shift;

    # Test the queue length instead of calling exists() on an array
    # element, which perlfunc strongly discourages.
    return scalar( @{ $self->read_buffer } ) > 0;
}

# Asks AnyEvent::ZeroMQ to probe the socket in 'r' mode and returns the
# result.
sub readable {
    my ($self) = @_;
    my $socket = $self->socket;
    return AnyEvent::ZeroMQ->probe( poll => 'r', socket => $socket );
}

# Attempt a single non-blocking receive and hand the payload to a callback.
#
#   $cb - code ref, invoked as $cb->($self, $data) when a message has been
#         received.
#
# A receive that fails with EAGAIN/EWOULDBLOCK is ignored silently.  Any
# other receive failure, and any exception raised while running $cb, is
# passed to handle_error.
#
# Returns the callback's result on success (or handle_error's result if
# the callback died), and nothing when no message was received.
sub _read_once {
    my ($self, $cb) = @_;
    local $! = 0;

    my $msg;
    my $received = try {
        $msg = ZeroMQ::Raw::Message->new;
        $self->socket->recv($msg, ZMQ_NOBLOCK);
        1;
    }
    catch {
        # Snapshot errno before anything else can overwrite $!.
        my $errno = $! + 0;
        $self->handle_error($_)
            unless $errno == EWOULDBLOCK || $errno == EAGAIN;
        0;
    };
    return unless $received;

    # The callback runs in its own try block so that the errno filter above
    # applies to the receive only.  Otherwise a callback that died while $!
    # happened to hold EAGAIN/EWOULDBLOCK (left over from some unrelated
    # system call it made) would have its exception dropped unreported.
    return try {
        $cb->($self, $msg->data);
    }
    catch {
        $self->handle_error($_);
    };
}

sub read {
    my $self = shift;

    while($self->readable && $self->has_read_todo){
        $self->_read_once(shift @{$self->read_buffer});
    }

    while($self->readable && $self->has_on_read){
        $self->_read_once($self->on_read);
    }

    if($self->has_read_todo || $self->has_on_read){
        # ensure we have a watcher
        $self->read_watcher;
    }



( run in 1.870 second using v1.01-cache-2.11-cpan-39bf76dae61 )