AnyEvent-Stomper
view release on metacpan or search on metacpan
lib/AnyEvent/Stomper.pm view on Meta::CPAN
? %{ $self->{command_headers}{$cmd_name} }
: (),
%headers,
);
my $cmd = {
name => $cmd_name,
headers => \%headers,
%params,
};
unless ( defined $cmd->{on_receipt} ) {
weaken($self);
$cmd->{on_receipt} = sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
$self->{on_error}->($err);
return;
}
};
}
return $cmd;
}
# Validates a prepared command and either writes it out immediately (when
# the client is ready) or parks it in the input queue while making sure a
# connection attempt or login is under way.
#
# Arguments:
#   $cmd - command hash reference (reads "name", "on_message", "message").
#
# Croaks when a SUBSCRIBE command has no "on_message" callback, or when a
# command listed in %ACK_CMDS has no "message" parameter.
# Returns nothing.
sub _execute {
  my ( $self, $cmd ) = @_;

  my $name = $cmd->{name};

  croak '"on_message" callback must be specified'
      if $name eq 'SUBSCRIBE' && !defined $cmd->{on_message};
  croak '"message" parameter must be specified'
      if exists $ACK_CMDS{$name} && !defined $cmd->{message};

  # Fast path: the client is ready, so the command can be written right away.
  if ( $self->{_ready} ) {
    $self->_push_write($cmd);
    return;
  }

  if ( defined $self->{_handle} ) {
    # A handle already exists; start the login only if the connection is up
    # and the login has not been started yet.
    $self->_login
        if $self->{_connected} && $self->{_login_state} == S_NEED_DO;
  }
  elsif ( $self->{lazy} ) {
    # The "lazy" flag is cleared before the connection is initiated.
    $self->{lazy} = undef;
    $self->_connect;
  }
  elsif ( defined $self->{reconnect_interval}
    && $self->{reconnect_interval} > 0 )
  {
    # Schedule a single delayed connection attempt unless one is pending.
    unless ( defined $self->{_reconnect_timer} ) {
      my $delay = $self->{reconnect_interval};

      # The timer is stored on the object and its callback closes over
      # $self, so $self is weakened first to avoid a strong reference cycle.
      weaken($self);
      $self->{_reconnect_timer} = AE::timer(
        $delay, 0,
        sub {
          $self->{_reconnect_timer} = undef;
          $self->_connect;
        }
      );
    }
  }
  else {
    $self->_connect;
  }

  # Not ready yet: keep the command in the input queue.
  push @{ $self->{_input_queue} }, $cmd;

  return;
}
sub _push_write {
my $self = shift;
my $cmd = shift;
my $cmd_headers = $cmd->{headers};
if ( exists $ACK_CMDS{ $cmd->{name} } ) {
unless ( $self->_check_ack( $cmd->{message} ) ) {
my $err = _new_error( "Unexpected $cmd->{name} sent.", E_OPRN_ERROR );
AE::postpone { $cmd->{on_receipt}->( undef, $err ) };
return;
}
my $msg_headers = $cmd->{message}->headers;
if ( $self->{_version} <= 1.1 ) {
$cmd_headers->{'message-id'} = $msg_headers->{'message-id'};
if ( $self->{_version} > 1.0 ) {
$cmd_headers->{subscription} = $msg_headers->{subscription};
}
}
else {
$cmd_headers->{id} = $msg_headers->{ack};
}
}
if ( exists $NEED_RECEIPT{ $cmd->{name} }
|| defined $cmd_headers->{receipt} )
{
if ( $cmd->{name} eq 'CONNECT' ) {
$self->{_pending_receipts}{CONNECTED} = $cmd;
}
else {
if ( !defined $cmd_headers->{receipt}
|| $cmd_headers->{receipt} eq 'auto' )
{
$cmd_headers->{receipt} = 'R_' . $self->{_session_id} . '.'
. $RECEIPT_SEQ++;
}
lib/AnyEvent/Stomper.pm view on Meta::CPAN
}
elsif ( $cmd->{name} eq 'DISCONNECT' ) {
$self->_disconnect;
}
$cmd->{on_receipt}->($receipt);
return;
}
# Turns an error frame into an error object (code E_OPRN_ERROR) and routes
# it: if the frame carries a "receipt-id" header matching a pending command,
# that command is removed from the pending receipts and its "on_receipt"
# callback receives the error; otherwise the client disconnects with the
# error.
#
# Arguments:
#   $err_frame - frame object; its headers() method is used to read the
#                "message" and "receipt-id" headers.
#
# Returns nothing.
sub _process_error {
  my ( $self, $err_frame ) = @_;

  my $headers = $err_frame->headers;
  my $err = _new_error( $headers->{message}, E_OPRN_ERROR, $err_frame );

  # Look up (and remove) the command this error refers to, if any.
  my $receipt_id = $headers->{'receipt-id'};
  my $cmd;
  if ( defined $receipt_id ) {
    $cmd = delete $self->{_pending_receipts}{$receipt_id};
  }

  # No matching command: disconnect and pass the error along.
  unless ( defined $cmd ) {
    $self->_disconnect($err);
    return;
  }

  $cmd->{on_receipt}->( undef, $err );

  return;
}
# Tears the connection down: destroys the handle (if any), resets the
# connection-related fields, aborts whatever is still queued, and finally
# invokes the "on_disconnect" callback when the client had been connected.
#
# Arguments:
#   $err - optional error object, passed through to _abort().
#
# Returns nothing.
sub _disconnect {
  my ( $self, $err ) = @_;

  # Remember the state before _reset_internals() clears it.
  my $was_connected = $self->{_connected};

  $self->{_handle}->destroy if defined $self->{_handle};

  $self->_reset_internals;
  $self->_abort($err);

  return unless $was_connected;

  $self->{on_disconnect}->() if defined $self->{on_disconnect};

  return;
}
# Puts the connection-related fields of the object back to their
# "not connected" values. Takes no arguments besides the invocant and
# returns nothing.
sub _reset_internals {
  my ($self) = @_;

  # Field/value pairs, applied in the listed order.
  my @initial_state = (
    [ _handle          => undef ],
    [ _connected       => 0 ],
    [ _login_state     => S_NEED_DO ],
    [ _ready           => 0 ],
    [ _version         => undef ],
    [ _session_id      => undef ],
    [ _reconnect_timer => undef ],
  );

  foreach my $pair (@initial_state) {
    my ( $field, $value ) = @{$pair};
    $self->{$field} = $value;
  }

  return;
}
# Drops all queued commands, pending receipts and subscriptions, and reports
# an error to everything that was dropped.
#
# Arguments:
#   $err - optional error object. When it is omitted but commands were
#          still queued, an E_CONN_CLOSED_BY_CLIENT error is created.
#
# When there is an error to report:
#   * the "on_error" callback receives it first;
#   * each former subscription gets its own "lost" error through its
#     "on_receipt" callback, unless the code is E_CONN_CLOSED_BY_CLIENT;
#   * each former queued command gets its own "aborted" error through its
#     "on_receipt" callback.
#
# Returns nothing.
sub _abort {
  my ( $self, $err ) = @_;

  # Snapshot what is about to be discarded so it can still be notified.
  my @dropped_cmds = $self->_queued_commands;
  my %dropped_subs = %{ $self->{_subs} };

  foreach my $queue (
    qw( _input_queue _temp_input_queue _write_queue _temp_write_queue ) )
  {
    $self->{$queue} = [];
  }
  $self->{_pending_receipts} = {};
  $self->{_subs}             = {};

  if ( !defined $err && @dropped_cmds ) {
    $err = _new_error( 'Connection closed by client prematurely.',
      E_CONN_CLOSED_BY_CLIENT );
  }

  # Nothing to report.
  return unless defined $err;

  my $reason = $err->message;
  my $code   = $err->code;
  my $frame  = $err->frame;

  $self->{on_error}->($err);

  if ( %dropped_subs && $code != E_CONN_CLOSED_BY_CLIENT ) {
    foreach my $sub_id ( keys %dropped_subs ) {
      my $sub_err = _new_error( qq{Subscription "$sub_id" lost: $reason},
        $code, $frame );

      $dropped_subs{$sub_id}{on_receipt}->( undef, $sub_err );
    }
  }

  foreach my $cmd (@dropped_cmds) {
    my $cmd_err = _new_error( qq{Operation "$cmd->{name}" aborted: $reason},
      $code, $frame );

    $cmd->{on_receipt}->( undef, $cmd_err );
  }

  return;
}
sub _queued_commands {
my $self = shift;
return (
values %{ $self->{_pending_receipts} },
@{ $self->{_temp_write_queue} },
@{ $self->{_write_queue} },
@{ $self->{_temp_input_queue} },
@{ $self->{_input_queue} },
);
( run in 0.395 second using v1.01-cache-2.11-cpan-39bf76dae61 )