Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/MQTT.pm view on Meta::CPAN
package Beekeeper::MQTT;
use strict;
use warnings;
our $VERSION = '0.10';
use AnyEvent;
use AnyEvent::Handle;
use Time::HiRes;
use List::Util 'shuffle';
use Scalar::Util 'weaken';
use Exporter 'import';
use Carp;
our @EXPORT_OK;
our %EXPORT_TAGS;
our $DEBUG = 0;
# Build the export lists by scanning this package's symbol table.
# Every symbol starting with MQTT_ goes into the 'const' tag, and every
# symbol starting with _encode or _decode goes into the 'decode' tag
# (which therefore holds both the encoders and the decoders).
# All of them are exportable on request through @EXPORT_OK.
EXPORT: {
    my @symbols = keys %{Beekeeper::MQTT::};

    my @const  = grep { m/^MQTT_/ }          @symbols;
    my @encode = grep { m/^_(en|de)code/ }   @symbols;

    @EXPORT_OK = (@const, @encode);

    @EXPORT_TAGS{'const', 'decode'} = (\@const, \@encode);
}
# 2.1.2 Control Packet type
# Packet type codes; unsubscribe() shifts its code left by four bits to
# build the first byte of the packet header.
use constant {
    MQTT_CONNECT     => 0x01,
    MQTT_CONNACK     => 0x02,
    MQTT_PUBLISH     => 0x03,
    MQTT_PUBACK      => 0x04,
    MQTT_PUBREC      => 0x05,
    MQTT_PUBREL      => 0x06,
    MQTT_PUBCOMP     => 0x07,
    MQTT_SUBSCRIBE   => 0x08,
    MQTT_SUBACK      => 0x09,
    MQTT_UNSUBSCRIBE => 0x0A,
    MQTT_UNSUBACK    => 0x0B,
    MQTT_PINGREQ     => 0x0C,
    MQTT_PINGRESP    => 0x0D,
    MQTT_DISCONNECT  => 0x0E,
    MQTT_AUTH        => 0x0F,
};
# 2.2.2.2 Properties
# Property identifiers. Each trailing comment gives the encoding of the
# property value followed by the packets in which the property may appear.
use constant {
    MQTT_PAYLOAD_FORMAT_INDICATOR     => 0x01, # byte PUBLISH, Will Properties
    MQTT_MESSAGE_EXPIRY_INTERVAL      => 0x02, # long int PUBLISH, Will Properties
    MQTT_CONTENT_TYPE                 => 0x03, # utf8 string PUBLISH, Will Properties
    MQTT_RESPONSE_TOPIC               => 0x08, # utf8 string PUBLISH, Will Properties
    MQTT_CORRELATION_DATA             => 0x09, # binary data PUBLISH, Will Properties
    MQTT_SUBSCRIPTION_IDENTIFIER      => 0x0B, # variable int PUBLISH, SUBSCRIBE
    MQTT_SESSION_EXPIRY_INTERVAL      => 0x11, # long int CONNECT, CONNACK, DISCONNECT
    MQTT_ASSIGNED_CLIENT_IDENTIFIER   => 0x12, # utf8 string CONNACK
    MQTT_SERVER_KEEP_ALIVE            => 0x13, # short int CONNACK
    MQTT_AUTHENTICATION_METHOD        => 0x15, # utf8 string CONNECT, CONNACK, AUTH
    MQTT_AUTHENTICATION_DATA          => 0x16, # binary data CONNECT, CONNACK, AUTH
    MQTT_REQUEST_PROBLEM_INFORMATION  => 0x17, # byte CONNECT
    MQTT_WILL_DELAY_INTERVAL          => 0x18, # long int Will Properties
    MQTT_REQUEST_RESPONSE_INFORMATION => 0x19, # byte CONNECT
    MQTT_RESPONSE_INFORMATION         => 0x1A, # utf8 string CONNACK
    MQTT_SERVER_REFERENCE             => 0x1C, # utf8 string CONNACK, DISCONNECT
    MQTT_REASON_STRING                => 0x1F, # utf8 string CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH
    MQTT_RECEIVE_MAXIMUM              => 0x21, # short int CONNECT, CONNACK
    MQTT_TOPIC_ALIAS_MAXIMUM          => 0x22, # short int CONNECT, CONNACK
    MQTT_TOPIC_ALIAS                  => 0x23, # short int PUBLISH
    MQTT_MAXIMUM_QOS                  => 0x24, # byte CONNACK
};
lib/Beekeeper/MQTT.pm view on Meta::CPAN
handle => undef, # the socket
hosts => undef, # list of all hosts in cluster
is_connected => undef, # true once connected
try_hosts => undef, # list of hosts to try to connect
connect_err => undef, # count of connection errors
timeout_tmr => undef, # timer used for connection timeout
reconnect_tmr => undef, # timer used for connection retry
connack_cb => undef, # connack callback
error_cb => undef, # error callback
client_id => undef, # client id
server_prop => {}, # server properties
server_alias => {}, # server topic aliases
client_alias => {}, # client topic aliases
subscriptions => {}, # topic subscriptions
subscr_cb => {}, # subscription callbacks
packet_cb => {}, # packet callbacks
buffers => {}, # raw mqtt buffers
packet_seq => 1, # sequence used for packet ids
subscr_seq => 1, # sequence used for subscription ids
alias_seq => 1, # sequence used for topic alias ids
use_alias => 0, # topic alias enabled
config => \%args,
};
$self->{bus_id} = delete $args{'bus_id'};
$self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};
$self->{error_cb} = delete $args{'on_error'};
bless $self, $class;
return $self;
}
# Accessor: returns the 'bus_id' value that was passed to the constructor.
sub bus_id {
    my ($self) = @_;
    return $self->{bus_id};
}
# Accessor: returns the bus role, which the constructor sets to the
# 'bus_role' argument or, when that is not given, to the bus id.
sub bus_role {
    my ($self) = @_;
    return $self->{bus_role};
}
# Report an unrecoverable error.
#
# When an error callback was registered it is invoked with the error string
# and its result is returned. Otherwise the error is thrown with die, prefixed
# with the package name; the trailing newline suppresses the file/line suffix.
sub _fatal {
    my ($self, $errstr) = @_;

    my $on_error = $self->{error_cb}
        or die "(" . __PACKAGE__ . ") $errstr\n";

    return $on_error->($errstr);
}
our $BUSY_SINCE = undef;
our $BUSY_TIME = 0;
# Start connecting to the broker.
#
# Named arguments:
#   on_connack - callback stored in $self->{connack_cb}
#   blocking   - when true, wait on the connection condvar before returning
#
# Returns $self->{is_connected} in blocking mode, and 1 otherwise.
sub connect {
    my ($self, %args) = @_;

    my $blocking = $args{'blocking'};

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    # In blocking mode run the event loop until the condvar is signalled
    if ($blocking) {
        $self->{connect_cv}->recv;
    }

    # The condvar is discarded in both modes
    $self->{connect_cv} = undef;

    return $self->{is_connected} if $blocking;
    return 1;
}
sub _connect {
my ($self) = @_;
weaken($self);
my $config = $self->{config};
my $timeout = $config->{'timeout'};
$timeout = 30 unless defined $timeout;
# Ensure that timeout is set properly when the event loop was blocked
AnyEvent->now_update;
# Connection timeout handler
if ($timeout && !$self->{timeout_tmr}) {
$self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub {
$self->_reset_connection;
$self->{connect_cv}->send;
$self->_fatal("Could not connect to MQTT broker after $timeout seconds");
});
}
unless ($self->{hosts}) {
# Initialize the list of cluster hosts
my $hosts = $config->{'host'} || 'localhost';
my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
$self->{hosts} = [ shuffle @hosts ];
}
# Determine next host of cluster to connect to
my $try_hosts = $self->{try_hosts} ||= [];
@$try_hosts = @{$self->{hosts}} unless @$try_hosts;
# TCP connection args
my $host = shift @$try_hosts;
my $tls = $config->{'tls'} || 0;
my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );
($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
($port) = ($port =~ m/^([0-9]+)$/s);
$self->{handle} = AnyEvent::Handle->new(
connect => [ $host, $port ],
tls => $tls ? 'connect' : undef,
keepalive => 1,
no_delay => 1,
on_connect => sub {
my ($fh, $host, $port) = @_;
# Send CONNECT packet
$self->{server_prop}->{host} = $host;
$self->{server_prop}->{port} = $port;
$self->_send_connect;
},
on_connect_error => sub {
my ($fh, $errmsg) = @_;
# Some error occurred while connection, such as an unresolved hostname
# or connection refused. Try next host of cluster immediately, or retry
# in few seconds if all hosts of the cluster are unresponsive
$self->{connect_err}++;
warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
$self->{reconnect_tmr} = AnyEvent->timer(
after => ($delay < 10 ? $delay : 10),
cb => sub { $self->_connect },
lib/Beekeeper/MQTT.pm view on Meta::CPAN
# Send an UNSUBSCRIBE packet for one or more topic filters.
#
# Named arguments:
#   topic       - a single topic filter to unsubscribe from
#   topics      - array ref of topic filters (may be combined with 'topic')
#   on_unsuback - required callback, kept in $self->{packet_cb} under the
#                 packet id until the matching UNSUBACK is received
#   (any other) - sent as User Property key/value pairs; pairs with an
#                 undefined value are skipped
#
# Croaks when no topic was given, when a topic is undefined or empty, or when
# on_unsuback is missing. Returns 1 after the packet was queued for writing.
#
# The caller's data is never modified: the topic list is copied before the
# extra 'topic' is appended and before topics are UTF-8 encoded for the wire.
sub unsubscribe {
    my ($self, %args) = @_;

    my $topic       = delete $args{'topic'};
    my $topics      = delete $args{'topics'};
    my $unsuback_cb = delete $args{'on_unsuback'};

    # Work on a private copy. Pushing onto the caller's array, or encoding its
    # elements through a foreach alias, would silently alter the caller's list
    # (and double-encode non-ASCII topics if the list is used again)
    my @topics = defined $topics ? @$topics : ();
    push @topics, $topic if defined $topic;

    croak "Unsubscription topics were not specified" unless @topics;
    croak "on_unsuback callback is required" unless $unsuback_cb;

    foreach my $name (@topics) {
        croak "Undefined unsubscription topic" unless defined $name;
        croak "Empty unsubscription topic" unless length $name;
    }

    # The packet id is sent as a 16 bit integer: restart the sequence at 1
    # once 0xFFFF has been handed out
    my $packet_id = $self->{packet_seq}++;
    $self->{packet_seq} = 1 if $packet_id == 0xFFFF;

    # Set callback for UNSUBACK
    $self->{packet_cb}->{$packet_id} = {
        topics      => [ @topics ], # copy, still as character strings
        unsuback_cb => $unsuback_cb,
    };

    # 3.10.2.1.2  User Property  (utf8 string pair)
    my $raw_prop = '';

    foreach my $key (keys %args) {
        my $val = $args{$key};
        next unless defined $val;
        # $key and $val are copies, so encoding them in place is harmless
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }

    my $raw_mqtt = pack("n", $packet_id)             .  # 3.10.2  Packet identifier
                   _encode_var_int(length $raw_prop) .  # 3.10.2.1  Property Length
                   $raw_prop;                           # 3.10.2.1  Properties

    # 3.10.3  Payload
    foreach my $name (@topics) {
        # Encode a copy: the loop variable aliases the array element
        my $raw_topic = $name;
        utf8::encode($raw_topic);
        $raw_mqtt .= pack("n/a*", $raw_topic);
    }

    $self->{handle}->push_write(
        pack("C", MQTT_UNSUBSCRIBE << 4 | 0x02) .  # 3.10.1  Packet type
        _encode_var_int(length $raw_mqtt)       .  # 3.10.1  Packet length
        $raw_mqtt
    );

    1;
}
sub _receive_unsuback {
my ($self, $packet) = @_;
weaken($self);
# 3.11.2 Packet id (short int)
my $offs = 0;
my $packet_id = _decode_int_16($packet, \$offs);
# 3.11.2.1.1 Property Length (variable length int)
my $prop_len = _decode_var_int($packet, \$offs);
my $prop_end = $offs + $prop_len;
my %prop;
while ($offs < $prop_end) {
my $prop_id = _decode_byte($packet, \$offs);
if ($prop_id == MQTT_REASON_STRING) {
# 3.11.2.1.2 Reason String (utf8 string)
$prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
}
elsif ($prop_id == MQTT_USER_PROPERTY) {
# 3.11.2.1.3 User Property (utf8 string pair)
my $key = _decode_utf8_str($packet, \$offs);
my $val = _decode_utf8_str($packet, \$offs);
$prop{$key} = $val;
}
else {
# Protocol error
$self->_fatal("Received UNSUBACK with unexpected property $prop_id");
}
}
# 3.11.3 Payload
my @reason_codes = unpack("C*", substr($$packet, $offs));
my $packet_cb = delete $self->{packet_cb}->{$packet_id};
$self->_fatal("Received unexpected UNSUBACK") unless $packet_cb;
my $topics = $packet_cb->{topics};
my $unsuback_cb = $packet_cb->{unsuback_cb};
my $success = 1;
my @properties;
foreach my $code (@reason_codes) {
my $topic = shift @$topics;
my $reason = $Reason_code{$code};
if ($code == 0) {
# Success
my $subs = $self->{subscriptions};
my $subscr_id = delete $subs->{$topic};
if ($subscr_id) {
# Free on_publish callback if not used by another subscription
my @still_used = grep { $subs->{$_} == $subscr_id } keys %$subs;
unless (@still_used) {
# But not right now, as broker may send some messages *after* unsubscription
$self->{_timers}->{"unsub-$subscr_id"} = AnyEvent->timer( after => 60, cb => sub {
delete $self->{_timers}->{"unsub-$subscr_id"};
delete $self->{subscr_cb}->{$subscr_id};
});
( run in 1.495 second using v1.01-cache-2.11-cpan-5b529ec07f3 )