AnyEvent-STOMP-Client
view release on metacpan or search on metacpan
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
}
# Open (or re-open) the TCP transport to the configured STOMP broker.
#
# If the client is currently connected, the existing handle is forgotten
# first. The subscription table is always reset. A fresh AnyEvent::Handle is
# then created whose callbacks turn transport happenings into client events:
#
#   on_connect       -> TRANSPORT_CONNECTED, then a STOMP CONNECT frame is sent
#   on_connect_error -> TRANSPORT_CONNECT_ERROR
#   on_eof           -> TRANSPORT_DISCONNECTED
#   on_error         -> ERROR, plus CONNECTION_LOST when the error is fatal
#   on_read          -> read_frame
#
# The key/value pairs of $self->{tls_hash} are appended to the constructor
# arguments.
sub connect {
    my ($self) = @_;

    # Every teardown path forgets the handle and clears the connected flag.
    my $forget_transport = sub {
        undef $self->{handle};
        $self->{connected} = 0;
    };

    $forget_transport->() if $self->is_connected;
    $self->{subscriptions} = {};

    my $when_eof = sub {
        $forget_transport->();
        $self->event('TRANSPORT_DISCONNECTED', $self->{host}, $self->{port});
    };

    my $when_connected = sub {
        $self->event('TRANSPORT_CONNECTED', $self->{host}, $self->{port});
        $self->send_frame('CONNECT', $self->{connect_headers});
    };

    my $when_connect_failed = sub {
        my ($failed_handle, $reason) = @_;
        $failed_handle->destroy;
        $forget_transport->();
        $self->event('TRANSPORT_CONNECT_ERROR', $self->{host}, $self->{port}, $reason);
    };

    my $when_error = sub {
        my (undef, $is_fatal, $reason) = @_;
        $self->event('ERROR', $self->{host}, $self->{port}, $reason);
        if ($is_fatal) {
            $forget_transport->();
            # $handle->destroy() will be called automatically after this callback, see
            # https://metacpan.org/pod/AnyEvent::Handle#on_error-=%3E-$cb-%3E($handle,-$fatal,-$message)
            $self->event('CONNECTION_LOST', $self->{host}, $self->{port}, $reason);
        }
    };

    my $when_readable = sub {
        $self->read_frame;
    };

    $self->{handle} = AnyEvent::Handle->new(
        connect          => [$self->{host}, $self->{port}],
        keep_alive       => 1,
        no_delay         => 1,
        on_eof           => $when_eof,
        on_connect       => $when_connected,
        on_connect_error => $when_connect_failed,
        on_error         => $when_error,
        on_read          => $when_readable,
        %{$self->{tls_hash}},
    );
}
# Close the connection to the broker.
#
# Parameters:
#   $ungraceful - optional boolean. When true, DISCONNECT is sent and the
#                 transport is torn down right away; otherwise DISCONNECT is
#                 sent with a receipt header and the teardown happens once the
#                 matching RECEIPT arrives.
#
# When the client is not connected, any leftover handle is destroyed, the
# DISCONNECTED event is fired and the heartbeat timers are dropped.
sub disconnect {
    my ($self, $ungraceful) = @_;

    # Common tail of all three paths: release the handle (optionally queueing
    # a shutdown first), announce DISCONNECTED and drop the heartbeat timers.
    my $finish = sub {
        my ($client, $queue_shutdown) = @_;
        if (defined $client->{handle}) {
            $client->{handle}->push_shutdown if $queue_shutdown;
            $client->{handle}->destroy;
            delete $client->{handle};
        }
        $client->event('DISCONNECTED', $client->{host}, $client->{port}, $ungraceful);
        delete $client->{heartbeat}{timer};
    };

    # Not connected: nothing to say to the broker, just clean up.
    unless ($self->is_connected) {
        $finish->($self, 0);
        return;
    }

    # Ungraceful: do not wait for the broker to acknowledge.
    if ($ungraceful) {
        $self->send_frame('DISCONNECT');
        $self->{connected} = 0;
        return $finish->($self, 1);
    }

    # Graceful: ask for a receipt and finish when it comes back.
    my $receipt_id = $self->get_uuid;
    $self->send_frame('DISCONNECT', {receipt => $receipt_id,});
    return $self->before_receipt(
        sub {
            my ($client, $header) = @_;
            if ($header->{'receipt-id'} eq $receipt_id) {
                $client->{connected} = 0;
                $client->stop_event;
                $client->unreg_me;
                $finish->($client, 1);
            }
        }
    );
}
# Shut the client down for good: disconnect ungracefully if a connection is
# still up, then remove every registered callback.
sub destroy {
    my ($self) = @_;

    if ($self->is_connected) {
        $self->disconnect(1);
    }

    return $self->remove_all_callbacks;
}
# Destructor: disconnect ungracefully if the client is still connected.
sub DESTROY {
    my ($self) = @_;

    if ($self->is_connected) {
        $self->disconnect(1);
    }
}
# True when a handle exists and the connected flag is set.
sub is_connected {
    my ($self) = @_;

    return defined($self->{handle}) && $self->{connected};
}
# Set the margin that is added to the server heartbeat interval before a
# missed heartbeat is treated as a lost connection.
#
# Parameters:
#   $new_connection_timeout_margin - non-negative integer, ASCII digits only.
#
# Any other value (undef, empty string, negative or fractional numbers, values
# with surrounding whitespace or a trailing newline) is ignored and the
# current margin is kept.
sub set_connection_timeout_margin {
    my ($self, $new_connection_timeout_margin) = @_;

    # Nothing to validate; also avoids an "uninitialized" warning in the match.
    return unless defined $new_connection_timeout_margin;

    # \A...\z instead of ^...$ so that "250\n" is rejected, and [0-9] instead
    # of \d so that only ASCII digits pass.
    if ($new_connection_timeout_margin =~ m/\A[0-9]+\z/) {
        $self->{connection_timeout_margin} = $new_connection_timeout_margin;
    }
}
# Return the currently configured connection timeout margin.
sub get_connection_timeout_margin {
    my ($self) = @_;

    return $self->{connection_timeout_margin};
}
# Negotiate the heartbeat intervals from the client's and the server's
# 'heart-beat' header values and store them in
# $self->{heartbeat}{interval}{client} and {server}.
#
# Parameters:
#   (second argument) - the server's 'heart-beat' header value, "sx,sy".
#
# The client's own value "cx,cy" is read from
# $self->{connect_headers}{'heart-beat'}. For each direction the interval is 0
# (no heartbeats) when either side announces 0, otherwise the larger of the
# two announced values.
#
# A header that is missing on either side is treated as "0,0", and a missing
# component as 0, so no "uninitialized value" warnings are raised when the
# broker does not send a heart-beat header.
sub set_heartbeat_intervals {
    my ($self, $server_heartbeat) = @_;

    my $client_heartbeat = $self->{connect_headers}{'heart-beat'};
    $client_heartbeat = '0,0' unless defined $client_heartbeat;
    $server_heartbeat = '0,0' unless defined $server_heartbeat;

    my ($cx, $cy) = split /,/, $client_heartbeat;
    my ($sx, $sy) = split /,/, $server_heartbeat;

    # Normalise absent or empty components ("", "1000", ...) to 0.
    $_ ||= 0 for $cx, $cy, $sx, $sy;

    # What the client sends is bounded by what the server is willing to receive.
    if ($cx == 0 or $sy == 0) {
        $self->{heartbeat}{interval}{client} = 0;
    }
    else {
        $self->{heartbeat}{interval}{client} = max($cx, $sy);
    }

    # What the server sends is bounded by what the client is willing to receive.
    if ($sx == 0 or $cy == 0) {
        $self->{heartbeat}{interval}{server} = 0;
    }
    else {
        $self->{heartbeat}{interval}{server} = max($sx, $cy);
    }
}
# (Re-)arm the timer that makes the client send a heartbeat.
#
# Does nothing when no positive client heartbeat interval is stored.
# Otherwise a one-shot AnyEvent timer is stored in
# $self->{heartbeat}{timer}{client}; it fires after the interval (kept in
# milliseconds, hence the division) and calls send_heartbeat.
sub reset_client_heartbeat_timer {
    my ($self) = @_;

    my $interval_ms = $self->{heartbeat}{interval}{client};
    return unless defined $interval_ms and $interval_ms > 0;

    my $delay_s = $interval_ms / 1000;
    $self->{heartbeat}{timer}{client} = AnyEvent->timer(
        after => $delay_s,
        cb    => sub { $self->send_heartbeat; },
    );
}
# (Re-)arm the watchdog that detects missing server heartbeats.
#
# Does nothing when no positive server heartbeat interval is stored.
# Otherwise a one-shot AnyEvent timer is stored in
# $self->{heartbeat}{timer}{server}. It fires after the interval plus the
# connection timeout margin (both in milliseconds). If the client is still
# flagged as connected at that point, the flag is cleared, the handle is shut
# down and destroyed, and CONNECTION_LOST is fired.
sub reset_server_heartbeat_timer {
    my ($self) = @_;

    my $interval_ms = $self->{heartbeat}{interval}{server};
    return unless defined $interval_ms and $interval_ms > 0;

    my $on_timeout = sub {
        if ($self->{connected}) {
            $self->{connected} = 0;
            if (defined $self->{handle}) {
                my $handle = $self->{handle};
                $handle->push_shutdown;
                $handle->destroy;
                undef $self->{handle};
            }
            $self->event('CONNECTION_LOST', $self->{host}, $self->{port}, 'Missed server heartbeat');
        }
    };

    my $deadline_s = ($interval_ms + $self->get_connection_timeout_margin) / 1000;
    $self->{heartbeat}{timer}{server} = AnyEvent->timer(
        after => $deadline_s,
        cb    => $on_timeout,
    );
}
# Produce an identifier for receipts and subscriptions: the current epoch
# second followed by a per-object counter, which is incremented on each call.
sub get_uuid {
    my ($self) = @_;

    my $sequence = $self->{counter}++;
    return int(time) . $sequence;
}
sub subscribe {
my $self = shift;
my $destination = shift;
my $ack_mode = shift || 'auto';
my $additional_headers = shift || {};
unless ($ack_mode =~ m/(?:auto|client|client-individual)/) {
croak "Invalid acknowledgement mode '$ack_mode'. "
."Valid modes are 'auto', 'client' and 'client-individual'."
}
unless (defined $destination) {
croak "Would you mind supplying me with a destination?";
}
if (defined $self->{subscriptions}{$destination}) {
carp "You already subscribed to '$destination'.";
if ($self->handles('SUBSCRIBED')) {
$self->event('SUBSCRIBED', $destination);
}
}
else {
my $subscription_id = shift || $self->get_uuid;
$self->{subscriptions}{$destination} = $subscription_id;
my $header = {
destination => $destination,
id => $subscription_id,
ack => $ack_mode,
%$additional_headers,
};
if ($self->handles('SUBSCRIBED')) {
unless (defined $header->{receipt}) {
$header->{receipt} = $self->get_uuid;
}
$self->before_receipt(
sub {
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
my $ENCODE_KEYS = '['.join('', map(sprintf('\\x%02x', ord($_)), keys(%ENCODE_MAP))).']';
while (my ($k, $v) = each(%$header_hashref)) {
$v =~ s/($ENCODE_KEYS)/$ENCODE_MAP{$1}/ego;
$k =~ s/($ENCODE_KEYS)/$ENCODE_MAP{$1}/ego;
$result_hashref->{$k} = $v;
}
return $result_hashref;
}
# Decode the escape sequences in the keys and values of a header hash.
#
# Parameters:
#   (first argument) - hash reference mapping raw header names to raw values.
#
# Returns a new hash reference with every backslash escape in keys and values
# replaced via %DECODE_MAP. The input hash is not modified.
#
# Croaks with "Invalid header value." / "Invalid header key." when a backslash
# escape is encountered that %DECODE_MAP does not define. Previously the
# croak could never trigger (the substitution count is always >= 1 once the
# pattern has matched), so unknown escapes were silently replaced by undef.
sub decode_header {
    my $header_hashref = shift;
    my $result_hashref = {};

    # keys() rather than each(): a croak below must not leave a half-advanced
    # iterator behind on the caller's hash.
    for my $raw_key (keys %$header_hashref) {
        my $key   = $raw_key;
        my $value = $header_hashref->{$raw_key};

        if (defined $value) {
            $value =~ s{(\\.)}{
                exists $DECODE_MAP{$1}
                    ? $DECODE_MAP{$1}
                    : croak("Invalid header value.")
            }eg;
        }

        $key =~ s{(\\.)}{
            exists $DECODE_MAP{$1}
                ? $DECODE_MAP{$1}
                : croak("Invalid header key.")
        }eg;

        $result_hashref->{$key} = $value;
    }

    return $result_hashref;
}
# Serialise a STOMP frame and queue it on the handle.
#
# Parameters:
#   $command        - frame command, e.g. 'CONNECT', 'SEND', 'ACK'.
#   $header_hashref - hash reference with the frame headers.
#   $body           - frame body; only transmitted for SEND frames.
#
# Croaks unless the client is connected or the command is CONNECT.
#
# Headers are escaped with encode_header for every command except CONNECT.
# After building the frame, the SEND_FRAME event is fired for every frame and
# an additional event named after the command is fired for SEND, ACK and NACK
# frames only. Finally the frame is written and the client heartbeat timer is
# re-armed.
sub send_frame {
    my ($self, $command, $header_hashref, $body) = @_;

    unless ($self->is_connected or $command eq 'CONNECT') {
        croak "Have you considered connecting to a STOMP broker first before "
            ."trying to send something?";
    }

    utf8::encode($command);

    my $header;
    if ($command eq 'CONNECT') {
        $header = header_hash2string($header_hashref);
    }
    else {
        $header = header_hash2string(encode_header($header_hashref));
    }
    utf8::encode($header);

    my $frame;
    if ($command eq 'SEND') {
        $body = '' unless defined $body;
        $frame = $command.$EOL.$header.$EOL.$EOL.$body.$NULL;
    }
    else {
        $frame = $command.$EOL.$header.$EOL.$EOL.$NULL;
    }

    $self->event('SEND_FRAME', $frame);

    # Anchored, and without the stray trailing '|' the old pattern had: that
    # empty alternative matched every command, so the per-command event was
    # emitted for all frames instead of just these three.
    $self->event($command, $frame) if $command =~ m/\A(?:SEND|ACK|NACK)\z/;

    $self->{handle}->push_write($frame);
    $self->reset_client_heartbeat_timer;
}
# Send a message to a destination.
#
# Parameters:
#   $destination - destination name; required.
#   $headers     - optional hash reference with additional SEND headers.
#   $body        - optional message body.
#
# Croaks when no destination is given. Warns (carp) when no 'content-type'
# header is supplied.
#
# The 'destination' header and, unless the caller provided one, the
# 'content-length' header are added to a private copy of $headers. The
# caller's hash is left untouched, so the same hash reference can be reused
# for several messages without a stale 'content-length' from an earlier body
# leaking into later frames.
sub send {
    my ($self, $destination, $headers, $body) = @_;

    unless (defined $destination) {
        croak "Would you mind supplying me with a destination?";
    }

    my %frame_headers = %{ $headers || {} };
    $frame_headers{destination} = $destination;

    unless (defined $frame_headers{'content-length'}) {
        # NOTE(review): length() counts characters; this presumably expects
        # $body to be an already-encoded octet string - confirm.
        $frame_headers{'content-length'} = defined $body ? length $body : 0;
    }

    unless (defined $frame_headers{'content-type'}) {
        carp "It is strongly recommended to set the 'content-type' header.";
    }

    $self->send_frame('SEND', \%frame_headers, $body);
}
# Acknowledge a message by sending an ACK frame.
#
# Parameters:
#   $ack_id      - value of the message's 'ack' header; required.
#   $transaction - optional transaction identifier to acknowledge within.
#
# Croaks when $ack_id is undefined or empty. The check is on definedness and
# length rather than truthiness, so the valid identifier "0" is accepted.
sub ack {
    my ($self, $ack_id, $transaction) = @_;

    unless (defined $ack_id and length $ack_id) {
        croak "I do really need the message's ack header to ACK it.";
    }

    my $header = {id => $ack_id,};
    $header->{transaction} = $transaction if (defined $transaction);

    $self->send_frame('ACK', $header);
}
# Reject a message by sending a NACK frame.
#
# Parameters:
#   $ack_id      - value of the message's 'ack' header; required.
#   $transaction - optional transaction identifier to reject within.
#
# Croaks when $ack_id is undefined or empty. The check is on definedness and
# length rather than truthiness, so the valid identifier "0" is accepted.
sub nack {
    my ($self, $ack_id, $transaction) = @_;

    unless (defined $ack_id and length $ack_id) {
        croak "I do really need the message's ack header to NACK it.";
    }

    my $header = {id => $ack_id,};
    $header->{transaction} = $transaction if (defined $transaction);

    $self->send_frame('NACK', $header);
}
# Emit a heartbeat (a bare end-of-line) while connected and re-arm the client
# heartbeat timer. Does nothing when the client is not connected.
sub send_heartbeat {
    my ($self) = @_;

    if ($self->is_connected) {
        my $handle = $self->{handle};
        $handle->push_write($EOL);
        $self->reset_client_heartbeat_timer;
    }
}
# Start a transaction by sending a BEGIN frame and remember it as open.
#
# Parameters:
#   (first argument)  - transaction identifier; required.
#   (second argument) - optional hash reference with additional headers.
#
# Croaks when no identifier is given. When the identifier is already recorded
# as open, only a warning is issued and no frame is sent.
sub begin_transaction {
    my ($self, $id, $additional_headers) = @_;
    $additional_headers ||= {};

    defined $id
        or croak "I really need a transaction identifier here!";

    if (!defined $self->{transactions}{$id}) {
        my %header = (transaction => $id, %$additional_headers);
        $self->send_frame('BEGIN', \%header);
        $self->{transactions}{$id} = 1;
    }
    else {
        carp "You've already begun transaction '$id'";
    }
}
# Commit an open transaction by sending a COMMIT frame and forget it.
#
# Parameters:
#   (first argument)  - transaction identifier; required.
#   (second argument) - optional hash reference with additional headers.
#
# Croaks when no identifier is given. When the identifier is not recorded as
# open, a warning is issued and nothing is sent - mirroring how
# begin_transaction treats a duplicate BEGIN. Previously the warning was
# issued but the COMMIT frame for the unknown transaction went out anyway.
sub commit_transaction {
    my ($self, $id, $additional_headers) = @_;
    $additional_headers ||= {};

    croak "I really need a transaction identifier here!" unless (defined $id);

    unless (defined $self->{transactions}{$id}) {
        carp "Transaction '$id' is not open; it was never begun or has "
            ."already been committed or aborted";
        return;
    }

    $self->send_frame('COMMIT', {transaction => $id, %$additional_headers,});
    delete $self->{transactions}{$id};
}
# Abort an open transaction by sending an ABORT frame and forget it.
#
# Parameters:
#   (first argument)  - transaction identifier; required.
#   (second argument) - optional hash reference with additional headers.
#
# Croaks when no identifier is given. When the identifier is not recorded as
# open, a warning is issued and nothing is sent - mirroring how
# begin_transaction treats a duplicate BEGIN. Previously the warning (which
# wrongly spoke of an already committed transaction) was issued but the ABORT
# frame for the unknown transaction went out anyway.
sub abort_transaction {
    my ($self, $id, $additional_headers) = @_;
    $additional_headers ||= {};

    croak "I really need a transaction identifier here!" unless (defined $id);

    unless (defined $self->{transactions}{$id}) {
        carp "Transaction '$id' is not open; it was never begun or has "
            ."already been committed or aborted";
        return;
    }

    $self->send_frame('ABORT', {transaction => $id, %$additional_headers,});
    delete $self->{transactions}{$id};
}
sub read_frame {
my $self = shift;
$self->{handle}->unshift_read(
line => sub {
my ($handle, $command, $eol) = @_;
$self->reset_server_heartbeat_timer;
if ($command =~ /^(CONNECTED|MESSAGE|RECEIPT|ERROR)$/) {
$command = $1;
}
else {
return;
}
$self->{handle}->unshift_read(
regex => qr<\r?\n\r?\n>,
cb => sub {
my ($handle, $header_string) = @_;
my $header_hashref = header_string2hash($header_string);
my $args;
# The headers of the CONNECTED frame are not en-/decoded
# for backwards compatibility with STOMP 1.0
unless ($command eq 'CONNECTED') {
$header_hashref = decode_header($header_hashref);
}
if ($command =~ m/MESSAGE|ERROR/) {
if (defined $header_hashref->{'content-length'}) {
$args->{chunk} = $header_hashref->{'content-length'};
}
else {
$args->{regex} = qr<[^\000]*\000>;
}
$self->{handle}->unshift_read(
%$args,
cb => sub {
my ($handle, $body) = @_;
$self->event('READ_FRAME', $command, $header_hashref, $body);
if ($command eq 'ERROR') {
$body =~ s/^\s+|\s+$|\0//g; # trim and remove null char
$self->event(
'ERROR',
$self->{host},
$self->{port},
$body || $header_hashref->{message} || 'unknown'
);
}
else {
$self->event('MESSAGE', $header_hashref, $body);
# If frame end was determined by matching
# for a NULL character, then this character
# is not part of the frame body and thus
# removed
if (exists $args->{regex}) {
$body =~ s/\0$//g;
}
if (defined $header_hashref->{subscription}) {
$self->event(
"MESSAGE-$header_hashref->{subscription}",
$header_hashref,
$body
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
}
# Register a callback for the SUBSCRIBED event and return whatever reg_cb
# returns.
sub on_subscribed {
    my ($self, $callback) = @_;

    return $self->reg_cb('SUBSCRIBED', $callback);
}
# Register a callback for the UNSUBSCRIBED event and return whatever reg_cb
# returns.
sub on_unsubscribed {
    my ($self, $callback) = @_;

    return $self->reg_cb('UNSUBSCRIBED', $callback);
}
# Remove a previously registered callback, identified by the guard passed in,
# by handing it to unreg_cb.
sub unregister_callback {
    my $self  = shift;
    my $guard = shift;

    return $self->unreg_cb($guard);
}
1;
__END__
=head1 NAME
AnyEvent::STOMP::Client - An event-based non-blocking STOMP 1.2 client based on
AnyEvent and Object::Event.
=head1 SYNOPSIS
use AnyEvent::STOMP::Client;
my $stomp_client = AnyEvent::STOMP::Client->new();
$stomp_client->connect();
$stomp_client->on_connected(
sub {
my $self = shift;
$self->subscribe('/queue/test-destination');
$self->send(
'/queue/test-destination',
{'content-type' => 'text/plain',},
"Hello World!"
);
}
);
$stomp_client->on_message(
sub {
my ($self, $header, $body) = @_;
print "$body\n";
}
);
AnyEvent->condvar->recv;
=head1 DESCRIPTION
AnyEvent::STOMP::Client provides a STOMP (Simple Text Oriented Messaging
Protocol) client. Thanks to AnyEvent, AnyEvent::STOMP::Client is completely
non-blocking, by making extensive use of the AnyEvent::Handle and timers (and,
under the hood, AnyEvent::Socket). Building on Object::Event,
AnyEvent::STOMP::Client implements various events (e.g. the MESSAGE event, when
a STOMP MESSAGE frame is received) and offers callbacks for these (e.g.
on_message($callback)).
=head1 METHODS
=head2 $client = new $host, $port, $connect_headers, $tls_context
Create an instance of C<AnyEvent::STOMP::Client>.
=over
=item C<$host>
String, optional, defaults to C<localhost>. The host, where a STOMP-compatible
message broker is running.
=item C<$port>
Integer, optional, defaults to C<61613>. The TCP port we connect to. I.e. the
port where the message broker instance is listening.
=item C<$connect_headers>
Hash, optional, empty by default. May be used to add arbitrary headers to the
STOMP C<CONNECT> frame. STOMP login headers would, for example, be supplied
using this parameter.
=item C<$tls_context>
Hash, optional, undef by default. May be used to supply an SSL/TLS context
directly to C<AnyEvent::Handle>. See L<AnyEvent::TLS> for documentation.
=back
=head3 Example
C<< my $client = AnyEvent::STOMP::Client->new(
'127.0.0.1',
61614,
{'login' => 'guest', 'passcode' => 'guest', 'virtual-host' => 'foo'}
); >>
=head2 $client->connect
Connect to the specified STOMP message broker. If a connection is already
established, it is dropped and a new one is opened.
=head2 $client->disconnect
Sends a C<DISCONNECT> STOMP frame to the message broker (if we are still
connected). If no connection is established, any remaining handle is simply
destroyed and the C<DISCONNECTED> event is fired.
=over
=item C<$ungraceful>
Boolean, defaults to 0. If the ungraceful option is set, then simply a
( run in 1.020 second using v1.01-cache-2.11-cpan-f56aa216473 )