AnyEvent-STOMP-Client
view release on metacpan or search on metacpan
lib/AnyEvent/STOMP/Client/All.pm view on Meta::CPAN
$self->{stomp_clients}{$id}->on_error($callback);
}
}
sub on_message {
my ($self, $callback) = @_;
foreach my $id (keys %{$self->{stomp_clients}}) {
$self->{stomp_clients}{$id}->on_message(
sub {
my ($self, $header, $body) = @_;
delete $header->{'subscription'};
delete $header->{'message-id'};
delete $header->{'receipt'};
if (defined $header->{'ack'}) {
$log->debug("$id message $header->{'ack'} received.");
$header->{'ack'} = $id.$SEPARATOR_ID_ACK.$header->{'ack'} if defined $header->{'ack'};
}
else {
$log->debug("$id message received.");
}
&$callback($self, $header, $body);
}
);
}
}
sub ack {
my ($self, $id_ack) = @_;
my ($id, $ack) = split $SEPARATOR_ID_ACK, $id_ack;
$log->debug("$id sending ack $ack.");
$self->{stomp_clients}{$id}->ack($ack);
}
sub nack {
my ($self, $id_ack) = @_;
my ($id, $ack) = split $SEPARATOR_ID_ACK, $id_ack;
$log->debug("$id sending nack $ack.");
$self->{stomp_clients}{$id}->nack($ack);
}
sub backoff {
my ($self, $id) = @_;
if (defined $self->{backoff}{$id}{current}) {
$self->increase_backoff($id);
}
else {
$self->{backoff}{$id}{current} = $self->{config}{backoff}{start_value};
}
$log->debug("$id backoff: ".$self->{backoff}{$id}{current});
$self->{reconnect_timers}{$id} = AnyEvent->timer (
after => $self->get_backoff($id),
cb => sub {
$log->debug("$id trying to connect.");
$self->{stomp_clients}{$id}->connect;
},
);
}
sub increase_backoff {
my ($self, $id) = @_;
if ($self->{backoff}{$id}{current} < $self->{config}{backoff}{maximum}) {
$self->{backoff}{$id}{current} *= $self->{config}{backoff}{multiplier};
}
}
sub reset_backoff {
my ($self, $id) = @_;
delete $self->{reconnect_timer}{$id};
delete $self->{backoff}{$id}{current};
}
sub get_backoff {
my ($self, $id) = @_;
return $self->{backoff}{$id}{current};
}
1;
( run in 1.653 second using v1.01-cache-2.11-cpan-f56aa216473 )