AnyEvent-Stomper

 view release on metacpan or  search on metacpan

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

    ? %{ $self->{command_headers}{$cmd_name} }
    : (),

    %headers,
  );

  my $cmd = {
    name    => $cmd_name,
    headers => \%headers,
    %params,
  };

  unless ( defined $cmd->{on_receipt} ) {
    weaken($self);

    $cmd->{on_receipt} = sub {
      my $receipt = shift;
      my $err     = shift;

      if ( defined $err ) {
        $self->{on_error}->($err);
        return;
      }
    };
  }

  return $cmd;
}

# Validates a prepared command and either writes it out immediately or
# queues it until the client becomes ready.
#
# Arguments:
#   $cmd - command hash reference; the keys "name", "on_message" and
#          "message" are inspected here.
#
# Croaks when a SUBSCRIBE command has no "on_message" callback, or when a
# command listed in %ACK_CMDS has no "message" parameter.
#
# When the client is not ready, the command is appended to "_input_queue"
# after making sure that a login, a connection attempt or a delayed
# reconnection has been started.
sub _execute {
  my ( $self, $cmd ) = @_;

  croak '"on_message" callback must be specified'
      if $cmd->{name} eq 'SUBSCRIBE' && !defined $cmd->{on_message};

  croak '"message" parameter must be specified'
      if exists $ACK_CMDS{ $cmd->{name} } && !defined $cmd->{message};

  # Fast path: the client is ready, so the command goes straight out.
  if ( $self->{_ready} ) {
    $self->_push_write($cmd);

    return;
  }

  if ( defined $self->{_handle} ) {
    # A handle already exists; the only thing that may still have to be
    # kicked off from here is the login.
    if ( $self->{_connected} && $self->{_login_state} == S_NEED_DO ) {
      $self->_login;
    }
  }
  elsif ( $self->{lazy} ) {
    # Deferred first connection: clear the flag and connect now.
    $self->{lazy} = undef;
    $self->_connect;
  }
  elsif ( defined $self->{reconnect_interval}
    && $self->{reconnect_interval} > 0 )
  {
    # Delayed reconnection; at most one timer is armed at a time.
    unless ( defined $self->{_reconnect_timer} ) {
      # The timer is stored inside the object and its callback closes over
      # $self, so the reference is weakened to avoid a reference cycle.
      weaken($self);

      $self->{_reconnect_timer} = AE::timer(
        $self->{reconnect_interval}, 0,
        sub {
          undef $self->{_reconnect_timer};
          $self->_connect;
        }
      );
    }
  }
  else {
    $self->_connect;
  }

  # Not ready yet: park the command in the input queue.
  push @{ $self->{_input_queue} }, $cmd;

  return;
}

sub _push_write {
  my $self = shift;
  my $cmd  = shift;

  my $cmd_headers = $cmd->{headers};

  if ( exists $ACK_CMDS{ $cmd->{name} } ) {
    unless ( $self->_check_ack( $cmd->{message} ) ) {
      my $err = _new_error( "Unexpected $cmd->{name} sent.", E_OPRN_ERROR );
      AE::postpone { $cmd->{on_receipt}->( undef, $err ) };

      return;
    }

    my $msg_headers = $cmd->{message}->headers;

    if ( $self->{_version} <= 1.1 ) {
      $cmd_headers->{'message-id'} = $msg_headers->{'message-id'};
      if ( $self->{_version} > 1.0 ) {
        $cmd_headers->{subscription} = $msg_headers->{subscription};
      }
    }
    else {
      $cmd_headers->{id} = $msg_headers->{ack};
    }
  }

  if ( exists $NEED_RECEIPT{ $cmd->{name} }
    || defined $cmd_headers->{receipt} )
  {
    if ( $cmd->{name} eq 'CONNECT' ) {
      $self->{_pending_receipts}{CONNECTED} = $cmd;
    }
    else {
      if ( !defined $cmd_headers->{receipt}
        || $cmd_headers->{receipt} eq 'auto' )
      {
        $cmd_headers->{receipt} = 'R_' . $self->{_session_id} . '.'
            . $RECEIPT_SEQ++;
      }

lib/AnyEvent/Stomper.pm  view on Meta::CPAN

  }
  elsif ( $cmd->{name} eq 'DISCONNECT' ) {
    $self->_disconnect;
  }

  $cmd->{on_receipt}->($receipt);

  return;
}

# Handles an error frame.
#
# Arguments:
#   $err_frame - frame object; its headers() method must return a hash
#                reference of the frame headers.
#
# An error object is built from the "message" header with the code
# E_OPRN_ERROR. If the frame carries a "receipt-id" header that matches a
# pending command, that command is removed from "_pending_receipts" and its
# "on_receipt" callback receives the error. Otherwise the client is
# disconnected with the error.
sub _process_error {
  my ( $self, $err_frame ) = @_;

  my $headers = $err_frame->headers;
  my $err = _new_error( $headers->{message}, E_OPRN_ERROR, $err_frame );

  # Look up (and forget) the command this error refers to, if any.
  my $receipt_id = $headers->{'receipt-id'};
  my $cmd
      = defined $receipt_id
      ? delete $self->{_pending_receipts}{$receipt_id}
      : undef;

  unless ( defined $cmd ) {
    # The error cannot be attributed to a pending command.
    $self->_disconnect($err);

    return;
  }

  $cmd->{on_receipt}->( undef, $err );

  return;
}

# Tears down the current connection.
#
# Arguments:
#   $err - optional error object; it is passed on to _abort().
#
# Destroys the handle if there is one, resets the internal state, aborts
# whatever _abort() finds outstanding and finally calls the "on_disconnect"
# callback, but only if the client was connected when this method was
# entered and the callback is defined.
sub _disconnect {
  my ( $self, $err ) = @_;

  # Must be sampled before _reset_internals() clears the flag.
  my $had_connection = $self->{_connected};

  $self->{_handle}->destroy if defined $self->{_handle};

  $self->_reset_internals;
  $self->_abort($err);

  return unless $had_connection && defined $self->{on_disconnect};

  $self->{on_disconnect}->();

  return;
}

# Resets the per-connection state fields of the object: the handle, the
# protocol version, the session id and the reconnect timer are cleared,
# the "connected" and "ready" flags are set to 0 and the login state is
# set back to S_NEED_DO.
sub _reset_internals {
  my ($self) = @_;

  # Field/value pairs, applied in the order listed.
  my @fresh_state = (
    [ _handle          => undef ],
    [ _connected       => 0 ],
    [ _login_state     => S_NEED_DO ],
    [ _ready           => 0 ],
    [ _version         => undef ],
    [ _session_id      => undef ],
    [ _reconnect_timer => undef ],
  );

  foreach my $pair (@fresh_state) {
    my ( $field, $value ) = @{$pair};
    $self->{$field} = $value;
  }

  return;
}

# Drops every queued command, pending receipt and subscription, and reports
# the reason to the affected callbacks.
#
# Arguments:
#   $err - optional error object. When it is omitted but commands were
#          still outstanding, an error with the code
#          E_CONN_CLOSED_BY_CLIENT is created instead.
#
# If there is an error to report, "on_error" is called first. Then the
# "on_receipt" callback of every dropped subscription is called (skipped
# when the error code is E_CONN_CLOSED_BY_CLIENT), followed by the
# "on_receipt" callback of every outstanding command. Each of these
# callbacks gets its own error object with a prefixed message.
sub _abort {
  my ( $self, $err ) = @_;

  # Snapshot what is outstanding before the containers are replaced.
  my @aborted_cmds = $self->_queued_commands;
  my %lost_subs    = %{ $self->{_subs} };

  foreach my $queue_name (
    qw( _input_queue _temp_input_queue _write_queue _temp_write_queue ) )
  {
    $self->{$queue_name} = [];
  }
  $self->{_pending_receipts} = {};
  $self->{_subs}             = {};

  if ( @aborted_cmds && !defined $err ) {
    $err = _new_error( 'Connection closed by client prematurely.',
        E_CONN_CLOSED_BY_CLIENT );
  }

  # Nothing was outstanding and no error was given: nothing to report.
  return unless defined $err;

  my $reason = $err->message;
  my $code   = $err->code;
  my $frame  = $err->frame;

  $self->{on_error}->($err);

  if ( %lost_subs && $code != E_CONN_CLOSED_BY_CLIENT ) {
    foreach my $sub_id ( keys %lost_subs ) {
      my $sub_err = _new_error( qq{Subscription "$sub_id" lost: $reason},
          $code, $frame );

      $lost_subs{$sub_id}{on_receipt}->( undef, $sub_err );
    }
  }

  foreach my $cmd (@aborted_cmds) {
    my $cmd_err = _new_error( qq{Operation "$cmd->{name}" aborted: $reason},
        $code, $frame );

    $cmd->{on_receipt}->( undef, $cmd_err );
  }

  return;
}

sub _queued_commands {
  my $self = shift;

  return (
    values %{ $self->{_pending_receipts} },
    @{ $self->{_temp_write_queue} },
    @{ $self->{_write_queue} },
    @{ $self->{_temp_input_queue} },
    @{ $self->{_input_queue} },
  );



( run in 0.395 second using v1.01-cache-2.11-cpan-39bf76dae61 )