AnyEvent-Stomper

 view release on metacpan or  search on metacpan

lib/AnyEvent/Stomper/Cluster.pm  view on Meta::CPAN

package AnyEvent::Stomper::Cluster;

use 5.008000;
use strict;
use warnings;
use base qw( Exporter );

our $VERSION = '0.36';

use AnyEvent::Stomper;
use AnyEvent::Stomper::Error;

use Scalar::Util qw( weaken );
use Carp qw( croak );

our %ERROR_CODES;

BEGIN {
  %ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
  our @EXPORT_OK   = keys %ERROR_CODES;
  our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}

use constant \%ERROR_CODES;

my %ACK_CMDS = (
  ACK  => 1,
  NACK => 1,
);

my %CAN_REPEAT = (
  SEND      => 1,
  SUBSCRIBE => 1,
  BEGIN     => 1,
);


sub new {
  my $class  = shift;
  my %params = @_;

  my $self = bless {}, $class;

  unless ( defined $params{nodes} ) {
    croak 'Nodes not specified';
  }
  unless ( ref( $params{nodes} ) eq 'ARRAY' ) {
    croak 'Nodes must be specified as array reference';
  }
  unless ( @{ $params{nodes} } ) {
    croak 'Specified empty list of nodes';
  }

  $self->{nodes}              = $params{nodes};
  $self->{on_node_connect}    = $params{on_node_connect};
  $self->{on_node_disconnect} = $params{on_node_disconnect};
  $self->{on_node_error}      = $params{on_node_error};
  $self->on_error( $params{on_error} );

  my %node_params;
  foreach my $name ( qw( login passcode vhost heartbeat connection_timeout
      reconnect_interval handle_params default_headers command_headers ) )
  {
    next unless defined $params{$name};
    $node_params{$name} = $params{$name};
  }
  $self->{_node_params} = \%node_params;

  $self->_reset_internals;
  $self->_init;

  return $self;
}

lib/AnyEvent/Stomper/Cluster.pm  view on Meta::CPAN

        warn $err->message . "\n";
      };
    }
  }

  return $self->{on_error};
}

sub force_disconnect {
  my $self = shift;

  foreach my $node ( $self->nodes ) {
    $node->force_disconnect;
  }
  $self->_reset_internals;

  return;
}

sub _init {
  my $self = shift;

  my $nodes_pool = $self->{_nodes_pool};

  foreach my $node_params ( @{ $self->{nodes} } ) {
    my $hostport = "$node_params->{host}:$node_params->{port}";

    unless ( defined $nodes_pool->{$hostport} ) {
      $nodes_pool->{$hostport}
          = $self->_new_node( $node_params->{host}, $node_params->{port} );
    }
  }

  $self->{_nodes}       = [ keys %{ $self->{_nodes_pool} } ];
  $self->{_active_node} = $self->_next_node;

  return;
}

sub _new_node {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  return AnyEvent::Stomper->new(
    %{ $self->{_node_params} },
    host          => $host,
    port          => $port,
    lazy          => 1,
    on_connect    => $self->_create_on_node_connect( $host, $port ),
    on_disconnect => $self->_create_on_node_disconnect( $host, $port ),
    on_error      => $self->_create_on_node_error( $host, $port ),
  );
}

sub _create_on_node_connect {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    if ( defined $self->{on_node_connect} ) {
      $self->{on_node_connect}->( $host, $port );
    }
  };
}

sub _create_on_node_disconnect {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    if ( defined $self->{on_node_disconnect} ) {
      $self->{on_node_disconnect}->( $host, $port );
    }
  };
}

sub _create_on_node_error {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    my $err = shift;

    my $err_code = $err->code;

    if ( $err_code != E_OPRN_ERROR
      && $err_code != E_CONN_CLOSED_BY_CLIENT )
    {
      $self->{_active_node} = $self->_next_node;
    }

    if ( defined $self->{on_node_error} ) {
      $self->{on_node_error}->( $err, $host, $port );
    }
  };
}

sub _prepare {
  my $self     = shift;
  my $cmd_name = uc(shift);
  my $args     = shift;

  my %params;

  if ( ref( $args->[-1] ) eq 'CODE'
    && scalar @{$args} % 2 > 0 )
  {
    if ( $cmd_name eq 'SUBSCRIBE' ) {
      $params{on_message} = pop @{$args};
    }
    else {
      $params{on_receipt} = pop @{$args};
    }
  }

  my %headers = @{$args};

  foreach my $name ( qw( body on_receipt on_message on_node_error ) ) {
    if ( defined $headers{$name} ) {
      $params{$name} = delete $headers{$name};
    }
  }
  if ( exists $ACK_CMDS{$cmd_name} ) {
    $params{message} = delete $headers{message};
  }

  my $cmd = {
    name    => $cmd_name,
    headers => \%headers,
    %params,
  };

  unless ( defined $cmd->{on_receipt} ) {
    weaken($self);

    $cmd->{on_receipt} = sub {
      my $receipt = shift;
      my $err     = shift;

      if ( defined $err ) {
        $self->{on_error}->($err);
        return;
      }
    };
  }

  return $cmd;
}

sub _execute {
  my $self      = shift;
  my $cmd       = shift;
  my $fails_cnt = shift || 0;

  my $hostport = $self->{_active_node};
  my $node     = $self->{_nodes_pool}{$hostport};

  weaken($self);

  $node->execute( $cmd->{name}, %{ $cmd->{headers} },
    body => $cmd->{body},

    on_receipt => sub {
      my $receipt = shift;
      my $err     = shift;

      if ( defined $err ) {
        my $err_code = $err->code;

        my $on_node_error = $cmd->{on_node_error} || $self->{on_node_error};
        if ( defined $on_node_error ) {
          my $node = $self->{_nodes_pool}{$hostport};
          $on_node_error->( $err, $node->host, $node->port );
        }

        if ( $CAN_REPEAT{ $cmd->{name} }
          && $err_code != E_OPRN_ERROR
          && $err_code != E_CONN_CLOSED_BY_CLIENT
          && ++$fails_cnt < scalar @{ $self->{_nodes} } )
        {
          $self->_execute( $cmd, $fails_cnt );
          return;
        }

        $cmd->{on_receipt}->( $receipt, $err );

        return;
      }

      $cmd->{on_receipt}->($receipt);
    },

    defined $cmd->{message}
    ? ( message => $cmd->{message} )
    : (),

    defined $cmd->{on_message}
    ? ( on_message => $cmd->{on_message} )
    : (),
  );

  return;
}

sub _next_node {
  my $self = shift;

  unless ( defined $self->{_node_index} ) {
    $self->{_node_index} = int( rand( scalar @{ $self->{_nodes} } ) );
  }
  elsif ( $self->{_node_index} == scalar @{ $self->{_nodes} } ) {
    $self->{_node_index} = 0;
  }

  return $self->{_nodes}[ $self->{_node_index}++ ];
}

sub _reset_internals {



( run in 1.140 second using v1.01-cache-2.11-cpan-d8267643d1d )