AnyEvent-Stomper
view release on metacpan or search on metacpan
lib/AnyEvent/Stomper.pm view on Meta::CPAN
package AnyEvent::Stomper;
use 5.008000;
use strict;
use warnings;
use base qw( Exporter );
our $VERSION = '0.36';
use AnyEvent::Stomper::Frame;
use AnyEvent::Stomper::Error;
use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use List::Util qw( max );
use List::MoreUtils qw( bsearch_index );
use Carp qw( croak );
my %ERROR_CODES;
BEGIN {
%ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
our @EXPORT_OK = keys %ERROR_CODES;
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}
use constant {
# Default values
D_HOST => 'localhost',
D_PORT => 61613,
D_HEARTBEAT => [ 0, 0 ],
%ERROR_CODES,
# Operation status
S_NEED_DO => 1,
S_IN_PROGRESS => 2,
S_DONE => 3,
EOL => "\n",
RE_EOL => qr/\r?\n/,
};
my %SUBUNSUB_CMDS = (
SUBSCRIBE => 1,
UNSUBSCRIBE => 1,
);
my %ACK_CMDS = (
ACK => 1,
NACK => 1,
);
my %NEED_RECEIPT = (
CONNECT => 1,
DISCONNECT => 1,
%SUBUNSUB_CMDS,
);
my %ESCAPE_MAP = (
"\r" => "\\r",
"\n" => "\\n",
':' => "\\c",
"\\" => "\\\\",
);
my %UNESCAPE_MAP = reverse %ESCAPE_MAP;
my $RECEIPT_SEQ = 1;
my $MESSAGE_SEQ = 1;
sub new {
my $class = shift;
my %params = @_;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
return;
}
# Generate methods
{
no strict qw( refs );
foreach my $name ( qw( send subscribe unsubscribe ack nack begin commit
abort disconnect ) )
{
*{$name} = sub {
my $self = shift;
my $cmd = $self->_prepare( $name, [@_] );
$self->_execute($cmd);
return;
}
}
}
sub on_error {
my $self = shift;
if (@_) {
my $on_error = shift;
if ( defined $on_error ) {
$self->{on_error} = $on_error;
}
else {
$self->{on_error} = sub {
my $err = shift;
warn $err->message . "\n";
};
}
}
return $self->{on_error};
}
# Generate accessors
{
no strict qw( refs );
foreach my $name ( qw( host port ) ) {
*{$name} = sub {
my $self = shift;
return $self->{$name};
}
}
foreach my $name ( qw( connection_timeout reconnect_interval ) ) {
*{$name} = sub {
my $self = shift;
if (@_) {
my $seconds = shift;
if ( defined $seconds
&& ( !looks_like_number($seconds) || $seconds < 0 ) )
{
croak qq{"$name" must be a positive number};
}
$self->{$name} = $seconds;
}
return $self->{$name};
};
}
foreach my $name ( qw( on_connect on_disconnect ) ) {
*{$name} = sub {
my $self = shift;
if (@_) {
$self->{$name} = shift;
}
return $self->{$name};
};
}
}
sub force_disconnect {
my $self = shift;
$self->_disconnect();
return;
}
sub _connect {
my $self = shift;
$self->{_handle} = AnyEvent::Handle->new(
%{ $self->{handle_params} },
connect => [ $self->{host}, $self->{port} ],
on_prepare => $self->_create_on_prepare,
on_connect => $self->_create_on_connect,
on_connect_error => $self->_create_on_connect_error,
on_wtimeout => $self->_create_on_wtimeout,
on_rtimeout => $self->_create_on_rtimeout,
on_eof => $self->_create_on_eof,
on_error => $self->_create_on_handle_error,
on_drain => $self->_create_on_drain,
on_read => $self->_create_on_read,
);
return;
}
sub _create_on_prepare {
my $self = shift;
weaken($self);
return sub {
if ( defined $self->{connection_timeout} ) {
return $self->{connection_timeout};
}
( run in 1.598 second using v1.01-cache-2.11-cpan-39bf76dae61 )