AnyEvent-STOMP-Client

 view release on metacpan or  search on metacpan

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN


    my $connect_headers = shift || {};

    $self->{connect_headers} = {
        'accept-version' => '1.2',
        'host' => $self->{host},
        'heart-beat' => '0,0'
    };

    if (defined $connect_headers->{'virtual-host'}) {
        $self->{connect_headers}{host} = $connect_headers->{'virtual-host'};
    }

    if (defined $connect_headers->{'heart-beat'}) {
        $self->{connect_headers}{'heart-beat'} = $connect_headers->{'heart-beat'};
    }

    if (defined $connect_headers->{'login'}
        and defined $connect_headers->{'passcode'}
    ) {
        $self->{connect_headers}{'login'} = $connect_headers->{'login'};
        $self->{connect_headers}{'passcode'} = $connect_headers->{'passcode'};
    }

    my $tls_ctx = shift;
    if (defined $tls_ctx) {
        $self->{tls_hash}{tls} = 'connect';
        $self->{tls_hash}{tls_ctx} = $tls_ctx;

        if ($tls_ctx->{verify}) {
            foreach my $key (keys %$tls_ctx) {
                if ($key =~ m/_file$/ && not -r $tls_ctx->{$key}) {
                    die "ERROR: Cannot access $key at $tls_ctx->{$key}.\n";
                }
            }
        }
    }

    $self->set_exception_cb(
        sub {
            my ($exception, $eventname) = @_;
            $self->event('ERROR', $self->{host}, $self->{port}, "$eventname: $exception");
        }
    );

    return bless $self, $class;
}

# Open (or re-open) the transport connection to the broker and start the
# STOMP handshake.
#
# Any handle left over from an earlier connection is explicitly destroyed,
# the subscription table is reset, and a new AnyEvent::Handle is created for
# $self->{host} / $self->{port}. Once the transport is up, a CONNECT frame
# carrying $self->{connect_headers} is sent.
#
# Events emitted from the handle callbacks (each with host and port):
#   TRANSPORT_CONNECTED      - transport established, CONNECT frame queued
#   TRANSPORT_CONNECT_ERROR  - transport could not be established (+ message)
#   TRANSPORT_DISCONNECTED   - end-of-file seen on the transport
#   ERROR                    - any handle error (+ message)
#   CONNECTION_LOST          - additionally, when the handle error is fatal
#
# Returns the newly created AnyEvent::Handle, which is also stored in
# $self->{handle}.
sub connect {
    my $self = shift;

    # Tear down a previous handle explicitly. Merely forgetting the reference
    # is not sufficient when connect() is invoked from inside one of the old
    # handle's read/write callbacks; AnyEvent::Handle requires ->destroy in
    # that situation. This also mirrors what disconnect() does.
    if (defined $self->{handle}) {
        $self->{handle}->destroy;
        undef $self->{handle};
    }

    # A fresh connection attempt always starts out as "not connected".
    $self->{connected} = 0;

    $self->{subscriptions} = {};

    # TLS options are set by the constructor when a TLS context is supplied;
    # fall back to an empty option list if the slot is unset.
    my %tls_options = %{ $self->{tls_hash} || {} };

    # Keep this assignment as the last statement: its value (the new handle)
    # is the return value of connect().
    $self->{handle} = AnyEvent::Handle->new(
        connect => [$self->{host}, $self->{port}],
        keep_alive => 1,
        no_delay => 1,
        on_eof => sub {
            undef $self->{handle};
            $self->{connected} = 0;
            $self->event('TRANSPORT_DISCONNECTED', $self->{host}, $self->{port});
        },
        on_connect => sub {
            $self->event('TRANSPORT_CONNECTED', $self->{host}, $self->{port});
            $self->send_frame('CONNECT', $self->{connect_headers});
        },
        on_connect_error => sub {
            my ($handle, $error_message) = @_;
            $handle->destroy;
            undef $self->{handle};
            $self->{connected} = 0;
            $self->event('TRANSPORT_CONNECT_ERROR', $self->{host}, $self->{port}, $error_message);
        },
        on_error => sub {
            my ($handle, $fatal, $error_message) = @_;
            $self->event('ERROR', $self->{host}, $self->{port}, $error_message);
            if ($fatal) {
                undef $self->{handle};
                $self->{connected} = 0;
                # $handle->destroy() will be called automatically after this callback, see
                # https://metacpan.org/pod/AnyEvent::Handle#on_error-=%3E-$cb-%3E($handle,-$fatal,-$message)
                $self->event('CONNECTION_LOST', $self->{host}, $self->{port}, $error_message);
            }
        },
        on_read => sub {
            $self->read_frame;
        },
        %tls_options,
    );
}

sub disconnect {
    my $self = shift;
    my $ungraceful = shift;

    unless ($self->is_connected) {
        if (defined $self->{handle}) {
            $self->{handle}->destroy;
            delete $self->{handle};
        }
        $self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
        delete $self->{heartbeat}{timer};
        return;
    }

    if (defined $ungraceful and $ungraceful) {
        $self->send_frame('DISCONNECT');
        $self->{connected} = 0;
        if (defined $self->{handle}) {
            $self->{handle}->push_shutdown;
            $self->{handle}->destroy;
            delete $self->{handle};
        }
        $self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
        delete $self->{heartbeat}{timer};
    }



( run in 2.599 seconds using v1.01-cache-2.11-cpan-df04353d9ac )