AnyEvent-STOMP-Client

 view release on metacpan or  search on metacpan

lib/AnyEvent/STOMP/Client/All.pm  view on Meta::CPAN

        $self->{stomp_clients}{$id}->on_error($callback);
    }
}

sub on_message {
    my ($self, $callback) = @_;

    foreach my $id (keys %{$self->{stomp_clients}}) {
        $self->{stomp_clients}{$id}->on_message(
            sub {
                my ($self, $header, $body) = @_;

                delete $header->{'subscription'};
                delete $header->{'message-id'};
                delete $header->{'receipt'};

                if (defined $header->{'ack'}) {
                    $log->debug("$id message $header->{'ack'} received.");
                    $header->{'ack'} = $id.$SEPARATOR_ID_ACK.$header->{'ack'} if defined $header->{'ack'};
                }
                else {
                    $log->debug("$id message received.");
                }

                &$callback($self, $header, $body);
            }
        );
    }
}

sub ack {
    my ($self, $id_ack) = @_;
    my ($id, $ack) = split $SEPARATOR_ID_ACK, $id_ack;

    $log->debug("$id sending ack $ack.");

    $self->{stomp_clients}{$id}->ack($ack);
}

sub nack {
    my ($self, $id_ack) = @_;
    my ($id, $ack) = split $SEPARATOR_ID_ACK, $id_ack;

    $log->debug("$id sending nack $ack.");

    $self->{stomp_clients}{$id}->nack($ack);
}

sub backoff {
    my ($self, $id) = @_;

    if (defined $self->{backoff}{$id}{current}) {
        $self->increase_backoff($id);
    }
    else {
        $self->{backoff}{$id}{current} = $self->{config}{backoff}{start_value};
    }

    $log->debug("$id backoff: ".$self->{backoff}{$id}{current});

    $self->{reconnect_timers}{$id} = AnyEvent->timer (
        after => $self->get_backoff($id),
        cb => sub {
            $log->debug("$id trying to connect.");
            $self->{stomp_clients}{$id}->connect;
        },
    );
}

sub increase_backoff {
    my ($self, $id) = @_;

    if ($self->{backoff}{$id}{current} < $self->{config}{backoff}{maximum}) {
        $self->{backoff}{$id}{current} *= $self->{config}{backoff}{multiplier};
    }
}

sub reset_backoff {
    my ($self, $id) = @_;

    delete $self->{reconnect_timer}{$id};
    delete $self->{backoff}{$id}{current};
}

sub get_backoff {
    my ($self, $id) = @_;

    return $self->{backoff}{$id}{current};
}

1;



( run in 1.653 second using v1.01-cache-2.11-cpan-f56aa216473 )