AnyEvent-STOMP-Client

 view release on metacpan or  search on metacpan

lib/AnyEvent/STOMP/Client/Any.pm  view on Meta::CPAN

        $self->{stomp_clients}{$id}->on_connection_lost(
            sub {
                my (undef, undef, undef, $reason) = @_;

                delete $self->{current_stomp_client};

                $log->debug("$id Connection lost ($reason).");
                delete $self->{connect_timeout_timer};
                $self->set_client_unavailable($id);
                $self->event('ANY_CONNECTION_LOST', $id);
                $self->backoff;
            }
        );

        $self->{stomp_clients}{$id}->on_connect_error(
            sub {
                my (undef, undef, undef, $reason) = @_;
                $log->debug("$id Could not establish connection ($reason).");
                delete $self->{connect_timeout_timer};
                $self->set_client_unavailable($id);
                $self->backoff;
            }
        );

        $self->{stomp_clients}{$id}->on_receipt(
            sub {
                my (undef, $header) = @_;
                $self->event('ANY_RECEIPT', $header, $id);
            }
        );

        $self->{stomp_clients}{$id}->on_message(
            sub {
                my (undef, $header, $body) = @_;
                $self->event('ANY_MESSAGE', $header, $body, $id);
            }
        );

        $self->{stomp_clients}{$id}->on_subscribed(
            sub {
                my (undef, $destination) = @_;
                $self->event('ANY_SUBSCRIBED', $destination, $id);
            }
        );
    }

    $self->reset_clients_state;
    $log->debug("STOMP clients set up.");
}

# Start a connection attempt on one of the configured STOMP clients.
#
# The client id comes from get_random_client_id. After the client's connect
# has been kicked off, a one-shot 10 second AnyEvent timer is stored in
# $self->{connect_timeout_timer}. If that timer fires, the same client is
# disconnected, flagged via set_client_unavailable, and backoff is invoked to
# choose the next step.
#
# Returns the value of the timer assignment (the new timer watcher).
sub connect {
    my ($self) = @_;
    my $id = $self->get_random_client_id;

    $log->debug("$id Establishing TCP/TLS connection.");
    my $client = $self->{stomp_clients}{$id};
    $client->connect;

    # The client is looked up again when the timer fires, not captured here,
    # so the callback acts on whatever is registered under $id at that time.
    my $on_timeout = sub {
        $log->debug("$id Timeout establishing STOMP connection.");
        $self->{stomp_clients}{$id}->disconnect;
        $self->set_client_unavailable($id);
        $self->backoff;
    };

    $self->{connect_timeout_timer} = AnyEvent->timer(
        after => 10,
        cb    => $on_timeout,
    );
}

# Disconnect the client returned by get_instance, but only while
# is_connected reports true.
#
# Returns the (false) value of is_connected when nothing is connected,
# otherwise whatever the instance's disconnect method returns.
sub disconnect {
    my ($self) = @_;

    my $connected = $self->is_connected;
    return $connected unless $connected;

    return $self->get_instance->disconnect;
}

# Decide how to proceed after a failed or lost connection.
#
# If is_client_available reports true, a new connection attempt is started
# right away via connect. Otherwise the delay is advanced with
# increase_backoff, reset_clients_state is called (defined elsewhere;
# presumably it makes the clients eligible again), and an AnyEvent timer is
# stored in $self->{reconnect_timer} that re-enters backoff once get_backoff
# seconds have elapsed.
sub backoff {
    my ($self) = @_;

    return $self->connect if $self->is_client_available;

    $self->increase_backoff;
    $self->reset_clients_state;

    my $delay = $self->get_backoff;
    my $retry = sub { $self->backoff; };

    # Storing the watcher on $self keeps the timer alive until it is
    # replaced or deleted.
    $self->{reconnect_timer} = AnyEvent->timer(
        after => $delay,
        cb    => $retry,
    );
}

# Compute the next reconnect delay, store it in $self->{backoff}, and log it.
#
# The delay is a centre value plus uniform jitter of up to half the "spread"
# in either direction:
#   * no delay stored yet   -> centre and spread are config backoff start_value
#   * stored delay < maximum -> centre is delay * multiplier, spread is the delay
#   * otherwise              -> centre and spread are config backoff maximum
#
# When growing the delay would push the centre past the configured maximum,
# the capped case is used instead, so a single step can no longer overshoot
# the maximum by a factor of the multiplier. Because of the jitter the stored
# value may still land up to 50% above (or below) the maximum.
#
# Reads $self->{config}{backoff}{start_value,multiplier,maximum}.
# Returns whatever $log->debug returns.
sub increase_backoff {
    my $self = shift;

    my $settings = $self->{config}{backoff};
    my $maximum  = $settings->{maximum};
    my $previous = $self->{backoff};

    my ($centre, $spread);
    if (!defined $previous) {
        # First failure since the backoff was last reset.
        $centre = $spread = $settings->{start_value};
    }
    elsif ($previous < $maximum) {
        $centre = $previous * $settings->{multiplier};
        $spread = $previous;

        # Clamp: never let the un-jittered delay grow beyond the maximum.
        if ($centre > $maximum) {
            $centre = $spread = $maximum;
        }
    }
    else {
        $centre = $spread = $maximum;
    }

    # Jitter is uniform in [-spread/2, +spread/2).
    $self->{backoff} = $centre + rand($spread) - $spread / 2;

    $log->debug("Backing off ".$self->{backoff});
}

sub reset_backoff {
    my $self = shift;

    delete $self->{reconnect_timer};
    delete $self->{backoff};
    $self->reset_clients_state;

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 1.357 second using v1.00-cache-2.02-grep-82fe00e-cpan-cec75d87357c )