Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/MQTT.pm view on Meta::CPAN
utf8::decode($str);
return $str;
}
sub _decode_binary_data {
my ($packet, $offs) = @_;
my $data = unpack("n/a", substr($$packet, $$offs));
$$offs += 2 + length($data);
return $data;
}
sub _decode_var_int {
my ($packet, $offs) = @_;
my $int = 0;
my $mult = 1;
my $byte;
do {
$byte = unpack("C", substr($$packet, $$offs, 1));
$int += ($byte & 0x7F) * $mult;
$mult *= 128;
$$offs++;
} while ($byte & 0x80);
return $int;
}
sub _encode_var_int {
return pack("C", $_[0]) if ($_[0] < 128);
my @a = unpack("C*", pack("w", $_[0]));
$a[0] &= 0x7F;
$a[-1] |= 0x80;
return pack("C*", reverse @a);
}
sub new {
my ($class, %args) = @_;
my $self = {
bus_id => undef,
bus_role => undef,
handle => undef, # the socket
hosts => undef, # list of all hosts in cluster
is_connected => undef, # true once connected
try_hosts => undef, # list of hosts to try to connect
connect_err => undef, # count of connection errors
timeout_tmr => undef, # timer used for connection timeout
reconnect_tmr => undef, # timer used for connection retry
connack_cb => undef, # connack callback
error_cb => undef, # error callback
client_id => undef, # client id
server_prop => {}, # server properties
server_alias => {}, # server topic aliases
client_alias => {}, # client topic aliases
subscriptions => {}, # topic subscriptions
subscr_cb => {}, # subscription callbacks
packet_cb => {}, # packet callbacks
buffers => {}, # raw mqtt buffers
packet_seq => 1, # sequence used for packet ids
subscr_seq => 1, # sequence used for subscription ids
alias_seq => 1, # sequence used for topic alias ids
use_alias => 0, # topic alias enabled
config => \%args,
};
$self->{bus_id} = delete $args{'bus_id'};
$self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};
$self->{error_cb} = delete $args{'on_error'};
bless $self, $class;
return $self;
}
sub bus_id { $_[0]->{bus_id} }
sub bus_role { $_[0]->{bus_role} }
sub _fatal {
my ($self, $errstr) = @_;
die "(" . __PACKAGE__ . ") $errstr\n" unless $self->{error_cb};
$self->{error_cb}->($errstr);
}
our $BUSY_SINCE = undef;
our $BUSY_TIME = 0;
sub connect {
my ($self, %args) = @_;
$self->{connack_cb} = $args{'on_connack'};
$self->{connect_cv} = AnyEvent->condvar;
$self->_connect;
$self->{connect_cv}->recv if $args{'blocking'};
$self->{connect_cv} = undef;
return $args{'blocking'} ? $self->{is_connected} : 1;
}
sub _connect {
my ($self) = @_;
weaken($self);
my $config = $self->{config};
my $timeout = $config->{'timeout'};
$timeout = 30 unless defined $timeout;
# Ensure that timeout is set properly when the event loop was blocked
AnyEvent->now_update;
# Connection timeout handler
if ($timeout && !$self->{timeout_tmr}) {
$self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub {
$self->_reset_connection;
$self->{connect_cv}->send;
$self->_fatal("Could not connect to MQTT broker after $timeout seconds");
lib/Beekeeper/MQTT.pm view on Meta::CPAN
$prop{'message_expiry_interval'} = unpack("N", substr($$packet, $offs, 4));
$offs += 4;
}
elsif ($prop_id == MQTT_TOPIC_ALIAS) {
# 3.3.2.3.4 Topic Alias (short int)
my $alias = unpack("n", substr($$packet, $offs, 2));
$offs += 2;
if (length $topic) {
$self->{server_alias}->{$alias} = $topic;
}
else {
$prop{'topic'} = $self->{server_alias}->{$alias};
}
}
elsif ($prop_id == MQTT_RESPONSE_TOPIC) {
# 3.3.2.3.5 Response Topic (utf8 string)
my $resp_topic = unpack("n/a", substr($$packet, $offs));
$offs += 2 + length $resp_topic;
utf8::decode( $resp_topic );
$prop{'response_topic'} = $resp_topic;
}
elsif ($prop_id == MQTT_CORRELATION_DATA) {
# 3.3.2.3.6 Correlation Data (binary data)
$prop{'correlation_data'} = unpack("n/a", substr($$packet, $offs));
$offs += 2 + length $prop{'correlation_data'};
}
elsif ($prop_id == MQTT_USER_PROPERTY) {
# 3.3.2.3.7 User Property (utf8 string pair)
my ($key, $val) = unpack("n/a n/a", substr($$packet, $offs));
$offs += 4 + length($key) + length($val);
utf8::decode( $key );
utf8::decode( $val );
$prop{$key} = $val;
}
elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER) {
# 3.3.2.3.8 Subscription Identifier (variable int)
push @subscr_ids, _decode_var_int($packet, \$offs);
}
elsif ($prop_id == MQTT_CONTENT_TYPE) {
# 3.3.2.3.9 Content Type (utf8 string)
my $content_type = unpack("n/a", substr($$packet, $offs));
$offs += 2 + length $content_type;
utf8::decode( $content_type );
$prop{'content_type'} = $content_type;
}
else {
# Protocol error
$self->_fatal("Received PUBLISH with unknown property $prop_id");
}
}
# Trim variable header from packet, the remaining is the payload
substr($$packet, 0, $prop_end, '');
if ($prop{'payload_format'}) {
# Payload is UTF-8 Encoded Character Data
utf8::decode( $$packet );
}
foreach (@subscr_ids) {
# Execute subscriptions callbacks
$self->{subscr_cb}->{$_}->($packet, \%prop);
}
}
sub puback {
my ($self, %args) = @_;
croak "Missing packet_id" unless $args{'packet_id'};
my $raw_mqtt = pack(
"C C n C",
MQTT_PUBACK << 4, # 3.4.1 Packet type
3, # 3.4.1 Remaining length
$args{'packet_id'}, # 3.4.2 Packet identifier
$args{'reason_code'} || 0, # 3.4.2.1 Reason code
);
if ($args{'buffer_id'}) {
# Do not send right now, wait until flush_buffer
$self->{buffers}->{$args{'buffer_id'}}->{raw_mqtt} .= $raw_mqtt;
return 1;
}
$self->{handle}->push_write( $raw_mqtt );
1;
}
sub _receive_puback {
my ($self, $packet) = @_;
my ($packet_id, $reason_code) = unpack("n C", $$packet);
$reason_code = 0 unless defined $reason_code;
#TODO: 3.5.2.2 Properties
my $puback_cb = delete $self->{packet_cb}->{$packet_id};
return unless defined $puback_cb;
$puback_cb->($reason_code);
}
sub pubrec {
my ($self, %args) = @_;
croak "Missing packet_id" unless $args{'packet_id'};
my $raw_mqtt = pack(
"C C n C",
MQTT_PUBREC << 4, # 3.5.1 Packet type
3, # 3.5.1 Remaining length
$args{'packet_id'}, # 3.5.2 Packet identifier
$args{'reason_code'} || 0, # 3.5.2.1 Reason code
);
#TODO: set PUBREL callback
$self->{handle}->push_write( $raw_mqtt );
lib/Beekeeper/MQTT.pm view on Meta::CPAN
$prop{'authentication_data'} = _decode_binary_data($packet, \$offs);
}
elsif ($prop_id == MQTT_REASON_STRING) {
# 3.15.2.2.4 Reason String (utf8 string)
$prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
}
elsif ($prop_id == MQTT_USER_PROPERTY) {
# 3.15.2.2.5 User Property (utf8 string pair)
my $key = _decode_utf8_str($packet, \$offs);
my $val = _decode_utf8_str($packet, \$offs);
$prop{$key} = $val;
}
else {
# Protocol error
$self->_fatal("Received AUTH with unexpected property $prop_id");
}
}
my $auth_cb = delete $self->{packet_cb}->{'auth'};
$auth_cb->(\%prop) if $auth_cb;
}
sub flush_buffer {
my ($self, %args) = @_;
my $buffer = delete $self->{buffers}->{$args{'buffer_id'}};
# Nothing to do if nothing was buffered
return unless $buffer;
$self->{handle}->push_write( $buffer->{raw_mqtt} );
if (defined $self->{handle}->{wbuf} && length $self->{handle}->{wbuf} > 0) {
# Kernel write buffer is full, see publish() above
# Make AnyEvent allow one level of recursive condvar blocking
$AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted";
local $AE_WAITING = 1;
local $AnyEvent::CondVar::Base::WAITING = 0;
my $flushed = AnyEvent->condvar;
$self->{handle}->on_drain( $flushed );
$flushed->recv;
$self->{handle}->on_drain(); # clear
}
1;
}
sub discard_buffer {
my ($self, %args) = @_;
my $buffer = delete $self->{buffers}->{$args{'buffer_id'}};
# Nothing to do if nothing was buffered
return unless $buffer;
# Remove all pending puback callbacks, as those will never be executed
foreach my $packet_id (keys %{$buffer->{packet_ids}}) {
delete $self->{packet_cb}->{$packet_id};
}
1;
}
sub DESTROY {
my $self = shift;
# Disconnect gracefully from server if already connected
return unless defined $self->{handle};
$self->disconnect;
}
1;
__END__
=pod
=encoding utf8
=head1 NAME
Beekeeper::MQTT - Asynchronous MQTT 5.0 client
=head1 VERSION
Version 0.09
=head1 SYNOPSIS
my $mqtt = Beekeeper::MQTT->new(
host => 'localhost',
username => 'guest',
password => 'guest',
);
$mqtt->connect(
blocking => 1,
on_connack => sub {
my ($success, $properties) = @_;
die $properties->{reason_string} unless $success;
},
);
$mqtt->subscribe(
topic => 'foo/bar',
on_publish => sub {
my ($payload, $properties) = @_;
print "Got a message: $$payload";
},
);
$mqtt->publish(
topic => 'foo/bar',
payload => 'Hello',
);
( run in 1.665 second using v1.01-cache-2.11-cpan-5837b0d9d2c )