AnyEvent-STOMP-Client

 view release on metacpan or  search on metacpan

lib/AnyEvent/STOMP/Client/Any.pm  view on Meta::CPAN

package AnyEvent::STOMP::Client::Any;

use strict;
use warnings;

use parent 'Object::Event';

use AnyEvent::STOMP::Client;
use Log::Any '$log';
use Time::HiRes 'time';


our $VERSION = '0.42';


# NOTE(review): $SEPARATOR_ID_ACK is not referenced in the part of the file
# reviewed here; presumably it separates an id from an ack value -- confirm
# at its use site.
my $SEPARATOR_ID_ACK = '#';
# Placed between host and port when setup_stomp_clients() builds the
# per-broker id string that keys $self->{stomp_clients}.
my $SEPARATOR_BROKER_ID = ':';

# Constructor.
#
# Arguments:
#   $config - configuration for this object. It is stored unmodified under
#             $self->{config}; setup_stomp_clients() reads hash keys from it,
#             so a hash reference is expected.
#
# Builds the object through the parent class constructor, stores the
# configuration and then calls setup_stomp_clients() on the new object.
#
# Returns the newly created object.
sub new {
    my ($class, $config) = @_;

    # The parent constructor creates the underlying object; it is then
    # blessed into the class this constructor was invoked on.
    my $self = bless($class->SUPER::new(), $class);

    $self->{config} = $config;
    $self->setup_stomp_clients();

    return $self;
}

sub setup_stomp_clients {
    my $self = shift;

    if (ref($self->{config}{broker}) ne 'ARRAY') {
        $self->{config}{broker} = [$self->{config}{broker}];
    }

    foreach (@{$self->{config}{broker}}) {
        my $host = $_->{host};
        my $port = $_->{port};
        my $id = "$host$SEPARATOR_BROKER_ID$port";

        my $config = {
            connect_headers => {},
            tls_context => {
                %{$self->{config}{tls_context}},
            },
        };

        if (defined $self->{config}{connect_headers}) {
            $config->{connect_headers} = $self->{config}{connect_headers};
        }

        if (defined $_->{connect_headers}) {
            $config->{connect_headers}{keys %{$_->{connect_headers}}} = values %{$_->{connect_headers}};
        }

        $self->{stomp_clients}{$id} = new AnyEvent::STOMP::Client(
            $host, $port,
            $config->{connect_headers},
            $config->{tls_context}
        );

        $self->{stomp_clients}{$id}->on_connected(
            sub {
                my (undef, $header) = @_;

                $log->debug("$id STOMP connection established.");

                $self->{current_stomp_client} = $self->{stomp_clients}{$id};
                $self->reset_backoff;
                delete $self->{connect_timeout_timer};

                $self->event('ANY_CONNECTED', $header, $id);
            }
        );

        $self->{stomp_clients}{$id}->on_transport_connected(
            sub {
                $log->debug("$id TCP/TLS connection established.");
            }
        );



( run in 0.577 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )