AnyEvent-Stomper
view release on metacpan or search on metacpan
lib/AnyEvent/Stomper.pm view on Meta::CPAN
package AnyEvent::Stomper;
use 5.008000;
use strict;
use warnings;
use base qw( Exporter );
our $VERSION = '0.36';
use AnyEvent::Stomper::Frame;
use AnyEvent::Stomper::Error;
use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use List::Util qw( max );
use List::MoreUtils qw( bsearch_index );
use Carp qw( croak );
# Error code names and values. They are copied from AnyEvent::Stomper::Error
# inside BEGIN so that the hash is already populated when the "use constant"
# statement below is compiled.
my %ERROR_CODES;
BEGIN {
%ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
# Every error code name can be imported on request, one by one or all
# together through the ":err_codes" tag.
our @EXPORT_OK = keys %ERROR_CODES;
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}
use constant {
# Default values
D_HOST => 'localhost',
D_PORT => 61613,
D_HEARTBEAT => [ 0, 0 ],
# Error codes become constants of this package
%ERROR_CODES,
# Operation status
S_NEED_DO => 1,
S_IN_PROGRESS => 2,
S_DONE => 3,
# Line terminator written after the command line and after each header of an
# outgoing frame; also written on its own by the write-timeout callback.
EOL => "\n",
# Line terminator accepted when parsing incoming data (LF or CRLF)
RE_EOL => qr/\r?\n/,
};
# Subscription management commands (merged into %NEED_RECEIPT below)
my %SUBUNSUB_CMDS = (
SUBSCRIBE => 1,
UNSUBSCRIBE => 1,
);
# Acknowledgement commands; _execute() requires a "message" parameter for them
my %ACK_CMDS = (
ACK => 1,
NACK => 1,
);
# Commands that _push_write() always registers in the pending receipts table,
# even when the caller did not ask for a receipt.
my %NEED_RECEIPT = (
CONNECT => 1,
DISCONNECT => 1,
%SUBUNSUB_CMDS,
);
# Raw character => escaped sequence.
# NOTE(review): presumably consumed by _escape(), which is not visible here.
my %ESCAPE_MAP = (
"\r" => "\\r",
"\n" => "\\n",
':' => "\\c",
"\\" => "\\\\",
);
# Inverse table: escaped sequence => raw character
my %UNESCAPE_MAP = reverse %ESCAPE_MAP;
# Counter appended to auto-generated receipt identifiers in _push_write()
my $RECEIPT_SEQ = 1;
# NOTE(review): not referenced in the visible part of the file; presumably a
# counter for generated message identifiers - confirm against its users.
my $MESSAGE_SEQ = 1;
sub new {
my $class = shift;
my %params = @_;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
if (@_) {
my $seconds = shift;
if ( defined $seconds
&& ( !looks_like_number($seconds) || $seconds < 0 ) )
{
croak qq{"$name" must be a positive number};
}
$self->{$name} = $seconds;
}
return $self->{$name};
};
}
foreach my $name ( qw( on_connect on_disconnect ) ) {
*{$name} = sub {
my $self = shift;
if (@_) {
$self->{$name} = shift;
}
return $self->{$name};
};
}
}
# Drops the connection right away by delegating to _disconnect() without an
# error object. Returns nothing.
sub force_disconnect {
    my ($self) = @_;

    $self->_disconnect;

    return;
}
# Creates the AnyEvent::Handle for this client and stores it in
# $self->{_handle}. User supplied "handle_params" go first, so the connect
# target and the callbacks installed here take precedence over them.
sub _connect {
    my ($self) = @_;

    # Kept as an ordered list so the callback factories run in a fixed order
    my @callbacks = (
        on_prepare       => $self->_create_on_prepare,
        on_connect       => $self->_create_on_connect,
        on_connect_error => $self->_create_on_connect_error,
        on_wtimeout      => $self->_create_on_wtimeout,
        on_rtimeout      => $self->_create_on_rtimeout,
        on_eof           => $self->_create_on_eof,
        on_error         => $self->_create_on_handle_error,
        on_drain         => $self->_create_on_drain,
        on_read          => $self->_create_on_read,
    );

    $self->{_handle} = AnyEvent::Handle->new(
        %{ $self->{handle_params} },
        connect => [ $self->{host}, $self->{port} ],
        @callbacks,
    );

    return;
}
# Builds the "on_prepare" callback. The callback returns the configured
# connection timeout, or nothing when no timeout is configured.
sub _create_on_prepare {
    my ($self) = @_;

    # The closure must not keep the client object alive
    weaken($self);

    return sub {
        my $timeout = $self->{connection_timeout};

        return defined $timeout ? $timeout : ();
    };
}
# Builds the "on_connect" callback: marks the client as connected, starts the
# login sequence and then invokes the user "on_connect" callback, if any.
sub _create_on_connect {
    my ($self) = @_;

    # The closure must not keep the client object alive
    weaken($self);

    return sub {
        $self->{_connected} = 1;
        $self->_login;

        my $user_cb = $self->{on_connect};
        $user_cb->() if defined $user_cb;
    };
}
# Builds the "on_connect_error" callback. The last callback argument is taken
# as the failure reason, wrapped into an E_CANT_CONN error and passed to
# _disconnect().
sub _create_on_connect_error {
    my ($self) = @_;

    # The closure must not keep the client object alive
    weaken($self);

    return sub {
        my $reason   = $_[-1];
        my $endpoint = "$self->{host}:$self->{port}";

        my $err = _new_error( "Can't connect to ${endpoint}: ${reason}",
            E_CANT_CONN );

        $self->_disconnect($err);
    };
}
# Builds the "on_wtimeout" callback: when the write timeout fires, a bare
# end-of-line is written to the handle.
sub _create_on_wtimeout {
    my ($self) = @_;

    # The closure must not keep the client object alive
    weaken($self);

    return sub {
        my $handle = $self->{_handle};
        $handle->push_write(EOL);
    };
}
# Builds the "on_rtimeout" callback: when the read timeout fires, the
# connection is dropped with an E_READ_TIMEDOUT error.
sub _create_on_rtimeout {
    my ($self) = @_;

    # The closure must not keep the client object alive
    weaken($self);

    return sub {
        $self->_disconnect(
            _new_error( 'Read timed out.', E_READ_TIMEDOUT ) );
    };
}
# Builds the "on_eof" callback: when the peer closes the connection, the
# client is disconnected with an E_CONN_CLOSED_BY_REMOTE_HOST error.
sub _create_on_eof {
    my ($self) = @_;

    # The closure must not keep the client object alive
    weaken($self);

    return sub {
        $self->_disconnect(
            _new_error(
                'Connection closed by remote host.',
                E_CONN_CLOSED_BY_REMOTE_HOST
            )
        );
    };
}
# Builds the "on_error" callback of the handle. The last callback argument is
# taken as the error message, wrapped into an E_IO error and passed to
# _disconnect().
sub _create_on_handle_error {
    my ($self) = @_;

    # The closure must not keep the client object alive
    weaken($self);

    return sub {
        my $message = $_[-1];

        $self->_disconnect( _new_error( $message, E_IO ) );
    };
}
# Builds the "on_drain" callback. Once the write buffer is empty, every
# command waiting in the write queue gets its "on_receipt" callback invoked
# without arguments.
sub _create_on_drain {
    my ($self) = @_;

    # The closure must not keep the client object alive
    weaken($self);

    return sub {
        my $flushed = $self->{_write_queue};
        return if !@{$flushed};

        # Swap in a fresh queue first, so commands written from inside the
        # callbacks below wait for the next drain event.
        $self->{_temp_write_queue} = $flushed;
        $self->{_write_queue}      = [];

        # The queue is re-read through $self on each iteration because a
        # callback may replace or empty it.
        while ( my $cmd = shift @{ $self->{_temp_write_queue} } ) {
            $cmd->{on_receipt}->();
        }
    };
}
# Builds the "on_read" callback, an incremental parser of incoming frames.
#
# The parser works in two phases and keeps its state in the closure between
# invocations:
#   1. head - the command line and the header lines up to the empty line;
#   2. body - either "content-length" octets or everything up to the first
#      NUL octet, followed by the terminating NUL.
# Whenever the read buffer does not yet hold a complete head or body, the
# callback returns and resumes from the same phase on the next read event.
# Every complete frame is passed to _process_frame().
#
# When a header name is repeated within one frame, only its first entry is
# kept, as the STOMP 1.1 and 1.2 specifications recommend.
sub _create_on_read {
    my $self = shift;

    # The closure must not keep the client object alive
    weaken($self);

    # Parser state: defined $cmd_name means "head parsed, waiting for body"
    my $cmd_name;
    my $headers;

    return sub {
        my $handle = shift;

        my $frame;

        while (1) {
            # Processing of the previous frame may have destroyed the handle
            return if $handle->destroyed;

            if ( defined $cmd_name ) {
                my $content_length;

                if ( defined $headers->{'content-length'} ) {
                    $content_length = $headers->{'content-length'};

                    # One more octet is needed for the terminating NUL
                    return if length( $handle->{rbuf} ) < $content_length + 1;
                }
                else {
                    # Without "content-length" the body ends at the first NUL
                    $content_length = index( $handle->{rbuf}, "\0" );
                    return if $content_length < 0;
                }

                my $body = substr( $handle->{rbuf}, 0, $content_length, '' );

                # Drop the terminating NUL and any end-of-lines following it
                $handle->{rbuf} =~ s/^\0(?:${\(RE_EOL)})*//;

                $frame = _new_frame( $cmd_name, $headers, $body );

                undef $cmd_name;
                undef $headers;
            }
            else {
                # Skip end-of-lines between frames (e.g. heart-beats)
                $handle->{rbuf} =~ s/^(?:${\(RE_EOL)})+//;

                # The head is complete only when the empty line has arrived
                return
                    unless $handle->{rbuf} =~ s/^(.+?)(?:${\(RE_EOL)}){2}//s;

                ( $cmd_name, my @header_strings )
                    = split( m/${\(RE_EOL)}/, $1 );

                foreach my $header_str (@header_strings) {
                    my ( $name, $value ) = split( /:/, $header_str, 2 );
                    $name = _unescape($name);

                    # Repeated header entry: the first one wins
                    next if exists $headers->{$name};

                    $headers->{$name} = _unescape($value);
                }

                next;
            }

            $self->_process_frame($frame);
        }
    };
}
# Turns a command name and its argument list into a command structure:
#   { name => ..., headers => {...}, body, on_receipt, on_message, message }
#
# $args is a reference to a flat key/value list. A trailing code reference on
# an odd-sized list is accepted as a shorthand for the "on_message" callback
# (SUBSCRIBE) or the "on_receipt" callback (any other command). The keys
# "body", "on_receipt" and "on_message" (and "message" for ACK/NACK) are
# command parameters; everything else is sent as frame headers, merged over
# the default headers and the per-command headers of the client.
#
# When no "on_receipt" callback is given, a default one is installed that
# forwards errors to the client's "on_error" callback.
sub _prepare {
    my ( $self, $raw_name, $args ) = @_;

    my $cmd_name = uc($raw_name);
    my %params;

    my $has_trailing_cb
        = ref( $args->[-1] ) eq 'CODE' && scalar @{$args} % 2 > 0;

    if ($has_trailing_cb) {
        my $slot = $cmd_name eq 'SUBSCRIBE' ? 'on_message' : 'on_receipt';
        $params{$slot} = pop @{$args};
    }

    my %headers = @{$args};

    # Move command parameters out of the header set
    foreach my $key ( qw( body on_receipt on_message ) ) {
        next unless defined $headers{$key};
        $params{$key} = delete $headers{$key};
    }
    if ( exists $ACK_CMDS{$cmd_name} ) {
        $params{message} = delete $headers{message};
    }

    # Precedence, lowest to highest: defaults, per-command, explicit headers
    my @inherited   = %{ $self->{default_headers} };
    my $per_command = $self->{command_headers}{$cmd_name};
    if ( defined $per_command ) {
        push( @inherited, %{$per_command} );
    }
    %headers = ( @inherited, %headers );

    my $cmd = {
        name    => $cmd_name,
        headers => \%headers,
        %params,
    };

    return $cmd if defined $cmd->{on_receipt};

    # The default callback must not keep the client object alive
    weaken($self);

    $cmd->{on_receipt} = sub {
        my ( undef, $err ) = @_;

        if ( defined $err ) {
            $self->{on_error}->($err);
        }

        return;
    };

    return $cmd;
}
# Executes a prepared command.
#
# Croaks when SUBSCRIBE has no "on_message" callback or when ACK/NACK has no
# "message" parameter. When the client is ready, the command is written
# immediately. Otherwise the command is put into the input queue and, if
# needed, the login or the (re)connection is started.
sub _execute {
    my ( $self, $cmd ) = @_;

    my $name = $cmd->{name};

    if ( $name eq 'SUBSCRIBE' && !defined $cmd->{on_message} ) {
        croak '"on_message" callback must be specified';
    }
    elsif ( exists $ACK_CMDS{$name} && !defined $cmd->{message} ) {
        croak '"message" parameter must be specified';
    }

    if ( $self->{_ready} ) {
        $self->_push_write($cmd);

        return;
    }

    if ( defined $self->{_handle} ) {
        # A handle exists: at most the login has to be (re)started
        if ( $self->{_connected} && $self->{_login_state} == S_NEED_DO ) {
            $self->_login;
        }
    }
    elsif ( $self->{lazy} ) {
        # First command of a lazy client: connect now
        undef $self->{lazy};
        $self->_connect;
    }
    elsif ( defined $self->{reconnect_interval}
        && $self->{reconnect_interval} > 0 )
    {
        # Reconnect after the configured delay, with one timer at a time
        unless ( defined $self->{_reconnect_timer} ) {
            # The timer callback must not keep the client object alive
            weaken($self);

            $self->{_reconnect_timer} = AE::timer(
                $self->{reconnect_interval}, 0,
                sub {
                    undef $self->{_reconnect_timer};
                    $self->_connect;
                }
            );
        }
    }
    else {
        $self->_connect;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
}
# Serializes a command into a frame and writes it to the handle.
#
# For ACK/NACK the identifying headers are copied from the acknowledged
# message according to the negotiated protocol version; an acknowledgement
# rejected by _check_ack() is not sent, and its "on_receipt" callback
# receives an E_OPRN_ERROR error asynchronously. Commands that need a receipt
# are registered in the pending receipts table; all other commands are put
# into the write queue.
#
# NOTE(review): header names and values are escaped for every command,
# including CONNECT - confirm this matches what the target brokers expect.
sub _push_write {
    my ( $self, $cmd ) = @_;

    my $name    = $cmd->{name};
    my $headers = $cmd->{headers};

    if ( exists $ACK_CMDS{$name} ) {
        unless ( $self->_check_ack( $cmd->{message} ) ) {
            my $err
                = _new_error( "Unexpected $cmd->{name} sent.", E_OPRN_ERROR );
            AE::postpone { $cmd->{on_receipt}->( undef, $err ) };

            return;
        }

        my $msg_headers = $cmd->{message}->headers;

        if ( $self->{_version} > 1.1 ) {
            $headers->{id} = $msg_headers->{ack};
        }
        else {
            $headers->{'message-id'} = $msg_headers->{'message-id'};

            if ( $self->{_version} > 1.0 ) {
                $headers->{subscription} = $msg_headers->{subscription};
            }
        }
    }

    my $wants_receipt
        = exists $NEED_RECEIPT{$name} || defined $headers->{receipt};

    if ( !$wants_receipt ) {
        push( @{ $self->{_write_queue} }, $cmd );
    }
    elsif ( $name eq 'CONNECT' ) {
        # The answer to CONNECT is the CONNECTED frame, not a receipt
        $self->{_pending_receipts}{CONNECTED} = $cmd;
    }
    else {
        if ( !defined $headers->{receipt} || $headers->{receipt} eq 'auto' ) {
            $headers->{receipt}
                = 'R_' . $self->{_session_id} . '.' . $RECEIPT_SEQ++;
        }

        $self->{_pending_receipts}{ $headers->{receipt} } = $cmd;
    }

    my $body = defined $cmd->{body} ? $cmd->{body} : '';

    unless ( defined $headers->{'content-length'} ) {
        $headers->{'content-length'} = length($body);
    }

    # Command line first, then one "name:value" line per header
    my @lines = ($name);
    foreach my $header_name ( keys %{$headers} ) {
        my $value = $headers->{$header_name};
        $value = '' unless defined $value;

        push( @lines, _escape($header_name) . ':' . _escape($value) );
    }

    # An empty line separates the head from the body; NUL ends the frame
    my $frame_str = join( EOL, @lines ) . EOL . EOL . $body . "\0";

    $self->{_handle}->push_write($frame_str);

    return;
}
# Sends the CONNECT frame and finishes the session setup when the server
# answers.
#
# The client heart-beat settings ($cx, $cy) are taken from
# $self->{heartbeat}: $cx is the interval at which the client can send
# heart-beats, $cy is the interval at which it wants to receive them
# (milliseconds, 0 disables). While waiting for the answer, the read timeout
# is armed with $cy.
#
# On success the negotiated heart-beat intervals are applied, the client is
# marked as ready, the protocol version and the session id are remembered and
# the queued commands are processed. On failure the login state is reset and
# _abort() is called with the error.
sub _login {
    my $self = shift;

    my ( $cx, $cy ) = @{ $self->{heartbeat} };

    if ( $cy > 0 ) {
        $self->_rtimeout($cy);
    }

    my %cmd_headers = (
        'accept-version' => '1.0,1.1,1.2',
        'heart-beat'     => join( ',', $cx, $cy ),
    );
    if ( defined $self->{login} ) {
        $cmd_headers{login} = $self->{login};
    }
    if ( defined $self->{passcode} ) {
        $cmd_headers{passcode} = $self->{passcode};
    }
    if ( defined $self->{vhost} ) {
        $cmd_headers{host} = $self->{vhost};
    }

    # The receipt callback must not keep the client object alive
    weaken($self);
    $self->{_login_state} = S_IN_PROGRESS;

    $self->_push_write(
        {   name       => 'CONNECT',
            headers    => \%cmd_headers,
            on_receipt => sub {
                my $receipt = shift;
                my $err     = shift;

                if ( defined $err ) {
                    $self->{_login_state} = S_NEED_DO;
                    $self->_abort($err);

                    return;
                }

                $self->{_login_state} = S_DONE;

                my $receipt_headers = $receipt->headers;

                # Server side heart-beat settings; a missing or malformed
                # header means that the server does no heart-beating.
                my ( $sx, $sy ) = ( 0, 0 );
                if ( defined $receipt_headers->{'heart-beat'} ) {
                    my @server_hb
                        = split( /,/, $receipt_headers->{'heart-beat'} );
                    ( $sx, $sy )
                        = map { looks_like_number($_) ? $_ : 0 }
                        @server_hb[ 0, 1 ];
                }

                # Server to client heart-beats exist only when both sides
                # agreed on them ($cy > 0 and $sx > 0). The read timeout was
                # armed before CONNECT only when $cy > 0, so it is either
                # re-armed with the negotiated interval or disarmed here.
                # With $cy == 0 it is left untouched, because the server
                # will not send heart-beats in this case.
                if ( $cy > 0 ) {
                    $self->_rtimeout( $sx > 0 ? max( $cy, $sx ) : 0 );
                }

                # Client to server heart-beats are sent whenever the server
                # wants to receive them; extra end-of-lines are harmless.
                if ( $sy > 0 ) {
                    $self->_wtimeout( max( $cx, $sy ) );
                }

                $self->{_ready} = 1;
                $self->{_version}
                    = version->parse( $receipt_headers->{version} || 1.0 );
                $self->{_session_id} = $receipt_headers->{session} || '';

                $self->_process_input_queue;
            },
        }
    );

    return;
}
# Restarts the read timeout of the handle and sets it to three times the
# given interval. The interval is given in milliseconds and converted to
# seconds for the handle. Returns nothing.
sub _rtimeout {
    my ( $self, $rtimeout ) = @_;

    my $handle  = $self->{_handle};
    my $seconds = $rtimeout / 1000;

    $handle->rtimeout_reset;
    $handle->rtimeout( $seconds * 3 );

    return;
}
sub _wtimeout {
my $self = shift;
my $wtimeout = shift;
$self->{_handle}->wtimeout_reset;
( run in 0.687 second using v1.01-cache-2.11-cpan-d8267643d1d )