AnyEvent-STOMP-Client
view release on metacpan or search on metacpan
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
use Carp;
use AnyEvent;
use AnyEvent::Handle;
use List::Util 'max';
our $VERSION = '0.42';
my $EOL = chr(10);
my $NULL = chr(0);
my %ENCODE_MAP = (
"\r" => "\\r",
"\n" => "\\n",
":" => "\\c",
"\\" => "\\\\",
);
my %DECODE_MAP = reverse %ENCODE_MAP;
sub new {
my $class = shift;
my $self = $class->SUPER::new;
$self->{connection_timeout_margin} = 250;
$self->{connected} = 0;
$self->{counter} = 0;
$self->{host} = shift || 'localhost';
$self->{port} = shift || 61613;
my $connect_headers = shift || {};
$self->{connect_headers} = {
'accept-version' => '1.2',
'host' => $self->{host},
'heart-beat' => '0,0'
};
if (defined $connect_headers->{'virtual-host'}) {
$self->{connect_headers}{host} = $connect_headers->{'virtual-host'};
}
if (defined $connect_headers->{'heart-beat'}) {
$self->{connect_headers}{'heart-beat'} = $connect_headers->{'heart-beat'};
}
if (defined $connect_headers->{'login'}
and defined $connect_headers->{'passcode'}
) {
$self->{connect_headers}{'login'} = $connect_headers->{'login'};
$self->{connect_headers}{'passcode'} = $connect_headers->{'passcode'};
}
my $tls_ctx = shift;
if (defined $tls_ctx) {
$self->{tls_hash}{tls} = 'connect';
$self->{tls_hash}{tls_ctx} = $tls_ctx;
if ($tls_ctx->{verify}) {
foreach my $key (keys %$tls_ctx) {
if ($key =~ m/_file$/ && not -r $tls_ctx->{$key}) {
die "ERROR: Cannot access $key at $tls_ctx->{$key}.\n";
}
}
}
}
$self->set_exception_cb(
sub {
my ($exception, $eventname) = @_;
$self->event('ERROR', $self->{host}, $self->{port}, "$eventname: $exception");
}
);
return bless $self, $class;
}
sub connect {
my $self = shift;
if ($self->is_connected) {
undef $self->{handle};
$self->{connected} = 0;
}
$self->{subscriptions} = {};
$self->{handle} = AnyEvent::Handle->new(
connect => [$self->{host}, $self->{port}],
keep_alive => 1,
no_delay => 1,
on_eof => sub {
undef $self->{handle};
$self->{connected} = 0;
$self->event('TRANSPORT_DISCONNECTED', $self->{host}, $self->{port});
},
on_connect => sub {
$self->event('TRANSPORT_CONNECTED', $self->{host}, $self->{port});
$self->send_frame('CONNECT', $self->{connect_headers});
},
on_connect_error => sub {
my ($handle, $error_message) = @_;
$handle->destroy;
undef $self->{handle};
$self->{connected} = 0;
$self->event('TRANSPORT_CONNECT_ERROR', $self->{host}, $self->{port}, $error_message);
},
on_error => sub {
my ($handle, $fatal, $error_message) = @_;
$self->event('ERROR', $self->{host}, $self->{port}, $error_message);
if ($fatal) {
undef $self->{handle};
$self->{connected} = 0;
# $handle->destroy() will be called automatically after this callback, see
# https://metacpan.org/pod/AnyEvent::Handle#on_error-=%3E-$cb-%3E($handle,-$fatal,-$message)
$self->event('CONNECTION_LOST', $self->{host}, $self->{port}, $error_message);
}
},
on_read => sub {
( run in 1.350 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )