AnyEvent-STOMP-Client


Changes

Revision history for Perl module AnyEvent::STOMP::Client.

0.42  2024-06-18
      - Bugfix to remove NULL character frame delimiter from frame body

0.41  2022-02-14
      - Avoid memory leak by allowing to fully shutdown the client object (by @kbucheli)
      - Properly fish error message from the header of the stomp ERROR message (by @kbucheli)

0.40  2021-10-02
      - Properly propagate low-level errors and error messages in the on_error callback (by @kbucheli)

0.39  2014-02-12
      - Made connect method of AnyEvent::STOMP::Client not die.

README.pod

  AnyEvent->condvar->recv;


=head1 DESCRIPTION

AnyEvent::STOMP::Client provides a STOMP (Simple Text Oriented Messaging
Protocol) client. Thanks to AnyEvent, AnyEvent::STOMP::Client is completely
non-blocking: it makes extensive use of AnyEvent::Handle and timers (and,
under the hood, AnyEvent::Socket). Building on Object::Event,
AnyEvent::STOMP::Client implements various events (e.g. the MESSAGE event,
fired when a STOMP MESSAGE frame is received) and offers methods to register
callbacks for them (e.g. C<on_message($callback)>).

=head1 METHODS

=head2 $client = new $host, $port, $connect_headers, $tls_context

Create an instance of C<AnyEvent::STOMP::Client>.

=over

README.pod

message broker is running.

=item C<$port>

Integer, optional, defaults to C<61613>. The TCP port we connect to. I.e. the
port where the message broker instance is listening.

=item C<$connect_headers>

Hash, optional, empty by default. May be used to add arbitrary headers to the
STOMP C<CONNECT> frame. STOMP login headers would, for example, be supplied
using this parameter.

=item C<$tls_context>

Hash, optional, undef by default. May be used to supply an SSL/TLS context
directly to C<AnyEvent::Handle>. See L<AnyEvent::TLS> for documentation.

=back

=head3 Example

README.pod

    {'login' => 'guest', 'passcode' => 'guest', 'virtual-host' => 'foo'}
); >>

=head2 $client->connect

Connect to the specified STOMP message broker. Croaks if you have already
established a connection.

=head2 $client->disconnect $ungraceful

Sends a C<DISCONNECT> STOMP frame to the message broker if we are still
connected. If we are not connected, no frame is sent: any remaining handle is
destroyed and the disconnected event is fired right away.

=over

=item C<$ungraceful>

Boolean, defaults to 0. If the ungraceful option is set, then simply a
C<DISCONNECT> STOMP frame is sent and the connection state is considered to be
disconnected without awaiting any response from the server.
If, however, the option is not set, then a receipt is asked for and the
connection is only considered to be no longer established upon receiving a
receipt for the C<DISCONNECT> frame.

=back
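
The difference between the two modes comes down to whether we wait for a
matching C<RECEIPT> frame. The following is a minimal, stand-alone sketch of
that idea and not the module's actual code; the C<%state> hash, the helper
names and the receipt id C<disconnect-1> are made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical, simplified model of the two disconnect modes.
my %state = (connected => 1, pending_receipt => undef);

# Returns the headers that would accompany the DISCONNECT frame.
sub request_disconnect {
    my ($ungraceful) = @_;
    if ($ungraceful) {
        $state{connected} = 0;                  # do not wait for the broker
        return {};
    }
    $state{pending_receipt} = 'disconnect-1';   # made-up receipt id
    return { receipt => $state{pending_receipt} };
}

# Called for every RECEIPT frame; only the matching one ends the session.
sub handle_receipt {
    my ($header) = @_;
    return unless defined $state{pending_receipt};
    if ($header->{'receipt-id'} eq $state{pending_receipt}) {
        $state{connected} = 0;
        $state{pending_receipt} = undef;
    }
    return;
}

request_disconnect(0);
handle_receipt({ 'receipt-id' => 'something-else' });
print "still connected\n" if $state{connected};
handle_receipt({ 'receipt-id' => 'disconnect-1' });
print "disconnected\n" unless $state{connected};
```

In graceful mode an unrelated receipt leaves the connection state untouched;
only the receipt requested by the C<DISCONNECT> frame marks it as closed.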

=head2 bool $client->is_connected

Check whether we are still connected to the broker. May only be accurate if
STOMP heart-beats are used.
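
Heart-beats matter here because a silently dropped TCP connection may
otherwise go unnoticed. As a rough sketch of the negotiation rule from the
STOMP 1.2 specification (not necessarily how this module implements it), the
hypothetical helper C<negotiate_heartbeat> below combines the C<heart-beat>
header sent in C<CONNECT> with the one received in C<CONNECTED>.

```perl
use strict;
use warnings;
use List::Util qw(max);

# Hypothetical helper: combine the heart-beat header sent in CONNECT
# ("cx,cy") with the one received in CONNECTED ("sx,sy").
# Returns (client-to-server interval, server-to-client interval) in
# milliseconds; 0 means no heart-beats in that direction.
sub negotiate_heartbeat {
    my ($client_header, $server_header) = @_;
    my ($cx, $cy) = split /,/, $client_header;
    my ($sx, $sy) = split /,/, $server_header;

    my $outgoing = ($cx && $sy) ? max($cx, $sy) : 0;
    my $incoming = ($sx && $cy) ? max($sx, $cy) : 0;
    return ($outgoing, $incoming);
}

my ($out, $in) = negotiate_heartbeat('5000,10000', '0,3000');
print "send every $out ms, expect every $in ms\n";
```

In this example the server announces that it will not send heart-beats, so
the incoming interval is 0 and the connection state cannot be verified from
server traffic alone.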

=head2 $subscription_id = $client->subscribe $destination, $ack_mode, $additional_headers

Subscribe to a destination by sending a C<SUBSCRIBE> STOMP frame to the message
broker. Returns the subscription identifier.

=over

=item C<$destination>

String, mandatory. The destination to which we want to subscribe.

=item C<$ack_mode>

C<auto> | C<client> | C<client-individual>, optional, defaults to C<auto>.
See the STOMP documentation for further information on acknowledgement modes.

=item C<$additional_headers>

Used to pass arbitrary headers to the C<SUBSCRIBE> STOMP frame. Broker-specific
flow control parameters, for example, are what you would supply here.

=back
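
To illustrate how the three parameters end up in the frame, here is a small
stand-alone sketch that assembles C<SUBSCRIBE> headers as required by STOMP
1.2. The helper C<subscribe_headers> and the subscription id C<sub-0> are
hypothetical; C<activemq.prefetchSize> is shown only as an example of a
broker-specific header and assumes an ActiveMQ broker.

```perl
use strict;
use warnings;

# Hypothetical helper that assembles SUBSCRIBE headers. STOMP 1.2
# requires "id" and "destination"; "ack" defaults to "auto".
sub subscribe_headers {
    my ($id, $destination, $ack_mode, $additional_headers) = @_;
    $ack_mode = 'auto' unless defined $ack_mode;

    my %valid = map { $_ => 1 } qw(auto client client-individual);
    die "Invalid acknowledgement mode '$ack_mode'\n" unless $valid{$ack_mode};

    return {
        %{ $additional_headers || {} },
        id          => $id,
        destination => $destination,
        ack         => $ack_mode,
    };
}

my $headers = subscribe_headers(
    'sub-0', '/queue/orders', 'client',
    { 'activemq.prefetchSize' => 10 },   # broker-specific flow control
);
print "$_:$headers->{$_}\n" for sort keys %$headers;
```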

=head2 $client->unsubscribe $destination, $additional_headers

Unsubscribe from a destination by sending an C<UNSUBSCRIBE> STOMP frame to the
message broker.

=over

=item C<$destination>

String, mandatory. The destination from which we want to unsubscribe.

=item C<$additional_headers>

Used to pass arbitrary headers to the C<UNSUBSCRIBE> STOMP frame.

=back
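
Arbitrary header keys and values may contain characters that are special in
a STOMP frame. The sketch below shows the escape sequences defined by STOMP
1.2 for all frames other than C<CONNECT> and C<CONNECTED>; the function names
and the two maps are illustrative and are not taken from the module.

```perl
use strict;
use warnings;

# Escape sequences defined by STOMP 1.2 for header keys and values.
my %ENCODE = ("\\" => "\\\\", "\r" => "\\r", "\n" => "\\n", ":" => "\\c");
my %DECODE = reverse %ENCODE;

sub escape_header {
    my ($text) = @_;
    $text =~ s/([\\\r\n:])/$ENCODE{$1}/g;
    return $text;
}

sub unescape_header {
    my ($text) = @_;
    # Any backslash sequence outside the table is a protocol error.
    $text =~ s/(\\.)/exists $DECODE{$1} ? $DECODE{$1} : die "Invalid escape '$1'\n"/ge;
    return $text;
}

my $escaped = escape_header("queue:a\\b");
print "$escaped\n";                       # prints: queue\ca\\b
print unescape_header($escaped), "\n";    # prints: queue:a\b
```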

=head2 $client->send $destination, $headers, $body

Send a STOMP C<SEND> frame to the message broker.

=over

=item C<$destination>

String, mandatory. The destination to which the message is sent.

=item C<$headers>

Hash, optional, empty by default. Arbitrary headers included in the C<SEND>
frame. See the STOMP documentation for supported headers.

=item C<$body>

String, optional, empty by default. The body of the message, according to the
content-type specified in the header.

=back
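
As a sketch of what ends up on the wire, the following stand-alone code
serializes a C<SEND> frame: the command, the header lines, an empty line, the
body and a terminating NULL octet. C<build_send_frame> is a hypothetical
helper; it assumes a UTF-8 encoded body and leaves out header escaping for
brevity.

```perl
use strict;
use warnings;

# Hypothetical stand-alone serializer mirroring the frame layout:
# command, header lines, an empty line, the body and a NULL octet.
sub build_send_frame {
    my ($destination, $headers, $body) = @_;
    $body = '' unless defined $body;

    my $octets = $body;
    utf8::encode($octets);                      # body as UTF-8 octets

    my %all = (
        %{ $headers || {} },
        destination      => $destination,
        'content-length' => length $octets,     # counted in octets
    );
    my $header = join "\n", map { "$_:$all{$_}" } sort keys %all;

    return "SEND\n$header\n\n$octets\0";
}

my $frame = build_send_frame(
    '/queue/greetings',
    { 'content-type' => 'text/plain' },
    'hello',
);
(my $printable = $frame) =~ s/\0/<NUL>/;        # make the terminator visible
print $printable, "\n";
```

Counting C<content-length> in octets rather than characters matters as soon
as the body contains non-ASCII text.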

=head2 $client->ack $ack_id, $transaction_id

Send an C<ACK> frame to acknowledge a received message.

=over

=item C<$ack_id>

String, mandatory. Has to match the C<ack> header of the message that is to be
acknowledged.

=item C<$transaction_id>

String, optional. A transaction identifier, if the C<ACK> is part of a transaction.

=back
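
The following sketch shows how the headers of an C<ACK> frame relate to the
headers of the received C<MESSAGE> frame under STOMP 1.2. The helper
C<ack_headers> and all header values are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: derive ACK (or NACK) headers from the headers of a
# received MESSAGE frame, optionally inside a transaction.
sub ack_headers {
    my ($message_header, $transaction_id) = @_;
    my $ack_id = $message_header->{ack};
    die "MESSAGE frame carries no 'ack' header\n" unless defined $ack_id;

    my $header = { id => $ack_id };
    $header->{transaction} = $transaction_id if defined $transaction_id;
    return $header;
}

my $message_header = {
    destination  => '/queue/orders',
    'message-id' => '007',
    subscription => 'sub-0',
    ack          => 'ack-42',
};
my $in_tx = ack_headers($message_header, 'tx-1');
print join(',', map { "$_=$in_tx->{$_}" } sort keys %$in_tx), "\n";
```

With the client, the same value would be handed over as C<$ack_id>, i.e. the
C<ack> entry of the header hashref passed to the message callback.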

=head2 $client->nack $ack_id, $transaction_id

Send a C<NACK> frame to negatively acknowledge a received message.

=over

=item C<$ack_id>

String, mandatory. Has to match the C<ack> header of the message that is to be
nacked.

=item C<$transaction_id>

README.pod


=over

=item C<$transaction_id>

String, mandatory. A unique identifier for the transaction.

=item C<$additional_headers>

Hash, optional, empty by default. Used to pass arbitrary headers to the STOMP
frame.

=back

=head2 $client->commit_transaction $transaction_id, $additional_headers

Commit a STOMP transaction.

=over

=item C<$transaction_id>

String, mandatory. A unique identifier for the transaction.

=item C<$additional_headers>

Hash, optional, empty by default. Used to pass arbitrary headers to the STOMP
frame.

=back

=head2 $client->abort_transaction $transaction_id, $additional_headers

Abort a STOMP transaction.

=over

=item C<$transaction_id>

String, mandatory. A unique identifier for the transaction.

=item C<$additional_headers>

Hash, optional, empty by default. Used to pass arbitrary headers to the STOMP
frame.

=back
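
A transaction groups frames between C<BEGIN> and C<COMMIT> (or C<ABORT>):
every C<SEND> that belongs to it carries the same C<transaction> header. The
sketch below models this bookkeeping in memory; the helper names and the
C<@sent> log are made up, and nothing is written to a socket.

```perl
use strict;
use warnings;

# Hypothetical in-memory model of transaction bookkeeping; frames are
# collected in @sent instead of being written to a socket.
my (%open, @sent);

sub begin_tx {
    my ($id) = @_;
    if ($open{$id}) { warn "Transaction '$id' already begun\n"; return }
    push @sent, "BEGIN transaction:$id";
    $open{$id} = 1;
    return;
}

sub send_in_tx {
    my ($id, $destination, $body) = @_;
    die "Unknown transaction '$id'\n" unless $open{$id};
    push @sent, "SEND destination:$destination transaction:$id body:$body";
    return;
}

sub finish_tx {
    my ($id, $command) = @_;          # $command is COMMIT or ABORT
    die "Unknown transaction '$id'\n" unless delete $open{$id};
    push @sent, "$command transaction:$id";
    return;
}

begin_tx('tx-1');
send_in_tx('tx-1', '/queue/orders', 'order 1');
send_in_tx('tx-1', '/queue/orders', 'order 2');
finish_tx('tx-1', 'COMMIT');
print "$_\n" for @sent;
```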

=head2 $client->destroy

Disconnects and cleans up all callbacks. To be called when the client object
is not used any more and should be cleaned up.

=head2 Callbacks

In order for the C<AnyEvent::STOMP::Client> to be useful, callback subroutines
can be registered for the following events:

=head3 $guard = $client->on_connected $callback

Invoked when a CONNECTED frame is received. Parameters passed to the callback:
C<$self>, C<$header_hashref>.

=head3 $guard = $client->on_disconnected $callback

Invoked after having successfully disconnected from a broker. I.e. when a
callback is registered for this event and the C<disconnect> subroutine is
called, then a receipt header is included in the DISCONNECT frame and the
disconnected event is fired upon receiving the receipt for the DISCONNECT frame.
Parameters passed to the callback: C<$self>, C<$host>, C<$port>.

=head3 $guard = $client->on_connection_lost $callback

Invoked when either the C<on_error> callback specified in the
C<AnyEvent::Handle> constructor is called, or when no more
heartbeats arrive from the server.
Parameters passed to the callback: C<$self>, C<$host>, C<$port>, C<$error_message>.

=head3 $guard = $client->on_connect_error $callback

Invoked when the C<on_connect_error> callback specified in the
C<AnyEvent::Handle> constructor is called.
Parameters passed to the callback: C<$self>, C<$host>, C<$port>, C<$error_message>.

=head3 $guard = $client->on_send_frame $callback

Invoked when a STOMP frame is sent. Parameters passed to the callback:
C<$self>, C<$frame> (the sent frame as string).

=head3 $guard = $client->on_send $callback

Invoked when a STOMP SEND command is sent. Parameters passed to the callback:
C<$self>, C<$frame> (the sent frame as string).

=head3 $guard = $client->on_ack $callback

Invoked when a STOMP ACK command is sent. Parameters passed to the callback:
C<$self>, C<$frame> (the sent frame as string).

=head3 $guard = $client->on_nack $callback

Invoked when a STOMP NACK command is sent. Parameters passed to the callback:
C<$self>, C<$frame> (the sent frame as string).

=head3 $guard = $client->on_read_frame $callback

Invoked when a STOMP frame is received (irrespective of the STOMP command).
Parameters passed to the callback: C<$self>, C<$command>, C<$header_hashref>,
C<$body> (may be C<undef>, if the frame is not specified to contain a body).
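
To make the three callback parameters concrete, here is a minimal parser
that splits one complete raw frame into command, headers and body. It is a
simplified sketch, not the module's streaming reader: C<parse_frame> is a
hypothetical name, and header unescaping and C<content-length> handling are
left out.

```perl
use strict;
use warnings;

# Hypothetical parser for one complete frame held in a string. Header
# unescaping and content-length handling are left out for brevity.
sub parse_frame {
    my ($raw) = @_;
    my ($head, $body) = split /\r?\n\r?\n/, $raw, 2;
    my ($command, @lines) = split /\r?\n/, $head;

    my %header;
    for my $line (@lines) {
        my ($key, $value) = split /:/, $line, 2;
        # STOMP 1.2: if a header is repeated, the first entry wins.
        $header{$key} = $value unless exists $header{$key};
    }

    $body = '' unless defined $body;
    $body =~ s/\0\z//;                 # the terminating NULL is not body
    undef $body if $body eq '';
    return ($command, \%header, $body);
}

my ($command, $header, $body) = parse_frame(
    "MESSAGE\ndestination:/queue/a\nmessage-id:1\nsubscription:0\n\nhello\0"
);
print "$command from $header->{destination}: $body\n";
```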

=head3 $guard = $client->on_message $callback, $destination

Invoked when a MESSAGE frame is received. Optionally, a C<$destination>
parameter may be specified, in which case the callback is only invoked when
a MESSAGE is received from that specific destination.
Parameters passed to the callback: C<$self>, C<$header_hashref>, C<$body>.
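
The effect of the optional C<$destination> parameter can be pictured as a
second, subscription-specific event that fires in addition to the generic
one. The dispatcher below is a hypothetical, stand-alone sketch of that idea;
the subscription ids and destinations are made up.

```perl
use strict;
use warnings;

# Hypothetical dispatcher: callbacks are stored per event name, and a
# destination-specific callback is keyed by its subscription id.
my %subscriptions = ('/queue/orders' => 0, '/queue/audit' => 1);
my %callbacks;

sub on_message {
    my ($callback, $destination) = @_;
    my $event = defined $destination
        ? "MESSAGE-$subscriptions{$destination}"
        : 'MESSAGE';
    push @{ $callbacks{$event} }, $callback;
    return;
}

sub dispatch_message {
    my ($header, $body) = @_;
    my @events = ('MESSAGE');
    push @events, "MESSAGE-$header->{subscription}"
        if defined $header->{subscription};
    for my $event (@events) {
        $_->($header, $body) for @{ $callbacks{$event} || [] };
    }
    return;
}

my @log;
on_message(sub { push @log, "any: $_[1]" });
on_message(sub { push @log, "orders: $_[1]" }, '/queue/orders');

dispatch_message({ subscription => 0 }, 'order 1');
dispatch_message({ subscription => 1 }, 'audit 1');
print "$_\n" for @log;
```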

=head3 $guard = $client->on_receipt $callback

Invoked when a RECEIPT frame is received. Parameters passed to the callback:
C<$self>, C<$header_hashref>.

=head3 $guard = $client->on_error $callback

Invoked when an ERROR frame is received, or when C<AnyEvent::Handle> reports a
low-level error. Parameters passed to the callback: C<$self>, C<$host>,
C<$port>, C<$error_message>.

=head3 $guard = $client->on_subscribed $callback

Invoked after having successfully subscribed to a destination. Works behind the
scenes like the C<on_disconnected> described above. Parameters passed to the
callback: C<$self>, C<$destination>.

=head3 $guard = $client->on_unsubscribed $callback

Invoked after having successfully unsubscribed from a destination. Works behind

lib/AnyEvent/STOMP/Client.pm

        connect => [$self->{host}, $self->{port}],
        keep_alive => 1,
        no_delay => 1,
        on_eof => sub {
            undef $self->{handle};
            $self->{connected} = 0;
            $self->event('TRANSPORT_DISCONNECTED', $self->{host}, $self->{port});
        },
        on_connect => sub {
            $self->event('TRANSPORT_CONNECTED', $self->{host}, $self->{port});
            $self->send_frame('CONNECT', $self->{connect_headers});
        },
        on_connect_error => sub {
            my ($handle, $error_message) = @_;
            $handle->destroy;
            undef $self->{handle};
            $self->{connected} = 0;
            $self->event('TRANSPORT_CONNECT_ERROR', $self->{host}, $self->{port}, $error_message);
        },
        on_error => sub {
            my ($handle, $fatal, $error_message) = @_;
            $self->event('ERROR', $self->{host}, $self->{port}, $error_message);
            if ($fatal) {
                undef $self->{handle};
                $self->{connected} = 0;
                # $handle->destroy() will be called automatically after this callback, see
                # https://metacpan.org/pod/AnyEvent::Handle#on_error-=%3E-$cb-%3E($handle,-$fatal,-$message)
                $self->event('CONNECTION_LOST', $self->{host}, $self->{port}, $error_message);
            }
        },
        on_read => sub {
            $self->read_frame;
        },
        %{$self->{tls_hash}},
    );
}

sub disconnect {
    my $self = shift;
    my $ungraceful = shift;

    unless ($self->is_connected) {
        if (defined $self->{handle}) {
            $self->{handle}->destroy;
            delete $self->{handle};
        }
        $self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
        delete $self->{heartbeat}{timer};
        return;
    }

    if (defined $ungraceful and $ungraceful) {
        $self->send_frame('DISCONNECT');
        $self->{connected} = 0;
        if (defined $self->{handle}) {
            $self->{handle}->push_shutdown;
            $self->{handle}->destroy;
            delete $self->{handle};
        }
        $self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
        delete $self->{heartbeat}{timer};
    }
    else {
        my $receipt_id = $self->get_uuid;

        $self->send_frame('DISCONNECT', {receipt => $receipt_id,});

        $self->before_receipt(
            sub {
                my ($self, $header) = @_;

                if ($header->{'receipt-id'} eq $receipt_id) {
                    $self->{connected} = 0;
                    $self->stop_event;
                    $self->unreg_me;
                    if (defined $self->{handle}) {

lib/AnyEvent/STOMP/Client.pm

                    my ($self, $receipt_header) = @_;
                    if ($receipt_header->{'receipt-id'} eq $header->{receipt}) {
                        $self->event('SUBSCRIBED', $header->{destination});
                        $self->stop_event;
                        $self->unreg_me;
                    }
                }
            );
        }

        $self->send_frame('SUBSCRIBE', $header);
    }

    return $self->{subscriptions}{$destination};
}

sub unsubscribe {
    my $self = shift;
    my $destination = shift;
    my $additional_headers = shift || {};

lib/AnyEvent/STOMP/Client.pm

                my ($self, $receipt_header) = @_;
                if ($receipt_header->{'receipt-id'} eq $header->{receipt}) {
                    $self->event('UNSUBSCRIBED', $destination);
                    $self->stop_event;
                    $self->unreg_me;
                }
            }
        );
    }

    $self->send_frame('UNSUBSCRIBE', $header);
    delete $self->{subscriptions}{$destination};
}

sub header_hash2string {
    my $header_hashref = shift;
    return join($EOL, map { "$_:$header_hashref->{$_}" } keys %$header_hashref);
}

sub header_string2hash {
    my $header_string = shift;

lib/AnyEvent/STOMP/Client.pm

        }
        if ($k =~ m/(\\.)/) {
            $k =~ s/(\\.)/$DECODE_MAP{$1}/eg || croak "Invalid header key.";
        }
        $result_hashref->{$k} = $v;
    }

    return $result_hashref;
}

sub send_frame {
    my ($self, $command, $header_hashref, $body) = @_;

    unless ($self->is_connected or $command eq 'CONNECT') {
        croak "Have you considered connecting to a STOMP broker first before "
            ."trying to send something?";
    }

    utf8::encode($command);

    my $header;
    if ($command eq 'CONNECT') {
        $header = header_hash2string($header_hashref);
    }
    else {
        $header = header_hash2string(encode_header($header_hashref));
    }
    utf8::encode($header);

    my $frame;
    if ($command eq 'SEND') {
        $body = '' unless defined $body;
        $frame = $command.$EOL.$header.$EOL.$EOL.$body.$NULL;
    }
    else {
        $frame = $command.$EOL.$header.$EOL.$EOL.$NULL;
    }

    $self->event('SEND_FRAME', $frame);
    # Fire the command-specific event only for SEND, ACK and NACK frames.
    $self->event($command, $frame) if ($command =~ m/^(?:SEND|ACK|NACK)$/);
    $self->{handle}->push_write($frame);
    $self->reset_client_heartbeat_timer;
}

sub send {
    my ($self, $destination, $headers, $body) = @_;

    if (defined $destination) {
        $headers->{destination} = $destination;
    }
    else {

lib/AnyEvent/STOMP/Client.pm

    }

    unless (defined $headers->{'content-length'}) {
        $headers->{'content-length'} = length $body || 0;
    }

    unless (defined $headers->{'content-type'}) {
        carp "It is strongly recommended to set the 'content-type' header.";
    }

    $self->send_frame('SEND', $headers, $body);
}

sub ack {
    my ($self, $ack_id, $transaction) = @_;

    unless ($ack_id) {
        croak "I do really need the message's ack header to ACK it.";
    }

    my $header = {id => $ack_id,};
    $header->{transaction} = $transaction if (defined $transaction);

    $self->send_frame('ACK', $header);
}

sub nack {
    my ($self, $ack_id, $transaction) = @_;

    unless ($ack_id) {
        croak "I do really need the message's ack header to NACK it.";
    }

    my $header = {id => $ack_id,};
    $header->{transaction} = $transaction if (defined $transaction);

    $self->send_frame('NACK', $header);
}

sub send_heartbeat {
    my $self = shift;

    if ($self->is_connected) {
        $self->{handle}->push_write($EOL);
        $self->reset_client_heartbeat_timer;
    }
}

lib/AnyEvent/STOMP/Client.pm

    my $self = shift;
    my $id = shift;
    my $additional_headers = shift || {};

    croak "I really need a transaction identifier here!" unless (defined $id);

    if (defined $self->{transactions}{$id}) {
        carp "You've already begun transaction '$id'";
    }
    else {
        $self->send_frame('BEGIN', {transaction => $id, %$additional_headers,});
        $self->{transactions}{$id} = 1;
    }
}

sub commit_transaction {
    my $self = shift;
    my $id = shift;
    my $additional_headers = shift || {};

    croak "I really need a transaction identifier here!" unless (defined $id);

    unless (defined $self->{transactions}{$id}) {
        carp "You've already committed transaction '$id'";
    }

    $self->send_frame('COMMIT', {transaction => $id, %$additional_headers,});
    delete $self->{transactions}{$id};
}

sub abort_transaction {
    my $self = shift;
    my $id = shift;
    my $additional_headers = shift || {};

    croak "I really need a transaction identifier here!" unless (defined $id);

    unless (defined $self->{transactions}{$id}) {
        carp "You've already committed or aborted transaction '$id'";
    }

    $self->send_frame('ABORT', {transaction => $id, %$additional_headers,});
    delete $self->{transactions}{$id};
}

sub read_frame {
    my $self = shift;
    $self->{handle}->unshift_read(
        line => sub {
            my ($handle, $command, $eol) = @_;

            $self->reset_server_heartbeat_timer;

            if ($command =~ /^(CONNECTED|MESSAGE|RECEIPT|ERROR)$/) {
                $command = $1;
            }

lib/AnyEvent/STOMP/Client.pm

                return;
            }

            $self->{handle}->unshift_read(
                regex => qr<\r?\n\r?\n>,
                cb => sub {
                    my ($handle, $header_string) = @_;
                    my $header_hashref = header_string2hash($header_string);
                    my $args;

                    # The headers of the CONNECTED frame are not en-/decoded
                    # for backwards compatibility with STOMP 1.0
                    unless ($command eq 'CONNECTED') {
                        $header_hashref = decode_header($header_hashref);
                    }

                    if ($command =~ m/MESSAGE|ERROR/) {
                        if (defined $header_hashref->{'content-length'}) {
                            $args->{chunk} = $header_hashref->{'content-length'};
                        }
                        else {

lib/AnyEvent/STOMP/Client.pm

                                    $self->event(
                                        'ERROR',
                                        $self->{host},
                                        $self->{port},
                                        $body || $header_hashref->{message} || 'unknown'
                                    );
                                }
                                else {
                                    # If frame end was determined by matching
                                    # for a NULL character, then this character
                                    # is not part of the frame body and is thus
                                    # removed before any MESSAGE callback runs
                                    if (exists $args->{regex}) {
                                        $body =~ s/\0\z//;
                                    }

                                    $self->event('MESSAGE', $header_hashref, $body);


                                    if (defined $header_hashref->{subscription}) {
                                        $self->event(
                                            "MESSAGE-$header_hashref->{subscription}",
                                            $header_hashref,

lib/AnyEvent/STOMP/Client.pm

}

sub on_connection_lost {
    return shift->reg_cb('CONNECTION_LOST', shift);
}

sub on_connect_error {
    return shift->reg_cb('TRANSPORT_CONNECT_ERROR', shift);
}

sub on_send_frame {
    return shift->reg_cb('SEND_FRAME', shift);
}

sub on_send {
    return shift->reg_cb('SEND', shift);
}

sub on_ack {
    return shift->reg_cb('ACK', shift);
}

sub on_nack {
    return shift->reg_cb('NACK', shift);
}

sub on_read_frame {
    return shift->reg_cb('READ_FRAME', shift);
}

sub on_message {
    my ($self, $cb, $destination) = @_;

    if (defined $destination) {
        unless (defined $self->{subscriptions}{$destination}) {
            carp "You haven't subscribed to '"
                .$destination."' yet. This callback may never be called.";



