Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/MQTT.pm view on Meta::CPAN
sub _fatal {
my ($self, $errstr) = @_;
die "(" . __PACKAGE__ . ") $errstr\n" unless $self->{error_cb};
$self->{error_cb}->($errstr);
}
our $BUSY_SINCE = undef;
our $BUSY_TIME = 0;
sub connect {
my ($self, %args) = @_;
$self->{connack_cb} = $args{'on_connack'};
$self->{connect_cv} = AnyEvent->condvar;
$self->_connect;
$self->{connect_cv}->recv if $args{'blocking'};
$self->{connect_cv} = undef;
return $args{'blocking'} ? $self->{is_connected} : 1;
}
sub _connect {
my ($self) = @_;
weaken($self);
my $config = $self->{config};
my $timeout = $config->{'timeout'};
$timeout = 30 unless defined $timeout;
# Ensure that timeout is set properly when the event loop was blocked
AnyEvent->now_update;
# Connection timeout handler
if ($timeout && !$self->{timeout_tmr}) {
$self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub {
$self->_reset_connection;
$self->{connect_cv}->send;
$self->_fatal("Could not connect to MQTT broker after $timeout seconds");
});
}
unless ($self->{hosts}) {
# Initialize the list of cluster hosts
my $hosts = $config->{'host'} || 'localhost';
my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
$self->{hosts} = [ shuffle @hosts ];
}
# Determine next host of cluster to connect to
my $try_hosts = $self->{try_hosts} ||= [];
@$try_hosts = @{$self->{hosts}} unless @$try_hosts;
# TCP connection args
my $host = shift @$try_hosts;
my $tls = $config->{'tls'} || 0;
my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );
($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
($port) = ($port =~ m/^([0-9]+)$/s);
$self->{handle} = AnyEvent::Handle->new(
connect => [ $host, $port ],
tls => $tls ? 'connect' : undef,
keepalive => 1,
no_delay => 1,
on_connect => sub {
my ($fh, $host, $port) = @_;
# Send CONNECT packet
$self->{server_prop}->{host} = $host;
$self->{server_prop}->{port} = $port;
$self->_send_connect;
},
on_connect_error => sub {
my ($fh, $errmsg) = @_;
# Some error occurred while connection, such as an unresolved hostname
# or connection refused. Try next host of cluster immediately, or retry
# in few seconds if all hosts of the cluster are unresponsive
$self->{connect_err}++;
warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
$self->{reconnect_tmr} = AnyEvent->timer(
after => ($delay < 10 ? $delay : 10),
cb => sub { $self->_connect },
);
},
on_error => sub {
my ($fh, $fatal, $errmsg) = @_;
# Some error occurred, such as a read error
$self->_reset_connection;
$self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
},
on_eof => sub {
my ($fh) = @_;
# The server has closed the connection without sending DISCONNECT
$self->_reset_connection;
$self->_fatal("MQTT broker at $host:$port has gone away");
},
on_read => sub {
my ($fh) = @_;
my $packet_type;
my $packet_flags;
my $rbuff_len;
my $packet_len;
my $mult;
my $offs;
my $byte;
my $timing_packets;
unless (defined $BUSY_SINCE) {
# Measure time elapsed while processing incoming packets
$BUSY_SINCE = Time::HiRes::time;
$timing_packets = 1;
}
( run in 0.856 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )