AnyEvent-Stomper

 view release on metacpan or  search on metacpan

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

use warnings;
use base qw( Exporter );

our $VERSION = '0.36';

use AnyEvent::Stomper::Frame;
use AnyEvent::Stomper::Error;

use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use List::Util qw( max );
use List::MoreUtils qw( bsearch_index );
use Carp qw( croak );

my %ERROR_CODES;

BEGIN {
  %ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
  our @EXPORT_OK   = keys %ERROR_CODES;
  our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

    on_drain         => $self->_create_on_drain,
    on_read          => $self->_create_on_read,
  );

  return;
}

sub _create_on_prepare {
  my $self = shift;

  weaken($self);

  return sub {
    if ( defined $self->{connection_timeout} ) {
      return $self->{connection_timeout};
    }

    return;
  };
}

sub _create_on_connect {
  my $self = shift;

  weaken($self);

  return sub {
    $self->{_connected} = 1;
    $self->_login;

    if ( defined $self->{on_connect} ) {
      $self->{on_connect}->();
    }
  };
}

sub _create_on_connect_error {
  my $self = shift;

  weaken($self);

  return sub {
    my $err_msg = pop;

    my $err = _new_error(
      "Can't connect to $self->{host}:$self->{port}: $err_msg",
      E_CANT_CONN
    );
    $self->_disconnect($err);
  };
}

sub _create_on_wtimeout {
  my $self = shift;

  weaken($self);

  return sub {
    $self->{_handle}->push_write(EOL);
  };
}

sub _create_on_rtimeout {
  my $self = shift;

  weaken($self);

  return sub {
    my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
    $self->_disconnect($err);
  };
}

sub _create_on_eof {
  my $self = shift;

  weaken($self);

  return sub {
    my $err = _new_error( 'Connection closed by remote host.',
        E_CONN_CLOSED_BY_REMOTE_HOST );
    $self->_disconnect($err);
  };
}

sub _create_on_handle_error {
  my $self = shift;

  weaken($self);

  return sub {
    my $err_msg = pop;

    my $err = _new_error( $err_msg, E_IO );
    $self->_disconnect($err);
  };
}

sub _create_on_drain {
  my $self = shift;

  weaken($self);

  return sub {
    return unless @{ $self->{_write_queue} };

    $self->{_temp_write_queue} = $self->{_write_queue};
    $self->{_write_queue} = [];

    while ( my $cmd = shift @{ $self->{_temp_write_queue} } ) {
      $cmd->{on_receipt}->();
    }
  };
}

sub _create_on_read {
  my $self = shift;

  weaken($self);

  my $cmd_name;
  my $headers;

  return sub {
    my $handle = shift;

    my $frame;

    while (1) {

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

    %headers,
  );

  my $cmd = {
    name    => $cmd_name,
    headers => \%headers,
    %params,
  };

  unless ( defined $cmd->{on_receipt} ) {
    weaken($self);

    $cmd->{on_receipt} = sub {
      my $receipt = shift;
      my $err     = shift;

      if ( defined $err ) {
        $self->{on_error}->($err);
        return;
      }
    };

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

    }
    elsif ( $self->{lazy} ) {
      undef $self->{lazy};
      $self->_connect;
    }
    else {
      if ( defined $self->{reconnect_interval}
        && $self->{reconnect_interval} > 0 )
      {
        unless ( defined $self->{_reconnect_timer} ) {
          weaken($self);

          $self->{_reconnect_timer} = AE::timer(
            $self->{reconnect_interval}, 0,
            sub {
              undef $self->{_reconnect_timer};
              $self->_connect;
            }
          );
        }
      }

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

  if ( defined $self->{login} ) {
    $cmd_headers{login} = $self->{login};
  }
  if ( defined $self->{passcode} ) {
    $cmd_headers{passcode} = $self->{passcode};
  }
  if ( defined $self->{vhost} ) {
    $cmd_headers{host} = $self->{vhost};
  }

  weaken($self);
  $self->{_login_state} = S_IN_PROGRESS;

  $self->_push_write(
    { name    => 'CONNECT',
      headers => \%cmd_headers,

      on_receipt => sub {
        my $receipt = shift;
        my $err     = shift;

lib/AnyEvent/Stomper/Cluster.pm  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;
use base qw( Exporter );

our $VERSION = '0.36';

use AnyEvent::Stomper;
use AnyEvent::Stomper::Error;

use Scalar::Util qw( weaken );
use Carp qw( croak );

our %ERROR_CODES;

BEGIN {
  %ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
  our @EXPORT_OK   = keys %ERROR_CODES;
  our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}

lib/AnyEvent/Stomper/Cluster.pm  view on Meta::CPAN

    on_disconnect => $self->_create_on_node_disconnect( $host, $port ),
    on_error      => $self->_create_on_node_error( $host, $port ),
  );
}

sub _create_on_node_connect {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    if ( defined $self->{on_node_connect} ) {
      $self->{on_node_connect}->( $host, $port );
    }
  };
}

sub _create_on_node_disconnect {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    if ( defined $self->{on_node_disconnect} ) {
      $self->{on_node_disconnect}->( $host, $port );
    }
  };
}

sub _create_on_node_error {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    my $err = shift;

    my $err_code = $err->code;

    if ( $err_code != E_OPRN_ERROR
      && $err_code != E_CONN_CLOSED_BY_CLIENT )
    {
      $self->{_active_node} = $self->_next_node;

lib/AnyEvent/Stomper/Cluster.pm  view on Meta::CPAN

    $params{message} = delete $headers{message};
  }

  my $cmd = {
    name    => $cmd_name,
    headers => \%headers,
    %params,
  };

  unless ( defined $cmd->{on_receipt} ) {
    weaken($self);

    $cmd->{on_receipt} = sub {
      my $receipt = shift;
      my $err     = shift;

      if ( defined $err ) {
        $self->{on_error}->($err);
        return;
      }
    };

lib/AnyEvent/Stomper/Cluster.pm  view on Meta::CPAN

}

sub _execute {
  my $self      = shift;
  my $cmd       = shift;
  my $fails_cnt = shift || 0;

  my $hostport = $self->{_active_node};
  my $node     = $self->{_nodes_pool}{$hostport};

  weaken($self);

  $node->execute( $cmd->{name}, %{ $cmd->{headers} },
    body => $cmd->{body},

    on_receipt => sub {
      my $receipt = shift;
      my $err     = shift;

      if ( defined $err ) {
        my $err_code = $err->code;



( run in 0.764 second using v1.01-cache-2.11-cpan-49f99fa48dc )