AnyEvent-Stomper

 view release on metacpan or  search on metacpan

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

package AnyEvent::Stomper;

use 5.008000;
use strict;
use warnings;
use base qw( Exporter );

our $VERSION = '0.36';

use AnyEvent::Stomper::Frame;
use AnyEvent::Stomper::Error;

use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use List::Util qw( max );
use List::MoreUtils qw( bsearch_index );
use Carp qw( croak );

# Error code name => value table. It is filled inside BEGIN so that it is
# already populated when the "use constant" statement below is compiled.
my %ERROR_CODES;

BEGIN {
  # Copy the codes declared by AnyEvent::Stomper::Error and make every code
  # name importable on request, either one by one or via the ":err_codes" tag.
  %ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
  our @EXPORT_OK   = keys %ERROR_CODES;
  our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}

use constant {
  # Default values
  # NOTE(review): D_HEARTBEAT is presumably the [ cx, cy ] pair read by
  # _login -- confirm against the constructor, which is not visible here.
  D_HOST      => 'localhost',
  D_PORT      => 61613,
  D_HEARTBEAT => [ 0, 0 ],

  # Every error code also becomes a constant of this package.
  %ERROR_CODES,

  # Operation status (stored in $self->{_login_state})
  S_NEED_DO     => 1,
  S_IN_PROGRESS => 2,
  S_DONE        => 3,

  # Outgoing frames are terminated with a bare "\n"; incoming data is matched
  # with RE_EOL, which also accepts "\r\n".
  EOL    => "\n",
  RE_EOL => qr/\r?\n/,
};

# Commands that manage subscriptions.
my %SUBUNSUB_CMDS = (
  SUBSCRIBE   => 1,
  UNSUBSCRIBE => 1,
);

# Acknowledgement commands; they must be given a "message" parameter.
my %ACK_CMDS = (
  ACK  => 1,
  NACK => 1,
);

# Commands that _push_write always registers in $self->{_pending_receipts},
# even when the caller did not ask for a receipt.
my %NEED_RECEIPT = (
  CONNECT    => 1,
  DISCONNECT => 1,
  %SUBUNSUB_CMDS,
);

# Raw character => escape sequence used in frame headers, plus the reverse
# mapping. Presumably consumed by _escape/_unescape, which are not visible
# in this part of the file.
my %ESCAPE_MAP = (
  "\r" => "\\r",
  "\n" => "\\n",
  ':'  => "\\c",
  "\\" => "\\\\",
);
my %UNESCAPE_MAP = reverse %ESCAPE_MAP;

# Process-wide counters. $RECEIPT_SEQ numbers automatically generated receipt
# ids; the use of $MESSAGE_SEQ is outside the visible code.
my $RECEIPT_SEQ = 1;
my $MESSAGE_SEQ = 1;


sub new {
  my $class  = shift;
  my %params = @_;

lib/AnyEvent/Stomper.pm  view on Meta::CPAN


      if (@_) {
        my $seconds = shift;

        if ( defined $seconds
          && ( !looks_like_number($seconds) || $seconds < 0 ) )
        {
          croak qq{"$name" must be a positive number};
        }
        $self->{$name} = $seconds;
      }

      return $self->{$name};
    };
  }

  foreach my $name ( qw( on_connect on_disconnect ) ) {
    *{$name} = sub {
      my $self = shift;

      if (@_) {
        $self->{$name} = shift;
      }

      return $self->{$name};
    };
  }
}

# Public entry point that delegates to _disconnect() without passing an error
# object. Always returns nothing.
sub force_disconnect {
  my ($self) = @_;

  $self->_disconnect;

  return;
}

# Creates the AnyEvent::Handle that connects to $self->{host}:$self->{port}
# and stores it in $self->{_handle}. The user supplied handle_params are
# listed first, so the connect target and the callbacks installed here are
# the later entries whenever a key is given twice.
sub _connect {
  my ($self) = @_;

  # Event callbacks, in the order they are handed to the handle.
  my @callbacks = (
    on_prepare       => $self->_create_on_prepare,
    on_connect       => $self->_create_on_connect,
    on_connect_error => $self->_create_on_connect_error,
    on_wtimeout      => $self->_create_on_wtimeout,
    on_rtimeout      => $self->_create_on_rtimeout,
    on_eof           => $self->_create_on_eof,
    on_error         => $self->_create_on_handle_error,
    on_drain         => $self->_create_on_drain,
    on_read          => $self->_create_on_read,
  );

  $self->{_handle} = AnyEvent::Handle->new(
    %{ $self->{handle_params} },
    connect => [ @{$self}{qw( host port )} ],
    @callbacks,
  );

  return;
}

# Builds the "on_prepare" callback for the handle. The callback returns
# $self->{connection_timeout} when it is defined and nothing otherwise.
# The closure holds only a weak reference to the client object.
sub _create_on_prepare {
  my ($self) = @_;

  weaken($self);

  return sub {
    my $timeout = $self->{connection_timeout};

    return unless defined $timeout;

    return $timeout;
  };
}

# Builds the "on_connect" callback for the handle. Once the socket is up the
# callback flags the client as connected, starts the STOMP login and then
# invokes the user's on_connect callback, if one is set.
# The closure holds only a weak reference to the client object.
sub _create_on_connect {
  my ($self) = @_;

  weaken($self);

  return sub {
    $self->{_connected} = 1;
    $self->_login;

    my $on_connect = $self->{on_connect};
    $on_connect->() if defined $on_connect;
  };
}

# Builds the "on_connect_error" callback for the handle. The last callback
# argument is taken as the failure description; it is wrapped into an
# E_CANT_CONN error that is passed to _disconnect().
# The closure holds only a weak reference to the client object.
sub _create_on_connect_error {
  my ($self) = @_;

  weaken($self);

  return sub {
    my $reason = $_[-1];

    my $conn_err = _new_error(
      "Can't connect to $self->{host}:$self->{port}: $reason",
      E_CANT_CONN
    );
    $self->_disconnect($conn_err);
  };
}

# Builds the "on_wtimeout" callback for the handle. When the write timeout
# fires, a single EOL is written to the connection.
# The closure holds only a weak reference to the client object.
sub _create_on_wtimeout {
  my ($self) = @_;

  weaken($self);

  return sub {
    my $handle = $self->{_handle};
    $handle->push_write(EOL);
  };
}

# Builds the "on_rtimeout" callback for the handle. When the read timeout
# fires, the client is disconnected with an E_READ_TIMEDOUT error.
# The closure holds only a weak reference to the client object.
sub _create_on_rtimeout {
  my ($self) = @_;

  weaken($self);

  return sub {
    my $timeout_err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
    $self->_disconnect($timeout_err);
  };
}

# Builds the "on_eof" callback for the handle. When the peer closes the
# connection, the client is disconnected with an
# E_CONN_CLOSED_BY_REMOTE_HOST error.
# The closure holds only a weak reference to the client object.
sub _create_on_eof {
  my ($self) = @_;

  weaken($self);

  return sub {
    my $eof_err = _new_error(
      'Connection closed by remote host.',
      E_CONN_CLOSED_BY_REMOTE_HOST
    );
    $self->_disconnect($eof_err);
  };
}

# Builds the "on_error" callback for the handle. The last callback argument
# is taken as the error message; it is wrapped into an E_IO error that is
# passed to _disconnect().
# The closure holds only a weak reference to the client object.
sub _create_on_handle_error {
  my ($self) = @_;

  weaken($self);

  return sub {
    my $io_msg = $_[-1];

    my $io_err = _new_error( $io_msg, E_IO );
    $self->_disconnect($io_err);
  };
}

# Builds the "on_drain" callback for the handle. Commands queued in
# $self->{_write_queue} (those sent without a receipt request) are completed
# here: each one gets its on_receipt callback invoked without arguments.
# The closure holds only a weak reference to the client object.
sub _create_on_drain {
  my ($self) = @_;

  weaken($self);

  return sub {
    my $flushed = $self->{_write_queue};

    return if !@{$flushed};

    # Install a fresh write queue before running any callback, so commands
    # issued from inside a callback are collected for the next drain event.
    $self->{_write_queue}      = [];
    $self->{_temp_write_queue} = $flushed;

    # The queue is re-read from the object on every iteration.
    while ( my $cmd = shift @{ $self->{_temp_write_queue} } ) {
      $cmd->{on_receipt}->();
    }
  };
}

# Builds the "on_read" callback for the handle: an incremental STOMP frame
# parser working directly on the handle's read buffer.
#
# Parsing happens in two steps that may be spread over several read events:
#   1. command line and headers are consumed up to the empty line;
#   2. the body is consumed, sized by "content-length" when that header is
#      present, or up to the first NUL octet otherwise.
# Every complete frame is handed to _process_frame(). The parsed command name
# and headers are kept in the enclosing scope, so a frame whose body has not
# arrived yet is resumed on the next read event.
# The closure holds only a weak reference to the client object.
sub _create_on_read {
  my $self = shift;

  weaken($self);

  # Parser state shared between invocations of the callback.
  my $cmd_name;
  my $headers;

  return sub {
    my $handle = shift;

    my $frame;

    while (1) {
      # A frame handler may have torn the connection down.
      return if $handle->destroyed;

      if ( defined $cmd_name ) {
        my $content_length;
        if ( defined $headers->{'content-length'} ) {
          $content_length = $headers->{'content-length'};

          # Wait for the whole body plus the terminating NUL.
          return if length( $handle->{rbuf} ) < $content_length + 1;
        }
        else {
          $content_length = index( $handle->{rbuf}, "\0" );
          return if $content_length < 0;
        }

        my $body = substr( $handle->{rbuf}, 0, $content_length, '' );

        # Drop the frame terminator and any line ends that follow it.
        $handle->{rbuf} =~ s/^\0(?:${\(RE_EOL)})*//;

        $frame = _new_frame( $cmd_name, $headers, $body );

        undef $cmd_name;
        undef $headers;
      }
      else {
        # Skip line ends between frames.
        $handle->{rbuf} =~ s/^(?:${\(RE_EOL)})+//;

        # Wait until the command line and all headers have arrived.
        return unless $handle->{rbuf} =~ s/^(.+?)(?:${\(RE_EOL)}){2}//s;

        ( $cmd_name, my @header_strings ) = split( m/${\(RE_EOL)}/, $1 );
        foreach my $header_str (@header_strings) {
          my ( $name, $value ) = split( /:/, $header_str, 2 );
          $name = _unescape($name);

          # STOMP 1.2: when a header entry is repeated, only the first
          # occurrence should be used.
          next if exists $headers->{$name};

          $headers->{$name} = _unescape($value);
        }

        next;
      }

      $self->_process_frame($frame);
    }
  };
}

# Turns a command name and the caller's argument list into a command hash.
#
# Parameters:
#   $cmd_name - STOMP command name; it is upper-cased here.
#   $args     - array reference holding header name/value pairs. The control
#               keys "body", "on_receipt" and "on_message" (and "message"
#               for ACK/NACK) may be mixed in. A trailing code reference that
#               makes the list odd-sized is taken as on_message for SUBSCRIBE
#               and as on_receipt for every other command.
#               The array is modified when a trailing callback is removed.
#
# Returns a hash reference with the keys "name" and "headers" plus the
# extracted control parameters. "on_receipt" is always set: when the caller
# gave none, a default is installed that forwards errors to
# $self->{on_error}.
#
# Headers are merged in this order, later entries overriding earlier ones:
# $self->{default_headers}, $self->{command_headers}{$cmd_name}, the
# headers given in $args.
sub _prepare {
  my $self     = shift;
  my $cmd_name = uc(shift);
  my $args     = shift;

  my %params;

  if ( ref( $args->[-1] ) eq 'CODE'
    && scalar @{$args} % 2 > 0 )
  {
    if ( $cmd_name eq 'SUBSCRIBE' ) {
      $params{on_message} = pop @{$args};
    }
    else {
      $params{on_receipt} = pop @{$args};
    }
  }

  my %headers = @{$args};

  foreach my $name ( qw( body on_receipt on_message ) ) {
    next unless exists $headers{$name};

    # Control keys never belong in the frame headers, so they are removed
    # even when their value is undef (e.g. on_receipt => $maybe_callback).
    # An undef value does not replace a callback taken from the end of the
    # argument list.
    my $value = delete $headers{$name};
    if ( defined $value ) {
      $params{$name} = $value;
    }
  }
  if ( exists $ACK_CMDS{$cmd_name} ) {
    $params{message} = delete $headers{message};
  }

  %headers = (
    %{ $self->{default_headers} },

    defined $self->{command_headers}{$cmd_name}
    ? %{ $self->{command_headers}{$cmd_name} }
    : (),

    %headers,
  );

  my $cmd = {
    name    => $cmd_name,
    headers => \%headers,
    %params,
  };

  unless ( defined $cmd->{on_receipt} ) {
    # The default callback must not keep the client object alive.
    weaken($self);

    $cmd->{on_receipt} = sub {
      my $receipt = shift;
      my $err     = shift;

      if ( defined $err ) {
        $self->{on_error}->($err);
        return;
      }
    };
  }

  return $cmd;
}

# Validates a prepared command and either writes it immediately or parks it
# in $self->{_input_queue} until the client is ready.
#
# Croaks when SUBSCRIBE lacks an on_message callback or when ACK/NACK lacks
# the message parameter.
#
# While the client is not ready, this call also makes sure something is
# under way that will eventually make it ready:
#   - a handle exists: start the login if the socket is connected and the
#     login still has to be done;
#   - no handle, lazy mode: connect now and clear the lazy flag;
#   - no handle otherwise: connect now, or after reconnect_interval seconds
#     when a positive interval is configured (only one timer at a time).
sub _execute {
  my ( $self, $cmd ) = @_;

  my $cmd_name = $cmd->{name};

  if ( $cmd_name eq 'SUBSCRIBE' && !defined $cmd->{on_message} ) {
    croak '"on_message" callback must be specified';
  }
  elsif ( exists $ACK_CMDS{$cmd_name} && !defined $cmd->{message} ) {
    croak '"message" parameter must be specified';
  }

  # Fast path: logged in, write right away.
  if ( $self->{_ready} ) {
    $self->_push_write($cmd);

    return;
  }

  if ( defined $self->{_handle} ) {
    if ( $self->{_connected} && $self->{_login_state} == S_NEED_DO ) {
      $self->_login;
    }
  }
  elsif ( $self->{lazy} ) {
    undef $self->{lazy};
    $self->_connect;
  }
  else {
    my $interval = $self->{reconnect_interval};

    if ( !defined $interval || !( $interval > 0 ) ) {
      $self->_connect;
    }
    elsif ( !defined $self->{_reconnect_timer} ) {
      # The timer callback must not keep the client object alive.
      weaken($self);

      $self->{_reconnect_timer} = AE::timer(
        $interval, 0,
        sub {
          undef $self->{_reconnect_timer};
          $self->_connect;
        }
      );
    }
  }

  # Parked until the login completes.
  push( @{ $self->{_input_queue} }, $cmd );

  return;
}

# Serializes a command into a STOMP frame and writes it to the handle.
#
# Steps:
#   1. ACK/NACK: the referenced message is checked with _check_ack(); when
#      the check fails, on_receipt is called asynchronously with an
#      E_OPRN_ERROR error and nothing is written. Otherwise the
#      acknowledgement headers are copied from the message according to the
#      negotiated protocol version.
#   2. The command is registered for completion: under "CONNECTED" for
#      CONNECT, under its receipt id for commands that need or request a
#      receipt (an id is generated when missing or set to "auto"), and in
#      the write queue otherwise.
#   3. "content-length" is filled in unless the caller set it, and the frame
#      is assembled and written.
#
# Note that $cmd->{headers} is modified in place.
sub _push_write {
  my ( $self, $cmd ) = @_;

  my $name    = $cmd->{name};
  my $headers = $cmd->{headers};

  if ( exists $ACK_CMDS{$name} ) {
    unless ( $self->_check_ack( $cmd->{message} ) ) {
      my $err = _new_error( "Unexpected $cmd->{name} sent.", E_OPRN_ERROR );
      AE::postpone { $cmd->{on_receipt}->( undef, $err ) };

      return;
    }

    my $msg_headers = $cmd->{message}->headers;

    if ( $self->{_version} <= 1.1 ) {
      # Up to 1.1 the message id is used, from 1.1 on together with the
      # subscription id.
      $headers->{'message-id'} = $msg_headers->{'message-id'};
      if ( $self->{_version} > 1.0 ) {
        $headers->{subscription} = $msg_headers->{subscription};
      }
    }
    else {
      # Later versions use the "ack" header of the message instead.
      $headers->{id} = $msg_headers->{ack};
    }
  }

  my $wants_receipt
      = exists $NEED_RECEIPT{$name} || defined $headers->{receipt};

  if ( !$wants_receipt ) {
    # Completed by the on_drain callback.
    push( @{ $self->{_write_queue} }, $cmd );
  }
  elsif ( $name eq 'CONNECT' ) {
    $self->{_pending_receipts}{CONNECTED} = $cmd;
  }
  else {
    if ( !defined $headers->{receipt}
      || $headers->{receipt} eq 'auto' )
    {
      $headers->{receipt} = 'R_' . $self->{_session_id} . '.'
          . $RECEIPT_SEQ++;
    }
    $self->{_pending_receipts}{ $headers->{receipt} } = $cmd;
  }

  my $body = defined $cmd->{body} ? $cmd->{body} : '';

  unless ( defined $headers->{'content-length'} ) {
    $headers->{'content-length'} = length($body);
  }

  # Command line first, then one line per header, an empty line and finally
  # the NUL terminated body.
  my @lines = ($name);
  while ( my ( $key, $value ) = each %{$headers} ) {
    $value = '' unless defined $value;
    push( @lines, _escape($key) . ':' . _escape($value) );
  }
  my $frame_str = join( EOL, @lines, '', "$body\0" );

  $self->{_handle}->push_write($frame_str);

  return;
}

# Sends the CONNECT frame and finishes the session setup when the server's
# answer arrives.
#
# $self->{heartbeat} holds the client's heart-beat settings ( cx, cy ) in
# milliseconds: cx is the smallest interval at which the client can send
# heart-beats, cy the interval at which it would like to receive them.
#
# While the login is pending, a provisional read timeout based on cy is
# armed (when cy > 0). In the on_receipt callback the timeouts are set to
# the negotiated values as defined by the STOMP heart-beating rules:
#   - server to client: none when sx or cy is 0, else every max(sx, cy) ms;
#   - client to server: none when cx or sy is 0, else every max(cx, sy) ms.
# The server's values ( sx, sy ) default to 0 when the "heart-beat" header
# is missing or malformed.
#
# On failure the login state is reset to S_NEED_DO and _abort() is called
# with the error. On success the client becomes ready, protocol version and
# session id are stored and the input queue is processed.
sub _login {
  my $self = shift;

  my ( $cx, $cy ) = @{ $self->{heartbeat} };

  if ( $cy > 0 ) {
    $self->_rtimeout($cy);
  }

  my %cmd_headers = (
    'accept-version' => '1.0,1.1,1.2',
    'heart-beat'     => join( ',', $cx, $cy ),
  );
  if ( defined $self->{login} ) {
    $cmd_headers{login} = $self->{login};
  }
  if ( defined $self->{passcode} ) {
    $cmd_headers{passcode} = $self->{passcode};
  }
  if ( defined $self->{vhost} ) {
    $cmd_headers{host} = $self->{vhost};
  }

  # The on_receipt closure below must not keep the client object alive.
  weaken($self);
  $self->{_login_state} = S_IN_PROGRESS;

  $self->_push_write(
    { name    => 'CONNECT',
      headers => \%cmd_headers,

      on_receipt => sub {
        my $receipt = shift;
        my $err     = shift;

        if ( defined $err ) {
          $self->{_login_state} = S_NEED_DO;
          $self->_abort($err);

          return;
        }

        $self->{_login_state} = S_DONE;

        my $receipt_headers = $receipt->headers;

        # What the server can send (sx) and wants to receive (sy).
        my ( $sx, $sy ) = ( 0, 0 );
        if ( defined $receipt_headers->{'heart-beat'} ) {
          my @server = split( /,/, $receipt_headers->{'heart-beat'} );

          if ( looks_like_number( $server[0] ) ) {
            $sx = $server[0];
          }
          if ( looks_like_number( $server[1] ) ) {
            $sy = $server[1];
          }
        }

        # Server to client. A read timeout is only touched when this method
        # armed one itself (cy > 0): it is either replaced by the negotiated
        # interval or disarmed, because a server with sx == 0 never sends
        # heart-beats and an idle connection would time out otherwise.
        if ( $cy > 0 ) {
          $self->_rtimeout( $sx > 0 ? max( $cy, $sx ) : 0 );
        }

        # Client to server. Heart-beats are sent only when both sides agreed.
        if ( $cx > 0 && $sy > 0 ) {
          $self->_wtimeout( max( $cx, $sy ) );
        }

        $self->{_ready} = 1;
        $self->{_version}
            = version->parse( $receipt_headers->{version} || 1.0 );
        $self->{_session_id} = $receipt_headers->{session} || '';

        $self->_process_input_queue;
      },
    }
  );

  return;
}

# Restarts the handle's read timeout and sets it to three times the given
# interval. $rtimeout is given in milliseconds, the handle is configured in
# seconds.
sub _rtimeout {
  my ( $self, $rtimeout ) = @_;

  my $handle = $self->{_handle};

  $handle->rtimeout_reset;
  $handle->rtimeout( ( $rtimeout / 1000 ) * 3 );

  return;
}

sub _wtimeout {
  my $self     = shift;
  my $wtimeout = shift;

  $self->{_handle}->wtimeout_reset;



( run in 0.687 second using v1.01-cache-2.11-cpan-d8267643d1d )