view release on metacpan or search on metacpan
}
);
AnyEvent->condvar->recv;
=head1 DESCRIPTION
AnyEvent::STOMP::Client provides a STOMP (Simple Text Oriented Messaging
Protocol) client. Thanks to AnyEvent, AnyEvent::STOMP::Client is completely
non-blocking, by making extensive use of the AnyEvent::Handle and timers (and,
under the hood, AnyEvent::Socket). Building on Object::Event,
AnyEvent::STOMP::Client implements various events (e.g. the MESSAGE event, when
a STOMP MESSAGE frame is received) and offers callbacks for these (e.g.
on_message($callback)).
=head1 METHODS
=head2 $client = new $host, $port, $connect_headers, $tls_context
Create an instance of C<AnyEvent::STOMP::Client>.
examples/connection_handling.pl view on Meta::CPAN
# Example for Connection Handling using AnyEvent::STOMP::Client
#
################################################################################
use AnyEvent;
use AnyEvent::STOMP::Client;
my $stomp_client = new AnyEvent::STOMP::Client();
my $backoff = 0;
my $backoff_timer;
sub backoff {
$backoff_timer = AnyEvent->timer(
after => $backoff,
cb => sub { $stomp_client->connect(); }
);
}
$stomp_client->on_connected(sub { $backoff = 0; });
$stomp_client->on_connection_lost(sub { &backoff });
$stomp_client->on_connect_error(sub { $backoff += 10; &backoff; });
$stomp_client->connect();
examples/sample-consumer.pl view on Meta::CPAN
print "MESSAGE\n";
foreach (sort keys %$header) {
print "$_:$header->{$_}\n";
}
print "\n$body\n";
$self->ack($header->{'ack'}) if (defined $header->{'ack'});
}
);
$w = AnyEvent->timer(
after => 10,
cb => sub {
$stomp_client->unsubscribe('/queue/test-destination');
}
);
$stomp_client->on_unsubscribed(
sub {
my ($self, $destination) = @_;
print "Unsubscribed from '$destination'!\n";
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
sub disconnect {
my $self = shift;
my $ungraceful = shift;
unless ($self->is_connected) {
if (defined $self->{handle}) {
$self->{handle}->destroy;
delete $self->{handle};
}
$self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
delete $self->{heartbeat}{timer};
return;
}
if (defined $ungraceful and $ungraceful) {
$self->send_frame('DISCONNECT');
$self->{connected} = 0;
if (defined $self->{handle}) {
$self->{handle}->push_shutdown;
$self->{handle}->destroy;
delete $self->{handle};
}
$self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
delete $self->{heartbeat}{timer};
}
else {
my $receipt_id = $self->get_uuid;
$self->send_frame('DISCONNECT', {receipt => $receipt_id,});
$self->before_receipt(
sub {
my ($self, $header) = @_;
if ($header->{'receipt-id'} eq $receipt_id) {
$self->{connected} = 0;
$self->stop_event;
$self->unreg_me;
if (defined $self->{handle}) {
$self->{handle}->push_shutdown;
$self->{handle}->destroy;
delete $self->{handle};
}
$self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
delete $self->{heartbeat}{timer};
}
}
);
}
}
sub destroy {
my $self = shift;
$self->disconnect(1) if $self->is_connected;
$self->remove_all_callbacks;
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
}
if ($sx == 0 or $cy == 0) {
$self->{heartbeat}{interval}{server} = 0;
}
else {
$self->{heartbeat}{interval}{server} = max($sx, $cy);
}
}
sub reset_client_heartbeat_timer {
my $self = shift;
my $interval = $self->{heartbeat}{interval}{client};
unless (defined $interval and $interval > 0) {
return;
}
$self->{heartbeat}{timer}{client} = AnyEvent->timer(
after => ($interval/1000),
cb => sub {
$self->send_heartbeat;
}
);
}
sub reset_server_heartbeat_timer {
my $self = shift;
my $interval = $self->{heartbeat}{interval}{server};
unless (defined $interval and $interval > 0) {
return;
}
$self->{heartbeat}{timer}{server} = AnyEvent->timer(
after => (($interval+$self->get_connection_timeout_margin)/1000),
cb => sub {
if ($self->{connected}) {
$self->{connected} = 0;
if (defined $self->{handle}) {
$self->{handle}->push_shutdown;
$self->{handle}->destroy;
undef $self->{handle};
}
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
$body = '' unless defined $body;
$frame = $command.$EOL.$header.$EOL.$EOL.$body.$NULL;
}
else {
$frame = $command.$EOL.$header.$EOL.$EOL.$NULL;
}
$self->event('SEND_FRAME', $frame);
$self->event($command, $frame) if ($command =~ m/SEND|ACK|NACK|/);
$self->{handle}->push_write($frame);
$self->reset_client_heartbeat_timer;
}
sub send {
my ($self, $destination, $headers, $body) = @_;
if (defined $destination) {
$headers->{destination} = $destination;
}
else {
croak "Would you mind supplying me with a destination?";
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
$header->{transaction} = $transaction if (defined $transaction);
$self->send_frame('NACK', $header);
}
sub send_heartbeat {
my $self = shift;
if ($self->is_connected) {
$self->{handle}->push_write($EOL);
$self->reset_client_heartbeat_timer;
}
}
sub begin_transaction {
my $self = shift;
my $id = shift;
my $additional_headers = shift || {};
croak "I really need a transaction identifier here!" unless (defined $id);
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
$self->send_frame('ABORT', {transaction => $id, %$additional_headers,});
delete $self->{transactions}{$id};
}
sub read_frame {
my $self = shift;
$self->{handle}->unshift_read(
line => sub {
my ($handle, $command, $eol) = @_;
$self->reset_server_heartbeat_timer;
if ($command =~ /^(CONNECTED|MESSAGE|RECEIPT|ERROR)$/) {
$command = $1;
}
else {
return;
}
$self->{handle}->unshift_read(
regex => qr<\r?\n\r?\n>,
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
}
);
AnyEvent->condvar->recv;
=head1 DESCRIPTION
AnyEvent::STOMP::Client provides a STOMP (Simple Text Oriented Messaging
Protocol) client. Thanks to AnyEvent, AnyEvent::STOMP::Client is completely
non-blocking, by making extensive use of the AnyEvent::Handle and timers (and,
under the hood, AnyEvent::Socket). Building on Object::Event,
AnyEvent::STOMP::Client implements various events (e.g. the MESSAGE event, when
a STOMP MESSAGE frame is received) and offers callbacks for these (e.g.
on_message($callback)).
=head1 METHODS
=head2 $client = new $host, $port, $connect_headers, $tls_context
Create an instance of C<AnyEvent::STOMP::Client>.
lib/AnyEvent/STOMP/Client/All.pm view on Meta::CPAN
if (defined $self->{backoff}{$id}{current}) {
$self->increase_backoff($id);
}
else {
$self->{backoff}{$id}{current} = $self->{config}{backoff}{start_value};
}
$log->debug("$id backoff: ".$self->{backoff}{$id}{current});
$self->{reconnect_timers}{$id} = AnyEvent->timer (
after => $self->get_backoff($id),
cb => sub {
$log->debug("$id trying to connect.");
$self->{stomp_clients}{$id}->connect;
},
);
}
sub increase_backoff {
my ($self, $id) = @_;
if ($self->{backoff}{$id}{current} < $self->{config}{backoff}{maximum}) {
$self->{backoff}{$id}{current} *= $self->{config}{backoff}{multiplier};
}
}
sub reset_backoff {
my ($self, $id) = @_;
delete $self->{reconnect_timer}{$id};
delete $self->{backoff}{$id}{current};
}
sub get_backoff {
my ($self, $id) = @_;
return $self->{backoff}{$id}{current};
}
1;
lib/AnyEvent/STOMP/Client/Any.pm view on Meta::CPAN
);
$self->{stomp_clients}{$id}->on_connected(
sub {
my (undef, $header) = @_;
$log->debug("$id STOMP connection established.");
$self->{current_stomp_client} = $self->{stomp_clients}{$id};
$self->reset_backoff;
delete $self->{connect_timeout_timer};
$self->event('ANY_CONNECTED', $header, $id);
}
);
$self->{stomp_clients}{$id}->on_transport_connected(
sub {
$log->debug("$id TCP/TLS connection established.");
}
);
lib/AnyEvent/STOMP/Client/Any.pm view on Meta::CPAN
}
);
$self->{stomp_clients}{$id}->on_connection_lost(
sub {
my (undef, undef, undef, $reason) = @_;
delete $self->{current_stomp_client};
$log->debug("$id Connection lost ($reason).");
delete $self->{connect_timeout_timer};
$self->set_client_unavailable($id);
$self->event('ANY_CONNECTION_LOST', $id);
$self->backoff;
}
);
$self->{stomp_clients}{$id}->on_connect_error(
sub {
my (undef, undef, undef, $reason) = @_;
$log->debug("$id Could not establish connection ($reason).");
delete $self->{connect_timeout_timer};
$self->set_client_unavailable($id);
$self->backoff;
}
);
$self->{stomp_clients}{$id}->on_receipt(
sub {
my (undef, $header) = @_;
$self->event('ANY_RECEIPT', $header, $id);
}
lib/AnyEvent/STOMP/Client/Any.pm view on Meta::CPAN
$log->debug("STOMP clients set up.");
}
sub connect {
my $self = shift;
my $id = $self->get_random_client_id;
$log->debug("$id Establishing TCP/TLS connection.");
$self->{stomp_clients}{$id}->connect;
$self->{connect_timeout_timer} = AnyEvent->timer(
after => 10,
cb => sub {
$log->debug("$id Timeout establishing STOMP connection.");
$self->{stomp_clients}{$id}->disconnect;
$self->set_client_unavailable($id);
$self->backoff;
}
);
}
lib/AnyEvent/STOMP/Client/Any.pm view on Meta::CPAN
sub backoff {
my $self = shift;
if ($self->is_client_available) {
$self->connect;
}
else {
$self->increase_backoff;
$self->reset_clients_state;
$self->{reconnect_timer} = AnyEvent->timer(
after => $self->get_backoff,
cb => sub {
$self->backoff;
},
);
}
}
sub increase_backoff {
my $self = shift;
lib/AnyEvent/STOMP/Client/Any.pm view on Meta::CPAN
my $val = $self->{config}{backoff}{start_value};
$self->{backoff} = rand($val)+$val/2;
}
$log->debug("Backing off ".$self->{backoff});
}
sub reset_backoff {
my $self = shift;
delete $self->{reconnect_timer};
delete $self->{backoff};
$self->reset_clients_state;
}
sub get_backoff {
return shift->{backoff};
}
sub get_random_client_id {
my $self = shift;