AnyEvent-Stomper

 view release on metacpan or  search on metacpan

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

package AnyEvent::Stomper;

use 5.008000;
use strict;
use warnings;
use base qw( Exporter );

our $VERSION = '0.36';

use AnyEvent::Stomper::Frame;
use AnyEvent::Stomper::Error;

use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use List::Util qw( max );
use List::MoreUtils qw( bsearch_index );
use Carp qw( croak );

# Local copy of the error-code table declared in AnyEvent::Stomper::Error.
# It is filled inside a BEGIN block so that it is already populated when the
# "use constant" statement further down (which is also evaluated at compile
# time) expands it into individual constants.
my %ERROR_CODES;

BEGIN {
  %ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
  # Every error-code name can be imported on request, either one by one or
  # all together through the ":err_codes" tag.
  our @EXPORT_OK   = keys %ERROR_CODES;
  our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}

# Compile-time constants used by this package.
use constant {
  # Default values
  D_HOST      => 'localhost',
  D_PORT      => 61613,
  # NOTE: this is a single array reference; every use of D_HEARTBEAT yields
  # the same array, so it must not be modified in place.
  D_HEARTBEAT => [ 0, 0 ],

  # One constant per error code, named after the key in %ERROR_CODES
  %ERROR_CODES,

  # Operation status
  S_NEED_DO     => 1,
  S_IN_PROGRESS => 2,
  S_DONE        => 3,

  # Line terminator as a literal line feed, and a pattern that matches a line
  # feed optionally preceded by a carriage return
  EOL    => "\n",
  RE_EOL => qr/\r?\n/,
};

# Lookup set of the subscription-management command names.
my %SUBUNSUB_CMDS = (
  SUBSCRIBE   => 1,
  UNSUBSCRIBE => 1,
);

# Lookup set of the acknowledgement command names.
my %ACK_CMDS = (
  ACK  => 1,
  NACK => 1,
);

# Lookup set made of CONNECT, DISCONNECT and every key of %SUBUNSUB_CMDS.
# NOTE(review): presumably the commands that always need a receipt; the code
# that reads this table is outside this view - confirm there.
my %NEED_RECEIPT = (
  CONNECT    => 1,
  DISCONNECT => 1,
  %SUBUNSUB_CMDS,
);

# Maps carriage return, line feed, colon and backslash to the two-character
# sequences \r, \n, \c and \\ respectively.
# NOTE(review): these look like the STOMP header-value escapes - confirm
# against the code that applies the map.
my %ESCAPE_MAP = (
  "\r" => "\\r",
  "\n" => "\\n",
  ':'  => "\\c",
  "\\" => "\\\\",
);
# Inverse table (escape sequence => raw character). Reversing is lossless
# here because every value in %ESCAPE_MAP is distinct.
my %UNESCAPE_MAP = reverse %ESCAPE_MAP;

# File-scoped counters, both starting at 1.
# NOTE(review): presumably used to build unique receipt and message
# identifiers; the code that increments them is outside this view.
my $RECEIPT_SEQ = 1;
my $MESSAGE_SEQ = 1;


sub new {
  my $class  = shift;
  my %params = @_;

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

  return;
}

# Generate methods
#
# Installs one public method per command name listed below. Each generated
# method hands its name and a copy of its arguments to _prepare(), passes the
# resulting command on to _execute(), and returns nothing.
{
  no strict qw( refs );

  my @command_methods = qw( send subscribe unsubscribe ack nack begin commit
      abort disconnect );

  for my $method (@command_methods) {
    *{$method} = sub {
      my ( $self, @args ) = @_;

      # Scalar assignment keeps _prepare() in scalar context.
      my $prepared = $self->_prepare( $method, \@args );
      $self->_execute($prepared);

      return;
    };
  }
}

# Gets or sets the "on_error" callback.
#
# Without arguments the current handler is returned unchanged. With a defined
# argument that value becomes the handler. With an explicit undef a fallback
# handler is installed that warns with the message of the error object it
# receives. The handler in effect after the call is always returned.
sub on_error {
  my ( $self, @args ) = @_;

  return $self->{on_error} unless @args;

  my $handler = $args[0];

  $self->{on_error}
    = defined $handler
    ? $handler
    : sub {
        my $error = shift;
        warn $error->message . "\n";
      };

  return $self->{on_error};
}

# Generate accessors
#
# Three families of methods are installed into the current package:
#   * host, port                              - read-only getters
#   * connection_timeout, reconnect_interval  - validated getter/setters
#   * on_connect, on_disconnect               - plain getter/setters
{
  no strict qw( refs );

  # Read-only: return the stored value as is.
  for my $name ( qw( host port ) ) {
    *{$name} = sub {
      my ($self) = @_;
      return $self->{$name};
    };
  }

  # Getter/setter for time values. undef is accepted as is; any other value
  # must look like a number and must not be below zero, otherwise croak.
  for my $name ( qw( connection_timeout reconnect_interval ) ) {
    *{$name} = sub {
      my ( $self, @args ) = @_;

      return $self->{$name} unless @args;

      my $seconds = $args[0];

      if ( defined $seconds ) {
        # The numeric comparison only runs for values that look like numbers.
        croak qq{"$name" must be a positive number}
          if !looks_like_number($seconds) || $seconds < 0;
      }

      $self->{$name} = $seconds;

      return $self->{$name};
    };
  }

  # Getter/setter for callbacks: whatever is passed is stored unchecked.
  for my $name ( qw( on_connect on_disconnect ) ) {
    *{$name} = sub {
      my ( $self, @args ) = @_;

      $self->{$name} = $args[0] if @args;

      return $self->{$name};
    };
  }
}

# Public entry point that delegates to _disconnect() with no arguments.
# Returns nothing.
sub force_disconnect {
  my ($self) = @_;

  $self->_disconnect;

  return;
}

# Creates a new AnyEvent::Handle for the configured host and port and stores
# it in $self->{_handle}. Returns nothing.
#
# The entries of $self->{handle_params} are passed first and the settings
# defined here after them, in the order listed below.
sub _connect {
  my ($self) = @_;

  my @handle_args = (
    %{ $self->{handle_params} },
    connect          => [ $self->{host}, $self->{port} ],
    on_prepare       => $self->_create_on_prepare,
    on_connect       => $self->_create_on_connect,
    on_connect_error => $self->_create_on_connect_error,
    on_wtimeout      => $self->_create_on_wtimeout,
    on_rtimeout      => $self->_create_on_rtimeout,
    on_eof           => $self->_create_on_eof,
    on_error         => $self->_create_on_handle_error,
    on_drain         => $self->_create_on_drain,
    on_read          => $self->_create_on_read,
  );

  $self->{_handle} = AnyEvent::Handle->new(@handle_args);

  return;
}

sub _create_on_prepare {
  my $self = shift;

  weaken($self);

  return sub {
    if ( defined $self->{connection_timeout} ) {
      return $self->{connection_timeout};
    }



( run in 1.598 second using v1.01-cache-2.11-cpan-39bf76dae61 )