Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/MQTT.pm view on Meta::CPAN
# Time::HiRes timestamp taken when the on_read handler starts processing
# incoming packets; undef while no packet processing is being timed
our $BUSY_SINCE = undef;
# Running total of the time spent by on_read processing incoming packets,
# accumulated as Time::HiRes::time differences
our $BUSY_TIME = 0;
# Start connecting to the MQTT broker.
#
# Named arguments:
#   on_connack - optional callback, stored in $self->{connack_cb}; it is
#                called by _receive_connack as ($success, $properties)
#   blocking   - when true, wait on the connection condvar before returning
#
# Returns $self->{is_connected} for blocking calls, and 1 otherwise.
sub connect {
    my ($self, %args) = @_;

    my $wait_for_connack = $args{'blocking'};

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    if ($wait_for_connack) {
        # Returns once somebody calls ->send on the condvar
        $self->{connect_cv}->recv;
    }

    # The condvar is discarded in both blocking and non blocking modes
    $self->{connect_cv} = undef;

    return 1 unless $wait_for_connack;
    return $self->{is_connected};
}
# Start (or retry) a connection attempt to one host of the configured cluster.
#
# Creates a new AnyEvent::Handle in $self->{handle} and installs the connect,
# error, eof and read callbacks on it. Always returns 1.
#
# Values read from $self->{config}:
#   timeout - seconds before giving up (defaults to 30, a false value disables the timer)
#   host    - single host or array ref of hosts (defaults to 'localhost')
#   tls     - when true the connection is made over TLS
#   port    - defaults to 8883 when tls is set, 1883 otherwise
sub _connect {
my ($self) = @_;
# The callbacks below close over $self, so they capture a weakened reference
weaken($self);
my $config = $self->{config};
my $timeout = $config->{'timeout'};
$timeout = 30 unless defined $timeout;
# Ensure that timeout is set properly when the event loop was blocked
AnyEvent->now_update;
# Connection timeout handler
# The timer is created only if none is pending, so the retries scheduled
# by on_connect_error share a single overall timeout
if ($timeout && !$self->{timeout_tmr}) {
$self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub {
$self->_reset_connection;
# NOTE(review): connect() sets connect_cv to undef before returning, so in
# non blocking mode this looks like a method call on undef - confirm
$self->{connect_cv}->send;
$self->_fatal("Could not connect to MQTT broker after $timeout seconds");
});
}
unless ($self->{hosts}) {
# Initialize the list of cluster hosts
# The list is shuffled once and then reused by every later attempt
my $hosts = $config->{'host'} || 'localhost';
my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
$self->{hosts} = [ shuffle @hosts ];
}
# Determine next host of cluster to connect to
# try_hosts is refilled from the full list whenever it runs empty
my $try_hosts = $self->{try_hosts} ||= [];
@$try_hosts = @{$self->{hosts}} unless @$try_hosts;
# TCP connection args
my $host = shift @$try_hosts;
my $tls = $config->{'tls'} || 0;
my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );
# A host or port not matching these patterns ends up undefined
($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
($port) = ($port =~ m/^([0-9]+)$/s);
$self->{handle} = AnyEvent::Handle->new(
connect => [ $host, $port ],
tls => $tls ? 'connect' : undef,
keepalive => 1,
no_delay => 1,
on_connect => sub {
my ($fh, $host, $port) = @_;
# Send CONNECT packet
# The host and port passed to this callback are recorded in server_prop first
$self->{server_prop}->{host} = $host;
$self->{server_prop}->{port} = $port;
$self->_send_connect;
},
on_connect_error => sub {
my ($fh, $errmsg) = @_;
# Some error occurred while connection, such as an unresolved hostname
# or connection refused. Try next host of cluster immediately, or retry
# in few seconds if all hosts of the cluster are unresponsive
$self->{connect_err}++;
# Warn only while the error count does not exceed the number of hosts
warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
# No delay while untried hosts remain, otherwise errors / number of hosts,
# capped at 10 seconds by the timer below
my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
$self->{reconnect_tmr} = AnyEvent->timer(
after => ($delay < 10 ? $delay : 10),
cb => sub { $self->_connect },
);
},
on_error => sub {
my ($fh, $fatal, $errmsg) = @_;
# Some error occurred, such as a read error
$self->_reset_connection;
$self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
},
on_eof => sub {
my ($fh) = @_;
# The server has closed the connection without sending DISCONNECT
$self->_reset_connection;
$self->_fatal("MQTT broker at $host:$port has gone away");
},
on_read => sub {
my ($fh) = @_;
my $packet_type;
my $packet_flags;
my $rbuff_len;
my $packet_len;
my $mult;
my $offs;
my $byte;
my $timing_packets;
# Only the invocation that sets $BUSY_SINCE adds to $BUSY_TIME at the end
unless (defined $BUSY_SINCE) {
# Measure time elapsed while processing incoming packets
$BUSY_SINCE = Time::HiRes::time;
$timing_packets = 1;
}
PARSE_PACKET: {
$rbuff_len = length $fh->{rbuf};
# Parsing is not attempted with less than 2 bytes in the read buffer
last PARSE_PACKET unless $rbuff_len >= 2;
lib/Beekeeper/MQTT.pm view on Meta::CPAN
}
elsif ($packet_type == MQTT_PINGREQ) {
$self->pingresp;
}
elsif ($packet_type == MQTT_PINGRESP) {
# Client takes no action on receiving PINGRESP
}
elsif ($packet_type == MQTT_SUBACK) {
$self->_receive_suback(\$packet);
}
elsif ($packet_type == MQTT_UNSUBACK) {
$self->_receive_unsuback(\$packet);
}
elsif ($packet_type == MQTT_CONNACK) {
$self->_receive_connack(\$packet);
}
elsif ($packet_type == MQTT_DISCONNECT) {
$self->_receive_disconnect(\$packet);
}
elsif ($packet_type == MQTT_AUTH) {
$self->_receive_auth(\$packet);
}
else {
# Protocol error
$self->_fatal("Received packet with unknown type $packet_type");
}
# Prepare for next frame
undef $packet_type;
# Handle could have been destroyed at this point
redo PARSE_PACKET if defined $fh->{rbuf};
}
# Account the elapsed time and release the timing marker
if (defined $timing_packets) {
$BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
undef $BUSY_SINCE;
}
},
);
1;
}
# Build the MQTT 5 CONNECT packet and queue it on $self->{handle}.
#
# All values are taken from a shallow copy of $self->{config}:
#   username, password, client_id, clean_start, keep_alive, will
#       - consumed to build the connect flags and the payload
#   session_expiry_interval, receive_maximum, maximum_packet_size,
#   topic_alias_maximum, request_response_information,
#   request_problem_information, authentication_method, authentication_data
#       - sent as CONNECT properties when present
#   any other key with a defined value is sent as a User Property
#
# The client identifier in use (a random 22 character alphanumeric one when
# none is configured) is stored in $self->{client_id}.
sub _send_connect {
    my ($self) = @_;

    # Shallow copy: keys are deleted from it as they are consumed
    my %prop = %{$self->{config}};

    my $username    = delete $prop{'username'};
    my $password    = delete $prop{'password'};
    my $client_id   = delete $prop{'client_id'};
    my $clean_start = delete $prop{'clean_start'};
    my $keep_alive  = delete $prop{'keep_alive'};
    my $will        = delete $prop{'will'};

    unless ($client_id) {
        $client_id = '';
        $client_id .= ('0'..'9','a'..'z','A'..'Z')[rand 62] for (1..22);
    }

    $self->{client_id} = $client_id;

    # 3.1.2.11 Properties

    my $raw_prop = '';

    if (exists $prop{'session_expiry_interval'}) {
        # 3.1.2.11.2 Session Expiry Interval (long int)
        $raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $prop{'session_expiry_interval'});
    }

    if (exists $prop{'receive_maximum'}) {
        # 3.1.2.11.3 Receive Maximum (short int)
        $raw_prop .= pack("C n", MQTT_RECEIVE_MAXIMUM, delete $prop{'receive_maximum'});
    }

    if (exists $prop{'maximum_packet_size'}) {
        # 3.1.2.11.4 Maximum Packet Size (long int)
        $raw_prop .= pack("C N", MQTT_MAXIMUM_PACKET_SIZE, delete $prop{'maximum_packet_size'});
    }

    if (exists $prop{'topic_alias_maximum'}) {
        # 3.1.2.11.5 Topic Alias Maximum (short int)
        $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS_MAXIMUM, delete $prop{'topic_alias_maximum'});
    }

    if (exists $prop{'request_response_information'}) {
        # 3.1.2.11.6 Request Response Information (byte)
        $raw_prop .= pack("C C", MQTT_REQUEST_RESPONSE_INFORMATION, delete $prop{'request_response_information'});
    }

    if (exists $prop{'request_problem_information'}) {
        # 3.1.2.11.7 Request Problem Information (byte)
        $raw_prop .= pack("C C", MQTT_REQUEST_PROBLEM_INFORMATION, delete $prop{'request_problem_information'});
    }

    if (exists $prop{'authentication_method'}) {
        # 3.1.2.11.9 Authentication Method (utf8 string)
        # Encoding in place is safe here, %prop holds a copy of the value
        utf8::encode( $prop{'authentication_method'} );
        $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $prop{'authentication_method'});
    }

    if (exists $prop{'authentication_data'}) {
        # 3.1.2.11.10 Authentication Data (binary data)
        # Binary data is a two byte length followed by the bytes themselves
        $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_DATA, delete $prop{'authentication_data'});
    }

    # NOTE(review): every key still left in the config copy is sent to the
    # broker, which includes the host, port, tls and timeout settings that
    # _connect reads from the same hash - confirm that this is intended
    foreach my $key (keys %prop) {
        # 3.1.2.11.8 User Property (utf8 string pair)
        my $val = $prop{$key};
        next unless defined $val;
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }

    # 3.1.2 Variable Header

    # 3.1.2.1 Protocol Name (utf8 string)
    my $raw_mqtt = pack("n/a*", "MQTT");

    # 3.1.2.2 Protocol Version (byte)
    $raw_mqtt .= pack("C", 5);

    # 3.1.2.3 Connect Flags (byte)
    my $flags = 0;
    $flags |= 0x02 if $clean_start;       # 3.1.2.4 Clean Start
    $flags |= 0x80 if defined $username;  # 3.1.2.8 User Name Flag
    $flags |= 0x40 if defined $password;  # 3.1.2.9 Password Flag

    if ($will) {
        # Will QoS is a two bit field: default it to 0 and mask it, so that an
        # unset or out of range value cannot overwrite the neighbouring flags
        my $will_qos = ($will->{'qos'} || 0) & 0x03;
        $flags |= 0x04;                       # 3.1.2.5 Will Flag
        $flags |= $will_qos << 3;             # 3.1.2.6 Will QoS
        $flags |= 0x20 if $will->{'retain'};  # 3.1.2.7 Will Retain
    }

    $raw_mqtt .= pack("C", $flags);

    # 3.1.2.10 Keep Alive (short int)
    $raw_mqtt .= pack("n", $keep_alive || 0);

    # 3.1.2.11 Properties
    $raw_mqtt .= _encode_var_int(length $raw_prop);
    $raw_mqtt .= $raw_prop;

    # 3.1.3 Payload

    # 3.1.3.1 Client Identifier (utf8 string)
    $raw_mqtt .= pack("n/a*", $client_id);

    if ($will) {
        #TODO: 3.1.3.2 Will Properties
        my $will_prop = '';
        $raw_mqtt .= _encode_var_int(length $will_prop);
        $raw_mqtt .= $will_prop;

        # 3.1.3.3 Will Topic (utf8 string)
        # Encode a copy: $will is shared with $self->{config}, and encoding it
        # in place would encode the topic again on every reconnection
        my $will_topic = $will->{'topic'};
        utf8::encode( $will_topic );
        $raw_mqtt .= pack("n/a*", $will_topic);

        # 3.1.3.4 Will Payload (binary data)
        my $will_payload = defined $will->{'payload'} ? $will->{'payload'} : '';
        $raw_mqtt .= pack("n/a*", $will_payload);
    }

    if (defined $username) {
        # 3.1.3.5 Username (utf8 string)
        utf8::encode( $username );
        $raw_mqtt .= pack("n/a*", $username);
    }

    if (defined $password) {
        # 3.1.3.6 Password (binary data)
        $raw_mqtt .= pack("n/a*", $password);
    }

    $self->{handle}->push_write(
        pack("C", MQTT_CONNECT << 4)      .  # 3.1.1 Packet type
        _encode_var_int(length $raw_mqtt) .  # 3.1.1 Packet length
        $raw_mqtt
    );
}
# Process a CONNACK packet received from the broker.
#
# $packet is a reference to the packet contents; it is decoded sequentially
# with the _decode_* helpers, which advance $offs.
#
# The acknowledge flags, the reason code and every property sent by the broker
# are stored into $self->{server_prop}. The connection timers and the error
# counter are cleared, a pending connect condvar is signalled, and the
# callback stored in $self->{connack_cb} (if any) is called as
# ($success, $properties).
#
# $self->{is_connected} is set to true only when the reason code is 0x00.
sub _receive_connack {
    my ($self, $packet) = @_;

    my $prop = $self->{server_prop};
    my $offs = 0;

    # 3.2.2.1 Acknowledge flags (byte)
    my $ack_flags = _decode_byte($packet, \$offs);
    $prop->{'session_present'} = $ack_flags & 0x01;

    # 3.2.2.2 Reason code (byte)
    my $reason_code = _decode_byte($packet, \$offs);
    $prop->{'reason_code'} = $reason_code;
    $prop->{'reason'} = $Reason_code{$reason_code};

    # 3.2.2.3.1 Properties Length (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;

    while ($offs < $prop_end) {

        my $prop_id = _decode_byte($packet, \$offs);

        if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
            # 3.2.2.3.2 Session Expiry Interval (long int)
            $prop->{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
        }
        elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) {
            # 3.2.2.3.3 Receive Maximum (short int)
            $prop->{'receive_maximum'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_MAXIMUM_QOS) {
            # 3.2.2.3.4 Maximum QoS (byte)
            $prop->{'maximum_qos'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_RETAIN_AVAILABLE) {
            # 3.2.2.3.5 Retain Available (byte)
            $prop->{'retain_available'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) {
            # 3.2.2.3.6 Maximum Packet Size (long int)
            $prop->{'maximum_packet_size'} = _decode_int_32($packet, \$offs);
        }
        elsif ($prop_id == MQTT_ASSIGNED_CLIENT_IDENTIFIER) {
            # 3.2.2.3.7 Assigned Client Identifier (utf8 string)
            $prop->{'assigned_client_identifier'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) {
            # 3.2.2.3.8 Topic Alias Maximum (short int)
            $prop->{'topic_alias_maximum'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_REASON_STRING) {
            # 3.2.2.3.9 Reason String (utf8 string)
            $prop->{'reason_string'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.2.2.3.10 User Property (utf8 string pair)
            # NOTE(review): user properties share the hash with the standard
            # ones, so a key such as 'reason_code' would replace it - confirm
            my $key = _decode_utf8_str($packet, \$offs);
            my $val = _decode_utf8_str($packet, \$offs);
            $prop->{$key} = $val;
        }
        elsif ($prop_id == MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE) {
            # 3.2.2.3.11 Wildcard Subscription Available (byte)
            $prop->{'wildcard_subscription_available'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE) {
            # 3.2.2.3.12 Subscription Identifiers Available (byte)
            $prop->{'subscription_identifier_available'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_SHARED_SUBSCRIPTION_AVAILABLE) {
            # 3.2.2.3.13 Shared Subscription Available (byte)
            $prop->{'shared_subscription_available'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_SERVER_KEEP_ALIVE) {
            # 3.2.2.3.14 Server Keep Alive (short int)
            $prop->{'server_keep_alive'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_RESPONSE_INFORMATION) {
            # 3.2.2.3.15 Response Information (utf8 string)
            $prop->{'response_information'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_SERVER_REFERENCE) {
            # 3.2.2.3.16 Server Reference (utf8 string)
            $prop->{'server_reference'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_AUTHENTICATION_METHOD) {
            # 3.2.2.3.17 Authentication Method (utf8 string)
            $prop->{'authentication_method'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_AUTHENTICATION_DATA) {
            # 3.2.2.3.18 Authentication Data (binary data)
            $prop->{'authentication_data'} = _decode_binary_data($packet, \$offs);
        }
        else {
            # Protocol error
            $self->_fatal("Received CONNACK with unknown property $prop_id");
            # The length of an unknown property is not known, so the rest of
            # the properties cannot be decoded if _fatal returns
            last;
        }
    }

    my $success = ($reason_code == 0x00);

    # A refused connection must not be reported as established: the server
    # will close it, and a blocking connect returns this flag to its caller
    #TODO: handle refused connections
    $self->{is_connected}  = $success ? 1 : 0;

    # The connection attempt is over, whatever the outcome was
    $self->{timeout_tmr}   = undef;
    $self->{reconnect_tmr} = undef;
    $self->{connect_err}   = undef;

    # Wake up a blocking connect, if any is waiting
    $self->{connect_cv}->send if $self->{connect_cv};

    # Execute CONNACK callback
    my $connack_cb = $self->{connack_cb};
    $connack_cb->($success, $prop) if $connack_cb;
}
sub disconnect {
my ($self, %args) = @_;
unless (defined $self->{handle}) {
carp "Already disconnected from MQTT broker";
return;
}
my $reason_code = delete $args{'reason_code'};
# 3.14.2.2 Properties
my $raw_prop = '';
if (exists $args{'session_expiry_interval'}) {
( run in 1.018 second using v1.01-cache-2.11-cpan-df04353d9ac )