view release on metacpan or search on metacpan
handle_params => {
autocork => 1,
linger => 60,
}
Enabling of the `autocork` parameter can improve performance. See
documentation on [AnyEvent::Handle](https://metacpan.org/pod/AnyEvent::Handle) for more information.
- default\_headers => \\%headers
Specifies default headers for all outgoing frames.
default_headers => {
'x-foo' => 'foo_value',
'x-bar' => 'bar_value',
}
- command\_headers
Specifies default headers for particular commands.
The `on_error` callback is called when occurred an error, which was affected
on entire client (e. g. connection error or authentication error). Also the
`on_error` callback is called on command errors if the command callback is not
specified. If the `on_error` callback is not specified, the client just print
an error messages to `STDERR`.
# COMMAND METHODS
To execute the STOMP command you must call appropriate method. STOMP headers
can be specified as command parameters. The client automatically adds
`content-length` header to all outgoing frames. Every command method can also
accept two additional parameters: the `body` parameter where you can specify
the body of the frame, and the `on_receipt` parameter that is the alternative
way to specify the command callback.
If you want to receive `RECEIPT` frame, you must specify `receipt` header.
The `receipt` header can take the special value `auto`. If it set, the
receipt identifier will be generated automatically by the client. The
`RECEIPT` frame is passed to the command callback in first argument as the
object of the class [AnyEvent::Stomper::Frame](https://metacpan.org/pod/AnyEvent::Stomper::Frame). If the `receipt` header is
not specified the first argument of the command callback will be `undef`.
For commands `SUBSCRIBE`, `UNSUBSCRIBE`, `DISCONNECT` the client
automatically adds `receipt` header for internal usage.
The command callback is called in one of two cases depending on the presence of
the `receipt` header. First case, when the command was successfully written to
the socket. Second case, when the `RECEIPT` frame will be received. In first
case `on_receipt` callback can be called synchronously. If any error occurred
during the command execution, the error object is passed to the callback in
second argument. Error object is the instance of the class
[AnyEvent::Stomper::Error](https://metacpan.org/pod/AnyEvent::Stomper::Error).
The command callback is optional. If it is not specified and any error
occurred, the `on_error` callback of the client is called.
The full list of all available headers for every command you can find in STOMP
protocol specification and in documentation on your STOMP server. For various
$stomper->send(
destination => '/queue/foo',
body => 'Hello, world!',
sub {
my $err = $_[1];
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
}
);
$stomper->send(
destination => '/queue/foo',
receipt => 'auto',
body => 'Hello, world!',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
}
);
## subscribe( \[ %params \] \[, $cb->( $msg ) \] )
The method is used to register to listen to a given destination. The
`subscribe` method require the `on_message` callback, which is called on
every received `MESSAGE` frame from the server. The `MESSAGE` frame is passed
to the `on_message` callback in first argument as the object of the class
[AnyEvent::Stomper::Frame](https://metacpan.org/pod/AnyEvent::Stomper::Frame). If the `subscribe` method is called with one
callback, this callback will be act as `on_message` callback.
$stomper->subscribe(
id => 'foo',
destination => '/queue/foo',
sub {
my $msg = shift;
destination => '/queue/foo',
ack => 'client',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
return;
}
# receipt handling...
},
on_message => sub {
my $msg = shift;
id => 'foo',
destination => '/queue/foo',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
return;
}
# receipt handling...
}
);
## ack( \[ %params \] \[, $cb->( $receipt, $err ) \] )
The method is used to acknowledge consumption of a message from a subscription
using `client` or `client-individual` acknowledgment. Any messages received
from such a subscription will not be considered to have been consumed until the
message has been acknowledged via an `ack()` method. Method `ack()` must be
called with required parameter `message` in which must be specified the
`MESSAGE` frame.
$stomper->ack( message => $msg );
$stomper->ack(
message => $msg,
receipt => 'auto',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
}
# receipt handling...
}
);
## nack( \[ %params \] \[, $cb->( $receipt, $err ) \] )
The `nack` method is the opposite of `ack` method. It is used to tell the
server that the client did not consume the message. Method `nack()` must be
called with required parameter `message` in which must be specified the
`MESSAGE` frame.
$stomper->nack( message => $msg );
$stomper->nack(
message => $msg,
receipt => 'auto',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
}
# receipt handling...
}
);
## begin( \[ %params \] \[, $cb->( $receipt, $err ) \] )
The method `commit` is used to commit a transaction.
## abort( \[ %params \] \[, $cb->( $receipt, $err ) \] )
The method `abort` is used to roll back a transaction.
## disconnect( \[ %params \] \[, $cb->( $receipt, $err ) \] )
A client can disconnect from the server at anytime by closing the socket but
there is no guarantee that the previously sent frames have been received by
the server. To do a graceful shutdown, where the client is assured that all
previous frames have been received by the server, you must call `disconnect`
method and wait for the `RECEIPT` frame.
## execute( $command, \[ %params \] \[, $cb->( $receipt, $err ) \] )
An alternative method to execute commands. In some cases it can be more
convenient.
$stomper->execute( 'SEND',
destination => '/queue/foo',
receipt => 'auto',
body => 'Hello, world!',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
}
);
lib/AnyEvent/Stomper.pm view on Meta::CPAN
my $self = shift;
weaken($self);
my $cmd_name;
my $headers;
return sub {
my $handle = shift;
my $frame;
while (1) {
return if $handle->destroyed;
if ( defined $cmd_name ) {
my $content_length;
if ( defined $headers->{'content-length'} ) {
$content_length = $headers->{'content-length'};
return if length( $handle->{rbuf} ) < $content_length + 1;
}
else {
$content_length = index( $handle->{rbuf}, "\0" );
return if $content_length < 0
}
my $body = substr( $handle->{rbuf}, 0, $content_length, '' );
$handle->{rbuf} =~ s/^\0(?:${\(RE_EOL)})*//;
$frame = _new_frame( $cmd_name, $headers, $body );
undef $cmd_name;
undef $headers;
}
else {
$handle->{rbuf} =~ s/^(?:${\(RE_EOL)})+//;
return unless $handle->{rbuf} =~ s/^(.+?)(?:${\(RE_EOL)}){2}//s;
( $cmd_name, my @header_strings ) = split( m/${\(RE_EOL)}/, $1 );
foreach my $header_str (@header_strings) {
my ( $name, $value ) = split( /:/, $header_str, 2 );
$headers->{ _unescape($name) } = _unescape($value);
}
next;
}
$self->_process_frame($frame);
}
};
}
sub _prepare {
my $self = shift;
my $cmd_name = uc(shift);
my $args = shift;
my %params;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
}
my $body = $cmd->{body};
unless ( defined $body ) {
$body = '';
}
unless ( defined $cmd_headers->{'content-length'} ) {
$cmd_headers->{'content-length'} = length($body);
}
my $frame_str = $cmd->{name} . EOL;
while ( my ( $name, $value ) = each %{$cmd_headers} ) {
unless ( defined $value ) {
$value = '';
}
$frame_str .= _escape($name) . ':' . _escape($value) . EOL;
}
$frame_str .= EOL . "$body\0";
$self->{_handle}->push_write($frame_str);
return;
}
sub _login {
my $self = shift;
my ( $cx, $cy ) = @{ $self->{heartbeat} };
if ( $cy > 0 ) {
lib/AnyEvent/Stomper.pm view on Meta::CPAN
}
else { # HASH
return 1 if delete $sub->{pending_acks}{$msg_tag};
}
}
}
return;
}
sub _process_frame {
my $self = shift;
my $frame = shift;
if ( $frame->command eq 'MESSAGE' ) {
$self->_process_message($frame);
}
elsif ( $frame->command eq 'RECEIPT' ) {
$self->_process_receipt($frame);
}
elsif ( $frame->command eq 'ERROR' ) {
if ( defined $self->{_pending_receipts}{CONNECTED} ) {
$frame->headers->{'receipt-id'} = 'CONNECTED';
}
$self->_process_error($frame);
}
else { # CONNECTED
$frame->headers->{'receipt-id'} = 'CONNECTED';
$self->_process_receipt($frame);
}
return;
}
sub _process_message {
my $self = shift;
my $msg = shift;
my $msg_headers = $msg->headers;
my $sub_id = $msg_headers->{subscription} || $msg_headers->{destination};
my $sub = $self->{_subs}{$sub_id};
unless ( defined $sub ) {
my $err = _new_error(
qq{Don't know how process MESSAGE frame. Unknown subscription "$sub_id"},
E_UNEXPECTED_DATA
);
$self->_disconnect($err);
return;
}
my $msg_tag = $MESSAGE_SEQ++;
$msg_headers->{'message-tag'} = $msg_tag;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
sub _process_receipt {
my $self = shift;
my $receipt = shift;
my $receipt_id = $receipt->headers->{'receipt-id'};
my $cmd = delete $self->{_pending_receipts}{$receipt_id};
unless ( defined $cmd ) {
my $err = _new_error(
qq{Unknown RECEIPT frame received: receipt-id=$receipt_id},
E_UNEXPECTED_DATA
);
$self->_disconnect($err);
return;
}
if ( exists $SUBUNSUB_CMDS{ $cmd->{name} } ) {
my $cmd_headers = $cmd->{headers};
my $sub_id = $cmd_headers->{id} || $cmd_headers->{destination};
lib/AnyEvent/Stomper.pm view on Meta::CPAN
$self->_disconnect;
}
$cmd->{on_receipt}->($receipt);
return;
}
sub _process_error {
my $self = shift;
my $err_frame = shift;
my $err_headers = $err_frame->headers;
my $err = _new_error( $err_headers->{message}, E_OPRN_ERROR, $err_frame );
my $cmd;
if ( defined $err_headers->{'receipt-id'} ) {
$cmd = delete $self->{_pending_receipts}{ $err_headers->{'receipt-id'} };
}
if ( defined $cmd ) {
$cmd->{on_receipt}->( undef, $err );
}
else {
lib/AnyEvent/Stomper.pm view on Meta::CPAN
$self->{_subs} = {};
if ( !defined $err && @queued_commands ) {
$err = _new_error( 'Connection closed by client prematurely.',
E_CONN_CLOSED_BY_CLIENT );
}
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
$self->{on_error}->($err);
if ( %subs && $err_code != E_CONN_CLOSED_BY_CLIENT ) {
foreach my $sub_id ( keys %subs ) {
my $err = _new_error( qq{Subscription "$sub_id" lost: $err_msg},
$err_code, $err_frame );
my $sub = $subs{$sub_id};
$sub->{on_receipt}->( undef, $err );
}
}
foreach my $cmd (@queued_commands) {
my $err = _new_error( qq{Operation "$cmd->{name}" aborted: $err_msg},
$err_code, $err_frame );
$cmd->{on_receipt}->( undef, $err );
}
}
return;
}
sub _queued_commands {
my $self = shift;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
}
sub _unescape {
my $str = shift;
$str =~ s/(\\[rnc\\])/$UNESCAPE_MAP{$1}/ge;
return $str;
}
sub _new_frame {
return AnyEvent::Stomper::Frame->new(@_);
}
sub _new_error {
return AnyEvent::Stomper::Error->new(@_);
}
sub DESTROY {
my $self = shift;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
handle_params => {
autocork => 1,
linger => 60,
}
Enabling of the C<autocork> parameter can improve performance. See
documentation on L<AnyEvent::Handle> for more information.
=item default_headers => \%headers
Specifies default headers for all outgoing frames.
default_headers => {
'x-foo' => 'foo_value',
'x-bar' => 'bar_value',
}
=item command_headers
Specifies default headers for particular commands.
lib/AnyEvent/Stomper.pm view on Meta::CPAN
C<on_error> callback is called on command errors if the command callback is not
specified. If the C<on_error> callback is not specified, the client just print
an error messages to C<STDERR>.
=back
=head1 COMMAND METHODS
To execute the STOMP command you must call appropriate method. STOMP headers
can be specified as command parameters. The client automatically adds
C<content-length> header to all outgoing frames. Every command method can also
accept two additional parameters: the C<body> parameter where you can specify
the body of the frame, and the C<on_receipt> parameter that is the alternative
way to specify the command callback.
If you want to receive C<RECEIPT> frame, you must specify C<receipt> header.
The C<receipt> header can take the special value C<auto>. If it set, the
receipt identifier will be generated automatically by the client. The
C<RECEIPT> frame is passed to the command callback in first argument as the
object of the class L<AnyEvent::Stomper::Frame>. If the C<receipt> header is
not specified the first argument of the command callback will be C<undef>.
For commands C<SUBSCRIBE>, C<UNSUBSCRIBE>, C<DISCONNECT> the client
automatically adds C<receipt> header for internal usage.
The command callback is called in one of two cases depending on the presence of
the C<receipt> header. First case, when the command was successfully written to
the socket. Second case, when the C<RECEIPT> frame will be received. In first
case C<on_receipt> callback can be called synchronously. If any error occurred
during the command execution, the error object is passed to the callback in
second argument. Error object is the instance of the class
L<AnyEvent::Stomper::Error>.
The command callback is optional. If it is not specified and any error
occurred, the C<on_error> callback of the client is called.
The full list of all available headers for every command you can find in STOMP
protocol specification and in documentation on your STOMP server. For various
lib/AnyEvent/Stomper.pm view on Meta::CPAN
$stomper->send(
destination => '/queue/foo',
body => 'Hello, world!',
sub {
my $err = $_[1];
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
}
);
$stomper->send(
destination => '/queue/foo',
receipt => 'auto',
body => 'Hello, world!',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
}
);
=head2 subscribe( [ %params ] [, $cb->( $msg ) ] )
The method is used to register to listen to a given destination. The
C<subscribe> method require the C<on_message> callback, which is called on
every received C<MESSAGE> frame from the server. The C<MESSAGE> frame is passed
to the C<on_message> callback in first argument as the object of the class
L<AnyEvent::Stomper::Frame>. If the C<subscribe> method is called with one
callback, this callback will be act as C<on_message> callback.
$stomper->subscribe(
id => 'foo',
destination => '/queue/foo',
sub {
my $msg = shift;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
destination => '/queue/foo',
ack => 'client',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
return;
}
# receipt handling...
},
on_message => sub {
my $msg = shift;
lib/AnyEvent/Stomper.pm view on Meta::CPAN
id => 'foo',
destination => '/queue/foo',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
return;
}
# receipt handling...
}
);
=head2 ack( [ %params ] [, $cb->( $receipt, $err ) ] )
The method is used to acknowledge consumption of a message from a subscription
using C<client> or C<client-individual> acknowledgment. Any messages received
from such a subscription will not be considered to have been consumed until the
message has been acknowledged via an C<ack()> method. Method C<ack()> must be
called with required parameter C<message> in which must be specified the
C<MESSAGE> frame.
$stomper->ack( message => $msg );
$stomper->ack(
message => $msg,
receipt => 'auto',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
}
# receipt handling...
}
);
=head2 nack( [ %params ] [, $cb->( $receipt, $err ) ] )
The C<nack> method is the opposite of C<ack> method. It is used to tell the
server that the client did not consume the message. Method C<nack()> must be
called with required parameter C<message> in which must be specified the
C<MESSAGE> frame.
$stomper->nack( message => $msg );
$stomper->nack(
message => $msg,
receipt => 'auto',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
}
# receipt handling...
}
);
=head2 begin( [ %params ] [, $cb->( $receipt, $err ) ] )
lib/AnyEvent/Stomper.pm view on Meta::CPAN
The method C<commit> is used to commit a transaction.
=head2 abort( [ %params ] [, $cb->( $receipt, $err ) ] )
The method C<abort> is used to roll back a transaction.
=head2 disconnect( [ %params ] [, $cb->( $receipt, $err ) ] )
A client can disconnect from the server at anytime by closing the socket but
there is no guarantee that the previously sent frames have been received by
the server. To do a graceful shutdown, where the client is assured that all
previous frames have been received by the server, you must call C<disconnect>
method and wait for the C<RECEIPT> frame.
=head2 execute( $command, [ %params ] [, $cb->( $receipt, $err ) ] )
An alternative method to execute commands. In some cases it can be more
convenient.
$stomper->execute( 'SEND',
destination => '/queue/foo',
receipt => 'auto',
body => 'Hello, world!',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
}
);
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
# error handling...
},
);
=over
=item nodes => \@nodes
Specifies the list of nodes. Parameter should contain array of hashes. Each
hash should contain C<host> and C<port> elements. At the start the client gets
random node from this list, connects to it and sends all frames to this node.
If current active node fails, the client gets next node from the list.
=item login => $login
The user identifier used to authenticate against a secured STOMP server. Must
be the same for all nodes.
=item passcode => $passcode
The password used to authenticate against a secured STOMP server. Must be the
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
handle_params => {
autocork => 1,
linger => 60,
}
Enabling of the C<autocork> parameter can improve performance. See
documentation on L<AnyEvent::Handle> for more information.
=item default_headers => \%headers
Specifies default headers for all outgoing frames.
default_headers => {
'x-foo' => 'foo_value',
'x-bar' => 'bar_value',
}
=item command_headers
Specifies default headers for particular commands.
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
The C<on_error> callback is called on command errors if the command callback
is not specified. If the C<on_error> callback is not specified, the client
just print an error messages to C<STDERR>.
=back
=head1 COMMAND METHODS
To execute the STOMP command you must call appropriate method. STOMP headers
can be specified as command parameters. The client automatically adds
C<content-length> header to all outgoing frames. Every command method can also
accept two additional parameters: the C<body> parameter where you can specify
the body of the frame, and the C<on_receipt> parameter that is the alternative
way to specify the command callback.
If you want to receive C<RECEIPT> frame, you must specify C<receipt> header.
The C<receipt> header can take the special value C<auto>. If it set, the
receipt identifier will be generated automatically by the client. The
C<RECEIPT> frame is passed to the command callback in first argument as the
object of the class L<AnyEvent::Stomper::Frame>. If the C<receipt> header is
not specified the first argument of the command callback will be C<undef>.
For commands C<SUBSCRIBE>, C<UNSUBSCRIBE>, C<DISCONNECT> the client
automatically adds C<receipt> header for internal usage.
The command callback is called in one of two cases depending on the presence of
the C<receipt> header. First case, when the command was successfully sent to
the server. Second case, when the C<RECEIPT> frame will be received. If any
error occurred during the command execution, the error object is passed to the
callback in second argument. Error object is the instance of the class
L<AnyEvent::Stomper::Error>.
The command callback is optional. If it is not specified and any error
occurred, the C<on_error> callback of the client is called.
If you want to track errors on particular nodes for particular command, you
must specify C<on_node_error> callback in command method.
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
destination => '/queue/foo',
body => 'Hello, world!',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
},
on_node_error => sub {
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
$cluster->send(
destination => '/queue/foo',
body => 'Hello, world!',
sub {
my $err = $_[1];
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
}
);
$cluster->send(
destination => '/queue/foo',
receipt => 'auto',
body => 'Hello, world!',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
}
);
=head2 subscribe( [ %params ] [, $cb->( $msg ) ] )
The method is used to register to listen to a given destination. The
C<subscribe> method require the C<on_message> callback, which is called on
every received C<MESSAGE> frame from the server. The C<MESSAGE> frame is passed
to the C<on_message> callback in first argument as the object of the class
L<AnyEvent::Stomper::Frame>. If the C<subscribe> method is called with one
callback, this callback will be act as C<on_message> callback.
$cluster->subscribe(
id => 'foo',
destination => '/queue/foo',
sub {
my $msg = shift;
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
destination => '/queue/foo',
ack => 'client',
on_receipt => sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
return;
}
# receipt handling...
},
on_message => sub {
my $msg = shift;
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
id => 'foo',
destination => '/queue/foo',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
return;
}
# receipt handling...
}
);
=head2 ack( [ %params ] [, $cb->( $receipt, $err ) ] )
The method is used to acknowledge consumption of a message from a subscription
using C<client> or C<client-individual> acknowledgment. Any messages received
from such a subscription will not be considered to have been consumed until the
message has been acknowledged via an C<ack()> method. Method C<ack()> must be
called with required parameter C<message> in which must be specified the
C<MESSAGE> frame.
$stomper->ack( message => $msg );
$stomper->ack(
message => $msg,
receipt => 'auto',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
}
# receipt handling...
}
);
=head2 nack( [ %params ] [, $cb->( $receipt, $err ) ] )
The C<nack> method is the opposite of C<ack> method. It is used to tell the
server that the client did not consume the message. Method C<nack()> must be
called with required parameter C<message> in which must be specified the
C<MESSAGE> frame.
$stomper->nack( message => $msg );
$stomper->nack(
message => $msg,
receipt => 'auto',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
}
# receipt handling...
}
);
=head2 begin( [ %params ] [, $cb->( $receipt, $err ) ] )
lib/AnyEvent/Stomper/Cluster.pm view on Meta::CPAN
The method C<commit> is used to commit a transaction.
=head2 abort( [ %params ] [, $cb->( $receipt, $err ) ] )
The method C<abort> is used to roll back a transaction.
=head2 disconnect( [ %params ] [, $cb->( $receipt, $err ) ] )
A client can disconnect from the current active node at anytime by closing the
socket but there is no guarantee that the previously sent frames have been
received by the node. To do a graceful shutdown, where the client is assured
that all previous frames have been received by the node, you must call
C<disconnect> method and wait for the C<RECEIPT> frame.
=head2 execute( $command, [ %params ] [, $cb->( $receipt, $err ) ] )
An alternative method to execute commands. In some cases it can be more
convenient.
$cluster->execute( 'SEND',
destination => '/queue/foo',
receipt => 'auto',
body => 'Hello, world!',
sub {
my $receipt = shift;
my $err = shift;
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
my $err_frame = $err->frame;
# error handling...
return;
}
# receipt handling...
}
);
lib/AnyEvent/Stomper/Error.pm view on Meta::CPAN
E_OPRN_ERROR => 5,
E_UNEXPECTED_DATA => 6,
E_READ_TIMEDOUT => 7,
);
sub new {
my $class = shift;
my $err_msg = shift;
my $err_code = shift;
my $err_frame = shift;
my $self = bless {}, $class;
$self->{message} = $err_msg;
$self->{code} = $err_code;
$self->{frame} = $err_frame;
return $self;
}
# Generate getters
{
no strict qw( refs );
foreach my $name ( qw( message code frame ) )
{
*{$name} = sub {
my $self = shift;
return $self->{$name};
}
}
}
1;
__END__
lib/AnyEvent/Stomper/Error.pm view on Meta::CPAN
AnyEvent::Stomper::Error - Class of error for AnyEvent::Stomper
=head1 DESCRIPTION
Class of error for L<AnyEvent::Stomper>. Objects of this class can be passed
to callbacks.
=head1 CONSTRUCTOR
=head2 new( $err_msg, $err_code [, $frame ] )
Creates error object.
=head1 METHODS
=head2 message()
Gets error message.
=head2 code()
Gets error code.
=head2 frame()
Gets error frame
=head1 SEE ALSO
L<AnyEvent::Stomper>
=head1 AUTHOR
Eugene Ponizovsky, E<lt>ponizovsky@gmail.comE<gt>
Sponsored by SMS Online, E<lt>dev.opensource@sms-online.comE<gt>
lib/AnyEvent/Stomper/Frame.pm view on Meta::CPAN
return $self->{$name};
}
}
}
1;
__END__
=head1 NAME
AnyEvent::Stomper::Frame - Class of STOMP frame for AnyEvent::Stomper
=head1 DESCRIPTION
Class of frame for L<AnyEvent::Stomper>. Objects of this class can be passed
to callbacks.
=head1 CONSTRUCTOR
=head2 new( $command, \%headers [, $body ] )
Creates error object.
=head1 METHODS
=head2 command()
Gets command name
=head2 headers()
Gets frame headers
=head2 body()
Gets frame body
=head1 SEE ALSO
L<AnyEvent::Stomper>
=head1 AUTHOR
Eugene Ponizovsky, E<lt>ponizovsky@gmail.comE<gt>
Sponsored by SMS Online, E<lt>dev.opensource@sms-online.comE<gt>
t/00-base.t view on Meta::CPAN
use 5.008000;
use strict;
use warnings;
use Test::More tests => 43;
my $t_client_class;
my $t_cluster_class;
my $t_frame_class;
my $t_err_class;
BEGIN {
$t_client_class = 'AnyEvent::Stomper';
use_ok( $t_client_class );
$t_cluster_class = 'AnyEvent::Stomper::Cluster';
use_ok( $t_cluster_class );
$t_frame_class = 'AnyEvent::Stomper::Frame';
use_ok( $t_frame_class );
$t_err_class = 'AnyEvent::Stomper::Error';
use_ok( $t_err_class );
}
can_ok( $t_client_class, 'new' );
my $stomper = new_ok( $t_client_class => [ lazy => 1 ] );
can_ok( $stomper, 'execute' );
can_ok( $stomper, 'send' );
t/00-base.t view on Meta::CPAN
can_ok( $cluster, 'abort' );
can_ok( $cluster, 'force_disconnect' );
can_ok( $cluster, 'nodes' );
my @nodes = $cluster->nodes;
is ( scalar @nodes, 3, 'cluster; get all nodes; number' );
foreach my $node (@nodes) {
isa_ok( $node, 'AnyEvent::Stomper' );
}
can_ok( $t_frame_class, 'new' );
my $frame = new_ok( $t_frame_class => [ 'MESSAGE', { 'message-id' => '123' },
'Hello, world!' ] );
can_ok( $frame, 'command' );
can_ok( $frame, 'headers' );
can_ok( $frame, 'body' );
can_ok( $t_err_class, 'new' );
my $err = new_ok( $t_err_class => [ 'Some error', 6 ] );
can_ok( $err, 'message' );
can_ok( $err, 'code' );