AnyEvent-STOMP-Client
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/AnyEvent/STOMP/Client/Any.pm view on Meta::CPAN
$self->{stomp_clients}{$id}->on_connection_lost(
sub {
my (undef, undef, undef, $reason) = @_;
delete $self->{current_stomp_client};
$log->debug("$id Connection lost ($reason).");
delete $self->{connect_timeout_timer};
$self->set_client_unavailable($id);
$self->event('ANY_CONNECTION_LOST', $id);
$self->backoff;
}
);
$self->{stomp_clients}{$id}->on_connect_error(
sub {
my (undef, undef, undef, $reason) = @_;
$log->debug("$id Could not establish connection ($reason).");
delete $self->{connect_timeout_timer};
$self->set_client_unavailable($id);
$self->backoff;
}
);
$self->{stomp_clients}{$id}->on_receipt(
sub {
my (undef, $header) = @_;
$self->event('ANY_RECEIPT', $header, $id);
}
);
$self->{stomp_clients}{$id}->on_message(
sub {
my (undef, $header, $body) = @_;
$self->event('ANY_MESSAGE', $header, $body, $id);
}
);
$self->{stomp_clients}{$id}->on_subscribed(
sub {
my (undef, $destination) = @_;
$self->event('ANY_SUBSCRIBED', $destination, $id);
}
);
}
$self->reset_clients_state;
$log->debug("STOMP clients set up.");
}
sub connect {
my $self = shift;
my $id = $self->get_random_client_id;
$log->debug("$id Establishing TCP/TLS connection.");
$self->{stomp_clients}{$id}->connect;
$self->{connect_timeout_timer} = AnyEvent->timer(
after => 10,
cb => sub {
$log->debug("$id Timeout establishing STOMP connection.");
$self->{stomp_clients}{$id}->disconnect;
$self->set_client_unavailable($id);
$self->backoff;
}
);
}
sub disconnect {
my $self = shift;
$self->get_instance->disconnect if $self->is_connected;
}
sub backoff {
my $self = shift;
if ($self->is_client_available) {
$self->connect;
}
else {
$self->increase_backoff;
$self->reset_clients_state;
$self->{reconnect_timer} = AnyEvent->timer(
after => $self->get_backoff,
cb => sub {
$self->backoff;
},
);
}
}
sub increase_backoff {
my $self = shift;
if (defined $self->{backoff}) {
if ($self->{backoff} < $self->{config}{backoff}{maximum}) {
my $old_backoff = $self->{backoff};
my $randomness = rand($old_backoff)-$old_backoff/2;
$self->{backoff} = $old_backoff*$self->{config}{backoff}{multiplier}+$randomness;
}
else {
my $max = $self->{config}{backoff}{maximum};
my $randomness = rand($max)-$max/2;
$self->{backoff} = $max+$randomness;
}
}
else {
my $val = $self->{config}{backoff}{start_value};
$self->{backoff} = rand($val)+$val/2;
}
$log->debug("Backing off ".$self->{backoff});
}
sub reset_backoff {
my $self = shift;
delete $self->{reconnect_timer};
delete $self->{backoff};
$self->reset_clients_state;
view all matches for this distributionview release on metacpan - search on metacpan
( run in 1.357 second using v1.00-cache-2.02-grep-82fe00e-cpan-cec75d87357c )