AnyEvent-Stomper
view release on metacpan or search on metacpan
lib/AnyEvent/Stomper.pm view on Meta::CPAN
use warnings;
use base qw( Exporter );
our $VERSION = '0.36';
use AnyEvent::Stomper::Frame;
use AnyEvent::Stomper::Error;
use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use List::Util qw( max );
use List::MoreUtils qw( bsearch_index );
use Carp qw( croak );
my %ERROR_CODES;
BEGIN {
%ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
our @EXPORT_OK = keys %ERROR_CODES;
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
lib/AnyEvent/Stomper.pm view on Meta::CPAN
on_drain => $self->_create_on_drain,
on_read => $self->_create_on_read,
);
return;
}
sub _create_on_prepare {
my $self = shift;
weaken($self);
return sub {
if ( defined $self->{connection_timeout} ) {
return $self->{connection_timeout};
}
return;
};
}
sub _create_on_connect {
my $self = shift;
weaken($self);
return sub {
$self->{_connected} = 1;
$self->_login;
if ( defined $self->{on_connect} ) {
$self->{on_connect}->();
}
};
}
sub _create_on_connect_error {
my $self = shift;
weaken($self);
return sub {
my $err_msg = pop;
my $err = _new_error(
"Can't connect to $self->{host}:$self->{port}: $err_msg",
E_CANT_CONN
);
$self->_disconnect($err);
};
}
sub _create_on_wtimeout {
my $self = shift;
weaken($self);
return sub {
$self->{_handle}->push_write(EOL);
};
}
sub _create_on_rtimeout {
my $self = shift;
weaken($self);
return sub {
my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
$self->_disconnect($err);
};
}
sub _create_on_eof {
my $self = shift;
weaken($self);
return sub {
my $err = _new_error( 'Connection closed by remote host.',
E_CONN_CLOSED_BY_REMOTE_HOST );
$self->_disconnect($err);
};
}
sub _create_on_handle_error {
my $self = shift;
weaken($self);
return sub {
my $err_msg = pop;
my $err = _new_error( $err_msg, E_IO );
$self->_disconnect($err);
};
}
sub _create_on_drain {
my $self = shift;
weaken($self);
return sub {
return unless @{ $self->{_write_queue} };
$self->{_temp_write_queue} = $self->{_write_queue};
$self->{_write_queue} = [];
while ( my $cmd = shift @{ $self->{_temp_write_queue} } ) {
$cmd->{on_receipt}->();
}
};
}
sub _create_on_read {
my $self = shift;
weaken($self);
my $cmd_name;
my $headers;
return sub {
my $handle = shift;
my $frame;
while (1) {
lib/AnyEvent/Stomper.pm view on Meta::CPAN
%headers,
);
my $cmd = {
name => $cmd_name,
headers => \%headers,
%params,
};
unless ( defined $cmd->{on_receipt} ) {
weaken($self);
$cmd->{on_receipt} = sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
$self->{on_error}->($err);
return;
}
};
lib/AnyEvent/Stomper.pm view on Meta::CPAN
}
elsif ( $self->{lazy} ) {
undef $self->{lazy};
$self->_connect;
}
else {
if ( defined $self->{reconnect_interval}
&& $self->{reconnect_interval} > 0 )
{
unless ( defined $self->{_reconnect_timer} ) {
weaken($self);
$self->{_reconnect_timer} = AE::timer(
$self->{reconnect_interval}, 0,
sub {
undef $self->{_reconnect_timer};
$self->_connect;
}
);
}
}
lib/AnyEvent/Stomper.pm view on Meta::CPAN
if ( defined $self->{login} ) {
$cmd_headers{login} = $self->{login};
}
if ( defined $self->{passcode} ) {
$cmd_headers{passcode} = $self->{passcode};
}
if ( defined $self->{vhost} ) {
$cmd_headers{host} = $self->{vhost};
}
weaken($self);
$self->{_login_state} = S_IN_PROGRESS;
$self->_push_write(
{ name => 'CONNECT',
headers => \%cmd_headers,
on_receipt => sub {
my $receipt = shift;
my $err = shift;
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
use 5.008000;
use strict;
use warnings;
use base qw( Exporter );
our $VERSION = '0.36';
use AnyEvent::Stomper;
use AnyEvent::Stomper::Error;
use Scalar::Util qw( weaken );
use Carp qw( croak );
our %ERROR_CODES;
BEGIN {
%ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
our @EXPORT_OK = keys %ERROR_CODES;
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
on_disconnect => $self->_create_on_node_disconnect( $host, $port ),
on_error => $self->_create_on_node_error( $host, $port ),
);
}
sub _create_on_node_connect {
my $self = shift;
my $host = shift;
my $port = shift;
weaken($self);
return sub {
if ( defined $self->{on_node_connect} ) {
$self->{on_node_connect}->( $host, $port );
}
};
}
sub _create_on_node_disconnect {
my $self = shift;
my $host = shift;
my $port = shift;
weaken($self);
return sub {
if ( defined $self->{on_node_disconnect} ) {
$self->{on_node_disconnect}->( $host, $port );
}
};
}
sub _create_on_node_error {
my $self = shift;
my $host = shift;
my $port = shift;
weaken($self);
return sub {
my $err = shift;
my $err_code = $err->code;
if ( $err_code != E_OPRN_ERROR
&& $err_code != E_CONN_CLOSED_BY_CLIENT )
{
$self->{_active_node} = $self->_next_node;
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
$params{message} = delete $headers{message};
}
my $cmd = {
name => $cmd_name,
headers => \%headers,
%params,
};
unless ( defined $cmd->{on_receipt} ) {
weaken($self);
$cmd->{on_receipt} = sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
$self->{on_error}->($err);
return;
}
};
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
}
sub _execute {
my $self = shift;
my $cmd = shift;
my $fails_cnt = shift || 0;
my $hostport = $self->{_active_node};
my $node = $self->{_nodes_pool}{$hostport};
weaken($self);
$node->execute( $cmd->{name}, %{ $cmd->{headers} },
body => $cmd->{body},
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_code = $err->code;
( run in 0.764 second using v1.01-cache-2.11-cpan-49f99fa48dc )