Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/MQTT.pm view on Meta::CPAN
return $int;
}
sub _decode_utf8_str {
    # Read a length-prefixed string (16 bit big-endian byte count followed by
    # that many bytes) from $$packet starting at $$offs, advance $$offs past it
    # and return the string after attempting an in-place UTF-8 decode.
    my ($packet, $offs) = @_;

    my $remaining = substr($$packet, $$offs);
    my $text      = unpack("n/a", $remaining);

    # The offset is advanced by the encoded size (prefix + raw bytes), so it
    # must be computed before the bytes are decoded into characters
    my $consumed = 2 + length($text);
    $$offs += $consumed;

    utf8::decode($text);

    return $text;
}
sub _decode_binary_data {
    # Read a length-prefixed chunk of raw bytes (16 bit big-endian byte count
    # followed by that many bytes) from $$packet starting at $$offs, advance
    # $$offs past it and return the bytes untouched.
    my ($packet, $offs) = @_;

    my $remaining = substr($$packet, $$offs);
    my $blob      = unpack("n/a", $remaining);

    # Skip the two bytes of the length prefix plus the payload itself
    my $consumed = 2 + length($blob);
    $$offs += $consumed;

    return $blob;
}
sub _decode_var_int {
    # Decode a variable length integer from $$packet starting at $$offs:
    # each byte contributes its low 7 bits, least significant group first,
    # and a set high bit means that another byte follows.
    # Advances $$offs past the consumed bytes and returns the integer.
    my ($packet, $offs) = @_;

    my $value  = 0;
    my $weight = 1;

    while (1) {
        my $octet = unpack("C", substr($$packet, $$offs, 1));
        $$offs++;

        $value  += ($octet & 0x7F) * $weight;
        $weight *= 128;

        # Continuation bit clear: this was the last byte
        last unless ($octet & 0x80);
    }

    return $value;
}
sub _encode_var_int {
    # Encode an integer as a variable length integer: groups of 7 bits,
    # least significant group first, high bit set on every byte but the last.
    my $value = $_[0];

    # Values that fit in 7 bits are a single byte
    return pack("C", $value) if ($value < 128);

    # pack "w" (BER compressed integer) yields the same 7 bit groups, but most
    # significant first and with the high bit set on every byte but the last.
    # Reversing the bytes gives the wanted group order; then only the two end
    # bytes carry the wrong continuation bit.
    my @groups = reverse unpack("C*", pack("w", $value));

    $groups[-1] &= 0x7F;   # new last byte: no continuation
    $groups[0]  |= 0x80;   # new first byte: more bytes follow

    return pack("C*", @groups);
}
sub new {
    # Constructor. Takes a list of key/value arguments; 'bus_id', 'bus_role'
    # and 'on_error' are extracted into their own attributes, everything else
    # is kept as connection configuration. No connection is made here.
    my ($class, %args) = @_;

    # These are removed from %args so they do not end up in the config
    my $bus_id   = delete $args{'bus_id'};
    my $bus_role = delete $args{'bus_role'};
    my $on_error = delete $args{'on_error'};

    # The role falls back to the bus id when not given
    $bus_role = $bus_id unless $bus_role;

    my %state = (
        bus_id        => $bus_id,
        bus_role      => $bus_role,
        handle        => undef,      # the socket
        hosts         => undef,      # list of all hosts in cluster
        is_connected  => undef,      # true once connected
        try_hosts     => undef,      # list of hosts to try to connect
        connect_err   => undef,      # count of connection errors
        timeout_tmr   => undef,      # timer used for connection timeout
        reconnect_tmr => undef,      # timer used for connection retry
        connack_cb    => undef,      # connack callback
        error_cb      => $on_error,  # error callback
        client_id     => undef,      # client id
        server_prop   => {},         # server properties
        server_alias  => {},         # server topic aliases
        client_alias  => {},         # client topic aliases
        subscriptions => {},         # topic subscriptions
        subscr_cb     => {},         # subscription callbacks
        packet_cb     => {},         # packet callbacks
        buffers       => {},         # raw mqtt buffers
        packet_seq    => 1,          # sequence used for packet ids
        subscr_seq    => 1,          # sequence used for subscription ids
        alias_seq     => 1,          # sequence used for topic alias ids
        use_alias     => 0,          # topic alias enabled
        config        => \%args,     # remaining constructor arguments
    );

    return bless \%state, $class;
}
# Accessor: returns the 'bus_id' attribute of the object
sub bus_id {
    my ($self) = @_;
    return $self->{bus_id};
}
# Accessor: returns the 'bus_role' attribute of the object
sub bus_role {
    my ($self) = @_;
    return $self->{bus_role};
}
sub _fatal {
    # Report an unrecoverable error: hand the message to the error callback
    # when one is installed (returning whatever it returns), otherwise die
    # with the message prefixed by the package name.
    my ($self, $errstr) = @_;

    my $handler = $self->{error_cb};

    if ($handler) {
        return $handler->($errstr);
    }

    die "(" . __PACKAGE__ . ") $errstr\n";
}
# Time at which processing of incoming packets started: the on_read handler
# sets it from Time::HiRes::time when it is not already defined
our $BUSY_SINCE = undef;
# NOTE(review): presumably the accumulated time spent processing packets; it
# is not updated in the code visible here - confirm where it is maintained
our $BUSY_TIME = 0;
sub connect {
    # Start connecting to the broker.
    #
    # Arguments (key/value):
    #   on_connack - stored as the connack callback
    #   blocking   - when true, wait on the connection condvar before returning
    #
    # Returns the 'is_connected' attribute in blocking mode, or 1 otherwise.
    my ($self, %args) = @_;

    my $blocking = $args{'blocking'};

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    if ($blocking) {
        # Run the event loop until something signals the condvar
        $self->{connect_cv}->recv;
    }

    # The condvar is always discarded, even when nobody waited on it
    $self->{connect_cv} = undef;

    return 1 unless $blocking;
    return $self->{is_connected};
}
sub _connect {
my ($self) = @_;
weaken($self);
my $config = $self->{config};
my $timeout = $config->{'timeout'};
$timeout = 30 unless defined $timeout;
# Ensure that timeout is set properly when the event loop was blocked
AnyEvent->now_update;
# Connection timeout handler
if ($timeout && !$self->{timeout_tmr}) {
$self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub {
$self->_reset_connection;
$self->{connect_cv}->send;
$self->_fatal("Could not connect to MQTT broker after $timeout seconds");
});
}
unless ($self->{hosts}) {
# Initialize the list of cluster hosts
my $hosts = $config->{'host'} || 'localhost';
my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
$self->{hosts} = [ shuffle @hosts ];
}
# Determine next host of cluster to connect to
my $try_hosts = $self->{try_hosts} ||= [];
@$try_hosts = @{$self->{hosts}} unless @$try_hosts;
# TCP connection args
my $host = shift @$try_hosts;
my $tls = $config->{'tls'} || 0;
my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );
($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
($port) = ($port =~ m/^([0-9]+)$/s);
$self->{handle} = AnyEvent::Handle->new(
connect => [ $host, $port ],
tls => $tls ? 'connect' : undef,
keepalive => 1,
no_delay => 1,
on_connect => sub {
my ($fh, $host, $port) = @_;
# Send CONNECT packet
$self->{server_prop}->{host} = $host;
$self->{server_prop}->{port} = $port;
$self->_send_connect;
},
on_connect_error => sub {
my ($fh, $errmsg) = @_;
# Some error occurred while connecting, such as an unresolved hostname
# or connection refused. Try the next host of the cluster immediately, or retry
# in a few seconds if all hosts of the cluster are unresponsive
$self->{connect_err}++;
warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
$self->{reconnect_tmr} = AnyEvent->timer(
after => ($delay < 10 ? $delay : 10),
cb => sub { $self->_connect },
);
},
on_error => sub {
my ($fh, $fatal, $errmsg) = @_;
# Some error occurred, such as a read error
$self->_reset_connection;
$self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
},
on_eof => sub {
my ($fh) = @_;
# The server has closed the connection without sending DISCONNECT
$self->_reset_connection;
$self->_fatal("MQTT broker at $host:$port has gone away");
},
on_read => sub {
my ($fh) = @_;
my $packet_type;
my $packet_flags;
my $rbuff_len;
my $packet_len;
my $mult;
my $offs;
my $byte;
my $timing_packets;
unless (defined $BUSY_SINCE) {
# Measure time elapsed while processing incoming packets
$BUSY_SINCE = Time::HiRes::time;
$timing_packets = 1;
}
PARSE_PACKET: {
$rbuff_len = length $fh->{rbuf};
last PARSE_PACKET unless $rbuff_len >= 2;
unless ($packet_type) {
$packet_len = 0;
$mult = 1;
$offs = 1;
PARSE_LEN: {
$byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
$packet_len += ($byte & 0x7f) * $mult;
last unless ($byte & 0x80);
last PARSE_PACKET if ($offs >= $rbuff_len); # Not enough data
$mult *= 128;
redo if ($offs < 5);
}
#TODO: Check max packet size
lib/Beekeeper/MQTT.pm view on Meta::CPAN
sub _receive_unsuback {
    # Handle an incoming UNSUBACK packet.
    #
    # $packet is a reference to the raw packet body, starting at the packet id.
    # The packet properties and the per-topic reason codes are decoded, the
    # pending request registered under the packet id is looked up, successfully
    # unsubscribed topics are removed from the subscriptions table, and the
    # 'unsuback_cb' of the request (if any) is called with an overall success
    # flag followed by one hash per topic (topic, reason_code, reason, plus the
    # decoded packet properties).
    #
    # Errors are reported through _fatal, which dies unless an error callback
    # is installed. When it does return after a protocol error, processing of
    # the packet is abandoned.
    my ($self, $packet) = @_;
    weaken($self);

    # 3.11.2  Packet id (short int)
    my $offs = 0;
    my $packet_id = _decode_int_16($packet, \$offs);

    # 3.11.2.1.1  Property Length (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;
    my %prop;

    while ($offs < $prop_end) {

        my $prop_id = _decode_byte($packet, \$offs);

        if ($prop_id == MQTT_REASON_STRING) {
            # 3.11.2.1.2  Reason String (utf8 string)
            $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.11.2.1.3  User Property (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$offs);
            my $val = _decode_utf8_str($packet, \$offs);
            $prop{$key} = $val;
        }
        else {
            # Protocol error. The size of an unknown property cannot be
            # determined, so everything after it would be parsed from a
            # misaligned offset: stop here if the error handler returns
            $self->_fatal("Received UNSUBACK with unexpected property $prop_id");
            return;
        }
    }

    # 3.11.3  Payload
    my @reason_codes = unpack("C*", substr($$packet, $offs));

    my $packet_cb = delete $self->{packet_cb}->{$packet_id};

    unless ($packet_cb) {
        # Nothing is known about this request, so there are no topics nor
        # callback to work with: stop here if the error handler returns
        $self->_fatal("Received unexpected UNSUBACK");
        return;
    }

    my $topics      = $packet_cb->{topics};
    my $unsuback_cb = $packet_cb->{unsuback_cb};
    my $success     = 1;
    my @properties;

    # Reason codes are matched to the requested topics by position
    foreach my $code (@reason_codes) {

        my $topic  = shift @$topics;
        my $reason = $Reason_code{$code};

        if ($code == 0) {
            # Success
            my $subs = $self->{subscriptions};
            my $subscr_id = delete $subs->{$topic};

            if ($subscr_id) {
                # Free on_publish callback if not used by another subscription
                my @still_used = grep { $subs->{$_} == $subscr_id } keys %$subs;

                unless (@still_used) {
                    # But not right now, as broker may send some messages *after* unsubscription
                    $self->{_timers}->{"unsub-$subscr_id"} = AnyEvent->timer( after => 60, cb => sub {
                        delete $self->{_timers}->{"unsub-$subscr_id"};
                        delete $self->{subscr_cb}->{$subscr_id};
                    });
                }
            }
        }
        else {
            # Failure. It is reported as fatal only when the caller did not
            # ask to be notified of the result
            $success = 0;

            unless ($unsuback_cb) {
                $self->_fatal("Unsubscription to topic '$topic' failed: $reason");
            }
        }

        # Packet properties go first, so that a user property sent by the
        # broker can never overwrite the topic, reason_code or reason fields
        push @properties, {
            %prop,
            topic       => $topic,
            reason_code => $code,
            reason      => $reason,
        };
    }

    $unsuback_cb->($success, @properties) if $unsuback_cb;
}
our $AE_WAITING;
sub publish {
my ($self, %args) = @_;
my $topic = delete $args{'topic'};
my $payload = delete $args{'payload'};
my $qos = delete $args{'qos'};
my $dup = delete $args{'duplicate'};
my $retain = delete $args{'retain'};
my $on_puback = delete $args{'on_puback'};
my $buffer_id = delete $args{'buffer_id'};
croak "Message topic was not specified" unless defined $topic;
$DEBUG && warn "Sent message to: $topic\n";
$payload = '' unless defined $payload;
my $payload_ref = (ref $payload eq 'SCALAR') ? $payload : \$payload;
# 3.3.2.3.4 Topic Alias
my $topic_alias;
if ($self->{use_alias}) {
$topic_alias = $self->{client_alias}->{$topic};
if ($topic_alias) {
# Send topic alias only
$topic = '';
}
elsif ($self->{server_prop}->{'topic_alias_maximum'}) {
#TODO: Honor maximum
$topic_alias = $self->{alias_seq}++;
$self->{client_alias}->{$topic} = $topic_alias;
}
}
# 3.3.1.2 QoS level
( run in 2.266 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )