AnyEvent-Stomper
view release on metacpan or search on metacpan
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
package AnyEvent::Stomper::Cluster;
use 5.008000;
use strict;
use warnings;
use base qw( Exporter );
our $VERSION = '0.36';
use AnyEvent::Stomper;
use AnyEvent::Stomper::Error;
use Scalar::Util qw( weaken );
use Carp qw( croak );
our %ERROR_CODES;
BEGIN {
%ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
our @EXPORT_OK = keys %ERROR_CODES;
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}
use constant \%ERROR_CODES;
my %ACK_CMDS = (
ACK => 1,
NACK => 1,
);
my %CAN_REPEAT = (
SEND => 1,
SUBSCRIBE => 1,
BEGIN => 1,
);
sub new {
my $class = shift;
my %params = @_;
my $self = bless {}, $class;
unless ( defined $params{nodes} ) {
croak 'Nodes not specified';
}
unless ( ref( $params{nodes} ) eq 'ARRAY' ) {
croak 'Nodes must be specified as array reference';
}
unless ( @{ $params{nodes} } ) {
croak 'Specified empty list of nodes';
}
$self->{nodes} = $params{nodes};
$self->{on_node_connect} = $params{on_node_connect};
$self->{on_node_disconnect} = $params{on_node_disconnect};
$self->{on_node_error} = $params{on_node_error};
$self->on_error( $params{on_error} );
my %node_params;
foreach my $name ( qw( login passcode vhost heartbeat connection_timeout
reconnect_interval handle_params default_headers command_headers ) )
{
next unless defined $params{$name};
$node_params{$name} = $params{$name};
}
$self->{_node_params} = \%node_params;
$self->_reset_internals;
$self->_init;
return $self;
}
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
warn $err->message . "\n";
};
}
}
return $self->{on_error};
}
sub force_disconnect {
my $self = shift;
foreach my $node ( $self->nodes ) {
$node->force_disconnect;
}
$self->_reset_internals;
return;
}
sub _init {
my $self = shift;
my $nodes_pool = $self->{_nodes_pool};
foreach my $node_params ( @{ $self->{nodes} } ) {
my $hostport = "$node_params->{host}:$node_params->{port}";
unless ( defined $nodes_pool->{$hostport} ) {
$nodes_pool->{$hostport}
= $self->_new_node( $node_params->{host}, $node_params->{port} );
}
}
$self->{_nodes} = [ keys %{ $self->{_nodes_pool} } ];
$self->{_active_node} = $self->_next_node;
return;
}
sub _new_node {
my $self = shift;
my $host = shift;
my $port = shift;
return AnyEvent::Stomper->new(
%{ $self->{_node_params} },
host => $host,
port => $port,
lazy => 1,
on_connect => $self->_create_on_node_connect( $host, $port ),
on_disconnect => $self->_create_on_node_disconnect( $host, $port ),
on_error => $self->_create_on_node_error( $host, $port ),
);
}
sub _create_on_node_connect {
my $self = shift;
my $host = shift;
my $port = shift;
weaken($self);
return sub {
if ( defined $self->{on_node_connect} ) {
$self->{on_node_connect}->( $host, $port );
}
};
}
sub _create_on_node_disconnect {
my $self = shift;
my $host = shift;
my $port = shift;
weaken($self);
return sub {
if ( defined $self->{on_node_disconnect} ) {
$self->{on_node_disconnect}->( $host, $port );
}
};
}
sub _create_on_node_error {
my $self = shift;
my $host = shift;
my $port = shift;
weaken($self);
return sub {
my $err = shift;
my $err_code = $err->code;
if ( $err_code != E_OPRN_ERROR
&& $err_code != E_CONN_CLOSED_BY_CLIENT )
{
$self->{_active_node} = $self->_next_node;
}
if ( defined $self->{on_node_error} ) {
$self->{on_node_error}->( $err, $host, $port );
}
};
}
sub _prepare {
my $self = shift;
my $cmd_name = uc(shift);
my $args = shift;
my %params;
if ( ref( $args->[-1] ) eq 'CODE'
&& scalar @{$args} % 2 > 0 )
{
if ( $cmd_name eq 'SUBSCRIBE' ) {
$params{on_message} = pop @{$args};
}
else {
$params{on_receipt} = pop @{$args};
}
}
my %headers = @{$args};
foreach my $name ( qw( body on_receipt on_message on_node_error ) ) {
if ( defined $headers{$name} ) {
$params{$name} = delete $headers{$name};
}
}
if ( exists $ACK_CMDS{$cmd_name} ) {
$params{message} = delete $headers{message};
}
my $cmd = {
name => $cmd_name,
headers => \%headers,
%params,
};
unless ( defined $cmd->{on_receipt} ) {
weaken($self);
$cmd->{on_receipt} = sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
$self->{on_error}->($err);
return;
}
};
}
return $cmd;
}
sub _execute {
my $self = shift;
my $cmd = shift;
my $fails_cnt = shift || 0;
my $hostport = $self->{_active_node};
my $node = $self->{_nodes_pool}{$hostport};
weaken($self);
$node->execute( $cmd->{name}, %{ $cmd->{headers} },
body => $cmd->{body},
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_code = $err->code;
my $on_node_error = $cmd->{on_node_error} || $self->{on_node_error};
if ( defined $on_node_error ) {
my $node = $self->{_nodes_pool}{$hostport};
$on_node_error->( $err, $node->host, $node->port );
}
if ( $CAN_REPEAT{ $cmd->{name} }
&& $err_code != E_OPRN_ERROR
&& $err_code != E_CONN_CLOSED_BY_CLIENT
&& ++$fails_cnt < scalar @{ $self->{_nodes} } )
{
$self->_execute( $cmd, $fails_cnt );
return;
}
$cmd->{on_receipt}->( $receipt, $err );
return;
}
$cmd->{on_receipt}->($receipt);
},
defined $cmd->{message}
? ( message => $cmd->{message} )
: (),
defined $cmd->{on_message}
? ( on_message => $cmd->{on_message} )
: (),
);
return;
}
sub _next_node {
my $self = shift;
unless ( defined $self->{_node_index} ) {
$self->{_node_index} = int( rand( scalar @{ $self->{_nodes} } ) );
}
elsif ( $self->{_node_index} == scalar @{ $self->{_nodes} } ) {
$self->{_node_index} = 0;
}
return $self->{_nodes}[ $self->{_node_index}++ ];
}
sub _reset_internals {
( run in 1.140 second using v1.01-cache-2.11-cpan-d8267643d1d )