AnyEvent-STOMP-Client

 view release on metacpan or  search on metacpan

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

            $self->read_frame;
        },
        %{$self->{tls_hash}},
    );
}

# Close the connection to the broker and fire the DISCONNECTED event.
#
# Parameters:
#   $ungraceful - optional flag. When true, a plain DISCONNECT frame is sent
#                 and the connection is torn down immediately. Otherwise the
#                 DISCONNECT frame carries a receipt header and the teardown
#                 happens inside a callback registered via before_receipt,
#                 once a receipt with the matching id is seen.
#
# When is_connected reports false, no frame is sent: a leftover handle (if
# any) is destroyed, DISCONNECTED is fired and the sub returns nothing.
sub disconnect {
    my ($self, $ungraceful) = @_;

    # Teardown shared by every path: drop the handle (optionally calling
    # push_shutdown on it first), fire DISCONNECTED, then discard the
    # heartbeat timers. The client is passed in explicitly so that this
    # closure does not capture $self.
    my $teardown = sub {
        my ($client, $shutdown_first) = @_;

        if (defined $client->{handle}) {
            $client->{handle}->push_shutdown if $shutdown_first;
            $client->{handle}->destroy;
            delete $client->{handle};
        }
        $client->event('DISCONNECTED', $client->{host}, $client->{port}, $ungraceful);
        delete $client->{heartbeat}{timer};
    };

    # Not connected: nothing to send, just clean up and notify.
    if (!$self->is_connected) {
        $teardown->($self, 0);
        return;
    }

    # Ungraceful: send DISCONNECT and do not wait for any answer.
    if ($ungraceful) {
        $self->send_frame('DISCONNECT');
        $self->{connected} = 0;
        return $teardown->($self, 1);
    }

    # Graceful: request a receipt and finish once it has been matched.
    my $receipt_id = $self->get_uuid;

    $self->send_frame('DISCONNECT', {receipt => $receipt_id,});

    return $self->before_receipt(
        sub {
            my ($client, $header) = @_;

            # Receipts belonging to other frames are left alone.
            if ($header->{'receipt-id'} eq $receipt_id) {
                $client->{connected} = 0;
                $client->stop_event;
                $client->unreg_me;
                $teardown->($client, 1);
            }
        }
    );
}

# Explicit cleanup: disconnect ungracefully if the client is still connected,
# then drop every registered callback via remove_all_callbacks.
sub destroy {
    my ($self) = @_;

    if ($self->is_connected) {
        $self->disconnect(1);
    }

    $self->remove_all_callbacks;
}

# Destructor hook: a client that is still connected gets an ungraceful
# disconnect (disconnect called with a true argument).
sub DESTROY {
    my ($self) = @_;

    if ($self->is_connected) {
        $self->disconnect(1);
    }
}

# Connection state check. False when no handle is stored; otherwise the value
# of the "connected" flag is returned as-is.
sub is_connected {
    my ($self) = @_;

    my $has_handle = defined $self->{handle};
    return $has_handle && $self->{connected};
}

# Store a new connection timeout margin.
#
# Parameters:
#   $new_connection_timeout_margin - accepted only when it matches /^\d+$/;
#                                    any other value leaves the current
#                                    setting untouched.
sub set_connection_timeout_margin {
    my $self                          = shift;
    my $new_connection_timeout_margin = shift;

    $self->{connection_timeout_margin} = $new_connection_timeout_margin
        if $new_connection_timeout_margin =~ m/^\d+$/;
}

# Accessor: returns the stored connection timeout margin (undef if never set).
sub get_connection_timeout_margin {
    my ($self) = @_;
    return $self->{connection_timeout_margin};
}

# Work out the heart-beat intervals from the client's and the server's
# "heart-beat" header values and store them under
# $self->{heartbeat}{interval}{client} and {server}.
#
# Parameters:
#   (second argument) - the server's heart-beat header value, "sx,sy".
#                       May be undef when the server sent no such header.
#
# The client's own value, "cx,cy", is read from
# $self->{connect_headers}{'heart-beat'}.
#
# Rules implemented here:
#   client interval = 0 if cx or sy is 0, otherwise max(cx, sy)
#   server interval = 0 if sx or cy is 0, otherwise max(sx, cy)
#
# The heart-beat header is optional in STOMP 1.2 and a missing header means
# "0,0". A missing header, or a missing half of one, is therefore mapped to 0
# explicitly rather than reaching split and == as undef, which would emit
# "uninitialized value" warnings.
sub set_heartbeat_intervals {
    my ($self, $server_header) = @_;

    my $client_header = $self->{connect_headers}{'heart-beat'};

    my ($cx, $cy) = defined $client_header ? split(/,/, $client_header) : ();
    my ($sx, $sy) = defined $server_header ? split(/,/, $server_header) : ();

    # Absent or empty components count as "no heart-beats" (0).
    for my $value ($cx, $cy, $sx, $sy) {
        $value = 0 unless defined $value and length $value;
    }

    if ($cx == 0 or $sy == 0) {
        $self->{heartbeat}{interval}{client} = 0;
    }
    else {
        $self->{heartbeat}{interval}{client} = max($cx, $sy);
    }

    if ($sx == 0 or $cy == 0) {
        $self->{heartbeat}{interval}{server} = 0;
    }
    else {
        $self->{heartbeat}{interval}{server} = max($sx, $cy);
    }
}

# (Re)arm the client-side heartbeat timer.
#
# Does nothing when no positive client interval is stored. Otherwise a new
# AnyEvent timer replaces whatever is kept in
# $self->{heartbeat}{timer}{client}; when it fires, send_heartbeat is called.
sub reset_client_heartbeat_timer {
    my ($self) = @_;

    my $client_interval = $self->{heartbeat}{interval}{client};
    return if !defined $client_interval or !($client_interval > 0);

    my $on_expiry = sub {
        $self->send_heartbeat;
    };

    # The stored interval is divided by 1000 before being passed as "after".
    $self->{heartbeat}{timer}{client} = AnyEvent->timer(
        after => ($client_interval / 1000),
        cb    => $on_expiry,
    );
}

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

    return shift->reg_cb('SUBSCRIBED', shift);
}

# Register $callback for the UNSUBSCRIBED event; returns whatever reg_cb
# returns.
sub on_unsubscribed {
    my ($self, $callback) = @_;
    return $self->reg_cb('UNSUBSCRIBED', $callback);
}

# Remove a previously registered callback by handing its guard to unreg_cb.
sub unregister_callback {
    my $self  = shift;
    my $guard = shift;

    return $self->unreg_cb($guard);
}

1;

__END__

=head1 NAME

AnyEvent::STOMP::Client - An event-based non-blocking STOMP 1.2 client based on
AnyEvent and Object::Event.

=head1 SYNOPSIS

  use AnyEvent::STOMP::Client;

  my $stomp_client = AnyEvent::STOMP::Client->new();

  $stomp_client->connect();

  $stomp_client->on_connected(
      sub {
          my $self = shift;

          $self->subscribe('/queue/test-destination');

          $self->send(
              '/queue/test-destination',
              {'content-type' => 'text/plain',},
              "Hello World!"
          );
      }
  );

  $stomp_client->on_message(
      sub {
          my ($self, $header, $body) = @_;
          print "$body\n";
      }
  );

  AnyEvent->condvar->recv;


=head1 DESCRIPTION

AnyEvent::STOMP::Client provides a STOMP (Simple Text Oriented Messaging
Protocol) client. Thanks to AnyEvent, AnyEvent::STOMP::Client is completely
non-blocking, by making extensive use of the AnyEvent::Handle and timers (and,
under the hood, AnyEvent::Socket). Building on Object::Event,
AnyEvent::STOMP::Client implements various events (e.g. the MESSAGE event, when
a STOMP MESSAGE frame is received) and offers callbacks for these (e.g.
on_message($callback)).

=head1 METHODS

=head2 $client = new $host, $port, $connect_headers, $tls_context

Create an instance of C<AnyEvent::STOMP::Client>.

=over

=item C<$host>

String, optional, defaults to C<localhost>. The host where a STOMP-compatible
message broker is running.

=item C<$port>

Integer, optional, defaults to C<61613>. The TCP port we connect to. I.e. the
port where the message broker instance is listening.

=item C<$connect_headers>

Hash, optional, empty by default. May be used to add arbitrary headers to the
STOMP C<CONNECT> frame. STOMP login headers would, for example, be supplied
using this parameter.

=item C<$tls_context>

Hash, optional, undef by default. May be used to supply an SSL/TLS context
directly to C<AnyEvent::Handle>. See L<AnyEvent::TLS> for documentation.

=back

=head3 Example

C<< my $client = AnyEvent::STOMP::Client->new(
    '127.0.0.1',
    61614,
    {'login' => 'guest', 'passcode' => 'guest', 'virtual-host' => 'foo'}
); >>

=head2 $client->connect

Connect to the specified STOMP message broker. Croaks if you have already
established a connection.

=head2 $client->disconnect

Sends a C<DISCONNECT> STOMP frame to the message broker (if we are still
connected). If the client is not connected, no frame is sent: any remaining
connection handle is destroyed and the C<DISCONNECTED> event is fired
immediately.

=over

=item C<$ungraceful>

Boolean, defaults to 0. If the ungraceful option is set, then simply a
C<DISCONNECT> STOMP frame is sent and the connection state is considered to be
disconnected without awaiting any response from the server.
If, however, the option is not set, then a receipt is asked for and the

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

=item C<$transaction_id>

String, optional. A transaction identifier, if the C<NACK> is part of a
transaction.

=back

=head2 $client->begin_transaction $transaction_id, $additional_headers

Begin a STOMP transaction.

=over

=item C<$transaction_id>

String, mandatory. A unique identifier for the transaction.

=item C<$additional_headers>

Hash, optional, empty by default. Used to pass arbitrary headers to the STOMP
frame.

=back

=head2 $client->commit_transaction $transaction_id, $additional_headers

Commit a STOMP transaction.

=over

=item C<$transaction_id>

String, mandatory. A unique identifier for the transaction.

=item C<$additional_headers>

Hash, optional, empty by default. Used to pass arbitrary headers to the STOMP
frame.

=back

=head2 $client->abort_transaction $transaction_id, $additional_headers

Abort a STOMP transaction.

=over

=item C<$transaction_id>

String, mandatory. A unique identifier for the transaction.

=item C<$additional_headers>

Hash, optional, empty by default. Used to pass arbitrary headers to the STOMP
frame.

=back

=head2 $client->destroy

Disconnects and cleans up all callbacks. To be called when the client object
is not used any more and should be cleaned up.

=head2 Callbacks

In order for the C<AnyEvent::STOMP::Client> to be useful, callback subroutines
can be registered for the following events:

=head3 $guard = $client->on_connected $callback

Invoked when a CONNECTED frame is received. Parameters passed to the callback:
C<$self>, C<$header_hashref>.

=head3 $guard = $client->on_disconnected $callback

Invoked after having successfully disconnected from a broker. I.e. when a
callback is registered for this event and the C<disconnect> subroutine is
called, then a receipt header is included in the DISCONNECT frame and the
disconnected event is fired upon receiving the receipt for the DISCONNECT frame.
Parameters passed to the callback: C<$self>, C<$host>, C<$port>,
C<$ungraceful> (the flag that was passed to C<disconnect>).

=head3 $guard = $client->on_connection_lost $callback

Invoked when either the C<on_error> callback specified in the
C<AnyEvent::Handle> constructor is called, or when no more
heartbeats arrive from the server.
Parameters passed to the callback: C<$self>, C<$host>, C<$port>, C<$error_message>.

=head3 $guard = $client->on_connect_error $callback

Invoked when the C<on_connect_error> callback specified in the
C<AnyEvent::Handle> constructor is called.
Parameters passed to the callback: C<$self>, C<$host>, C<$port>.

=head3 $guard = $client->on_send_frame $callback

Invoked when a STOMP frame is sent. Parameters passed to the callback:
C<$self>, C<$frame> (the sent frame as string).

=head3 $guard = $client->on_send $callback

Invoked when a STOMP SEND command is sent. Parameters passed to the callback:
C<$self>, C<$frame> (the sent frame as string).

=head3 $guard = $client->on_ack $callback

Invoked when a STOMP ACK command is sent. Parameters passed to the callback:
C<$self>, C<$frame> (the sent frame as string).

=head3 $guard = $client->on_nack $callback

Invoked when a STOMP NACK command is sent. Parameters passed to the callback:
C<$self>, C<$frame> (the sent frame as string).

=head3 $guard = $client->on_read_frame $callback

Invoked when a STOMP frame is received (irrespective of the STOMP command).
Parameters passed to the callback: C<$self>, C<$command>, C<$header_hashref>,
C<$body> (may be C<undef>, if the frame is not specified to contain a body).

=head3 $guard = $client->on_message $callback, $destination



( run in 3.049 seconds using v1.01-cache-2.11-cpan-8f98c5d2c55 )