AnyEvent-Stomper
view release on metacpan or search on metacpan
lib/AnyEvent/Stomper.pm view on Meta::CPAN
# Builds the callback that is run when the handle's write buffer has drained.
#
# The callback acknowledges every command parked in _write_queue by calling
# its on_receipt handler without arguments. It holds only a weak reference to
# the client, so it does not keep the object alive.
sub _create_on_drain {
  my ($self) = @_;

  weaken($self);

  return sub {
    my $flushed = $self->{_write_queue};
    return unless @{$flushed};

    # Swap the queues first so that commands issued from inside a callback
    # land in a fresh _write_queue instead of the one being processed.
    $self->{_temp_write_queue} = $flushed;
    $self->{_write_queue}      = [];

    # The queue is looked up through $self on every iteration on purpose:
    # a callback may replace _temp_write_queue, which must end this loop.
    while ( my $done_cmd = shift @{ $self->{_temp_write_queue} } ) {
      $done_cmd->{on_receipt}->();
    }
  };
}
# Builds the on_read callback for the connection handle: an incremental STOMP
# frame parser working directly on the handle's read buffer ($handle->{rbuf}).
#
# The parser alternates between two states, remembered in the closed-over
# $cmd_name / $headers between invocations:
#   * no command parsed yet - strip leading EOLs, then wait for a complete
#     head (command line plus header lines, terminated by an empty line);
#   * head parsed           - wait for the body and its terminating NUL.
# Every completed frame is passed to $self->_process_frame(). The callback
# returns whenever the buffer does not yet hold enough data, or as soon as the
# handle has been destroyed.
sub _create_on_read {
  my $self = shift;

  weaken($self);

  # Parser state that survives across callback invocations.
  my $cmd_name;
  my $headers;

  return sub {
    my $handle = shift;

    my $frame;

    while (1) {
      # Processing a frame may have torn the connection down.
      return if $handle->destroyed;

      if ( defined $cmd_name ) {
        my $content_length;

        if ( defined $headers->{'content-length'} ) {
          $content_length = $headers->{'content-length'};

          # One extra octet is required for the NUL that ends the frame.
          return if length( $handle->{rbuf} ) < $content_length + 1;
        }
        else {
          # Without content-length the body runs up to the first NUL.
          $content_length = index( $handle->{rbuf}, "\0" );
          return if $content_length < 0;
        }

        my $body = substr( $handle->{rbuf}, 0, $content_length, '' );

        # Drop the frame terminator and any EOLs that follow it.
        $handle->{rbuf} =~ s/^\0(?:${\(RE_EOL)})*//;

        $frame = _new_frame( $cmd_name, $headers, $body );

        undef $cmd_name;
        undef $headers;
      }
      else {
        # Skip EOLs that precede the frame.
        $handle->{rbuf} =~ s/^(?:${\(RE_EOL)})+//;

        # Wait until the whole head (up to the empty line) has arrived.
        return unless $handle->{rbuf} =~ s/^(.+?)(?:${\(RE_EOL)}){2}//s;

        ( $cmd_name, my @header_strings ) = split( m/${\(RE_EOL)}/, $1 );

        foreach my $header_str (@header_strings) {
          my ( $name, $value ) = split( /:/, $header_str, 2 );
          $name = _unescape($name);

          # STOMP "Repeated Header Entries": when a header occurs more than
          # once, only the first entry is used as its value.
          next if exists $headers->{$name};

          $headers->{$name} = _unescape($value);
        }

        # Head is parsed; loop again to pick up the body.
        next;
      }

      $self->_process_frame($frame);
    }
  };
}
sub _prepare {
my $self = shift;
my $cmd_name = uc(shift);
my $args = shift;
my %params;
if ( ref( $args->[-1] ) eq 'CODE'
&& scalar @{$args} % 2 > 0 )
{
if ( $cmd_name eq 'SUBSCRIBE' ) {
$params{on_message} = pop @{$args};
}
else {
$params{on_receipt} = pop @{$args};
}
}
my %headers = @{$args};
foreach my $name ( qw( body on_receipt on_message ) ) {
if ( defined $headers{$name} ) {
$params{$name} = delete $headers{$name};
}
}
if ( exists $ACK_CMDS{$cmd_name} ) {
$params{message} = delete $headers{message};
}
%headers = (
%{ $self->{default_headers} },
defined $self->{command_headers}{$cmd_name}
? %{ $self->{command_headers}{$cmd_name} }
: (),
%headers,
);
my $cmd = {
name => $cmd_name,
headers => \%headers,
%params,
};
unless ( defined $cmd->{on_receipt} ) {
weaken($self);
$cmd->{on_receipt} = sub {
my $receipt = shift;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
# Serializes a command into a STOMP frame and pushes it to the handle.
#
# Arguments:
#   $cmd - hash reference; "name" and "headers" are always read, "body",
#          "message" and "on_receipt" are read when the command needs them.
#
# Commands listed in %ACK_CMDS are first validated with _check_ack(); when the
# check fails nothing is sent and on_receipt is invoked asynchronously with an
# E_OPRN_ERROR error. Commands that wait for a server reply are stored in
# _pending_receipts, all other commands in _write_queue.
#
# Returns nothing.
sub _push_write {
  my $self = shift;
  my $cmd  = shift;

  my $cmd_headers = $cmd->{headers};

  if ( exists $ACK_CMDS{ $cmd->{name} } ) {
    unless ( $self->_check_ack( $cmd->{message} ) ) {
      my $err = _new_error( "Unexpected $cmd->{name} sent.", E_OPRN_ERROR );
      AE::postpone { $cmd->{on_receipt}->( undef, $err ) };

      return;
    }

    # The header identifying the acknowledged message depends on the
    # negotiated protocol version.
    my $msg_headers = $cmd->{message}->headers;

    if ( $self->{_version} <= 1.1 ) {
      $cmd_headers->{'message-id'} = $msg_headers->{'message-id'};

      if ( $self->{_version} > 1.0 ) {
        $cmd_headers->{subscription} = $msg_headers->{subscription};
      }
    }
    else {
      $cmd_headers->{id} = $msg_headers->{ack};
    }
  }

  if ( exists $NEED_RECEIPT{ $cmd->{name} }
    || defined $cmd_headers->{receipt} )
  {
    if ( $cmd->{name} eq 'CONNECT' ) {
      # CONNECT is answered by a CONNECTED frame, not by a RECEIPT.
      $self->{_pending_receipts}{CONNECTED} = $cmd;
    }
    else {
      if ( !defined $cmd_headers->{receipt}
        || $cmd_headers->{receipt} eq 'auto' )
      {
        $cmd_headers->{receipt} = 'R_' . $self->{_session_id} . '.'
            . $RECEIPT_SEQ++;
      }

      $self->{_pending_receipts}{ $cmd_headers->{receipt} } = $cmd;
    }
  }
  else {
    push( @{ $self->{_write_queue} }, $cmd );
  }

  my $body = $cmd->{body};
  unless ( defined $body ) {
    $body = '';
  }
  unless ( defined $cmd_headers->{'content-length'} ) {
    $cmd_headers->{'content-length'} = length($body);
  }

  # The STOMP specification excludes CONNECT and CONNECTED frames from header
  # escaping (for compatibility with STOMP 1.0), so escaping them would
  # corrupt e.g. a login or passcode that contains ':' or '\'. The price is
  # that such values cannot carry line breaks; the protocol offers no
  # encoding for them in this frame.
  my $is_connect = $cmd->{name} eq 'CONNECT';

  my $frame_str = $cmd->{name} . EOL;

  while ( my ( $name, $value ) = each %{$cmd_headers} ) {
    unless ( defined $value ) {
      $value = '';
    }

    if ($is_connect) {
      $frame_str .= $name . ':' . $value . EOL;
    }
    else {
      $frame_str .= _escape($name) . ':' . _escape($value) . EOL;
    }
  }

  # An empty line separates the head from the body; NUL terminates the frame.
  $frame_str .= EOL . "$body\0";

  $self->{_handle}->push_write($frame_str);

  return;
}
sub _login {
my $self = shift;
my ( $cx, $cy ) = @{ $self->{heartbeat} };
if ( $cy > 0 ) {
$self->_rtimeout($cy);
}
my %cmd_headers = (
'accept-version' => '1.0,1.1,1.2',
'heart-beat' => join( ',', $cx, $cy ),
);
if ( defined $self->{login} ) {
$cmd_headers{login} = $self->{login};
}
if ( defined $self->{passcode} ) {
$cmd_headers{passcode} = $self->{passcode};
}
if ( defined $self->{vhost} ) {
$cmd_headers{host} = $self->{vhost};
}
weaken($self);
$self->{_login_state} = S_IN_PROGRESS;
$self->_push_write(
{ name => 'CONNECT',
headers => \%cmd_headers,
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
$self->{_login_state} = S_NEED_DO;
$self->_abort($err);
return;
}
$self->{_login_state} = S_DONE;
my $receipt_headers = $receipt->headers;
if ( defined $receipt_headers->{'heart-beat'} ) {
my ( $sx, $sy ) = split( /,/, $receipt_headers->{'heart-beat'} );
if ( $sx > 0 ) {
$self->_rtimeout( max( $cy, $sx ) );
}
if ( $sy > 0 ) {
lib/AnyEvent/Stomper.pm view on Meta::CPAN
}
# Drops all client state and reports the failure to everyone still waiting.
#
# Arguments:
#   $err - error object (optional). When it is omitted but commands are still
#          outstanding, an E_CONN_CLOSED_BY_CLIENT error is created instead.
#
# With an error at hand, on_error is called first, then the on_receipt
# handler of every active subscription (skipped when the client itself closed
# the connection), then the on_receipt handler of every outstanding command.
# Returns nothing.
sub _abort {
  my ( $self, $err ) = @_;

  # Take a snapshot of everything outstanding before the state is wiped, so
  # callbacks invoked below already see a clean client.
  my @pending_cmds = $self->_queued_commands;
  my %lost_subs    = %{ $self->{_subs} };

  $self->{$_} = []
      for qw( _input_queue _temp_input_queue _write_queue _temp_write_queue );
  $self->{_pending_receipts} = {};
  $self->{_subs}             = {};

  unless ( defined $err ) {
    # Nothing went wrong and nobody is waiting: there is nothing to report.
    return unless @pending_cmds;

    $err = _new_error( 'Connection closed by client prematurely.',
      E_CONN_CLOSED_BY_CLIENT );
  }

  my $err_msg   = $err->message;
  my $err_code  = $err->code;
  my $err_frame = $err->frame;

  $self->{on_error}->($err);

  if ( %lost_subs && $err_code != E_CONN_CLOSED_BY_CLIENT ) {
    foreach my $sub_id ( keys %lost_subs ) {
      my $sub_err = _new_error( qq{Subscription "$sub_id" lost: $err_msg},
        $err_code, $err_frame );

      $lost_subs{$sub_id}{on_receipt}->( undef, $sub_err );
    }
  }

  foreach my $cmd (@pending_cmds) {
    my $cmd_err = _new_error( qq{Operation "$cmd->{name}" aborted: $err_msg},
      $err_code, $err_frame );

    $cmd->{on_receipt}->( undef, $cmd_err );
  }

  return;
}
# Returns every command the client still holds: first the ones stored in
# _pending_receipts (in hash order), then the contents of the temporary and
# regular write queues, then the temporary and regular input queues.
sub _queued_commands {
  my ($self) = @_;

  return (
    values %{ $self->{_pending_receipts} },

    # Temporary queues come first within each pair.
    @{ $self->{_temp_write_queue} }, @{ $self->{_write_queue} },
    @{ $self->{_temp_input_queue} }, @{ $self->{_input_queue} },
  );
}
# Returns a copy of the given string in which every carriage return, line
# feed, colon and backslash is replaced by its entry in %ESCAPE_MAP.
sub _escape {
  my ($text) = @_;

  $text =~ s/([\r\n:\\])/$ESCAPE_MAP{$1}/g;

  return $text;
}
# Returns a copy of the given string in which every two-character sequence
# \r, \n, \c and \\ is replaced by its entry in %UNESCAPE_MAP.
sub _unescape {
  my ($text) = @_;

  $text =~ s/(\\[rnc\\])/$UNESCAPE_MAP{$1}/g;

  return $text;
}
# Shortcut constructor: passes all arguments through to
# AnyEvent::Stomper::Frame->new and returns its result.
sub _new_frame {
  my $frame_class = 'AnyEvent::Stomper::Frame';

  return $frame_class->new(@_);
}
# Shortcut constructor: passes all arguments through to
# AnyEvent::Stomper::Error->new and returns its result.
sub _new_error {
  my $error_class = 'AnyEvent::Stomper::Error';

  return $error_class->new(@_);
}
# Destructor: destroys the connection handle, if there is one, and emits a
# warning for every command that is still outstanding at that point.
sub DESTROY {
  my ($self) = @_;

  $self->{_handle}->destroy if defined $self->{_handle};

  # Without _pending_receipts there are no queues to inspect.
  return unless defined $self->{_pending_receipts};

  foreach my $cmd ( $self->_queued_commands ) {
    warn qq{Operation "$cmd->{name}" aborted: Client object destroyed prematurely.\n};
  }

  return;
}
1;
__END__
=head1 NAME
AnyEvent::Stomper - Flexible non-blocking STOMP client
=head1 SYNOPSIS
use AnyEvent;
use AnyEvent::Stomper;
my $stomper = AnyEvent::Stomper->new(
host => 'localhost',
port => '61613',
login => 'guest',
passcode => 'guest',
);
my $cv = AE::cv;
$stomper->subscribe(
id => 'foo',
destination => '/queue/foo',
on_receipt => sub {
( run in 1.099 second using v1.01-cache-2.11-cpan-59e3e3084b8 )