AnyEvent-MQTT
view release on metacpan or search on metacpan
lib/AnyEvent/MQTT.pm view on Meta::CPAN
use strict;
use warnings;
package AnyEvent::MQTT;
$AnyEvent::MQTT::VERSION = '1.212810';
# ABSTRACT: AnyEvent module for an MQTT client
use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG};
use AnyEvent;
use AnyEvent::Handle;
use Net::MQTT::Constants;
use Net::MQTT::Message;
use Net::MQTT::TopicStore;
use Carp qw/croak carp/;
use Sub::Name;
use Scalar::Util qw/weaken/;
sub new {
my ($pkg, %p) = @_;
my $self =
bless {
socket => undef,
host => '127.0.0.1',
port => '1883',
timeout => 30,
wait => 'nothing',
keep_alive_timer => 120,
qos => MQTT_QOS_AT_MOST_ONCE,
message_id => 1,
user_name => undef,
password => undef,
tls => undef,
will_topic => undef,
will_qos => MQTT_QOS_AT_MOST_ONCE,
will_retain => 0,
will_message => '',
client_id => undef,
clean_session => 1,
handle_args => [],
write_queue => [],
inflight => {},
_sub_topics => Net::MQTT::TopicStore->new(),
%p,
}, $pkg;
}
sub DESTROY {
$_[0]->cleanup;
}
sub cleanup {
my $self = shift;
print STDERR "cleanup\n" if DEBUG;
if ($self->{handle}) {
my $cv = AnyEvent->condvar;
my $handle = $self->{handle};
weaken $handle;
$cv->cb(sub { $handle->destroy });
$self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
}
delete $self->{handle};
delete $self->{connected};
delete $self->{wait};
delete $self->{_keep_alive_handle};
delete $self->{_keep_alive_waiting};
$self->{write_queue} = [];
}
sub _error {
my ($self, $fatal, $message, $reconnect) = @_;
$self->cleanup($message);
$self->{on_error}->($fatal, $message) if ($self->{on_error});
$self->_reconnect() if ($reconnect);
}
sub publish {
my ($self, %p) = @_;
my $topic = exists $p{topic} ? $p{topic} :
croak ref $self, '->publish requires "topic" parameter';
my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
my $expect;
if ($qos) {
$expect = ($qos == MQTT_QOS_AT_LEAST_ONCE ? MQTT_PUBACK : MQTT_PUBREC);
}
my $message = $p{message};
if (defined $message) {
print STDERR "publish: message[$message] => $topic\n" if DEBUG;
$self->_send_with_ack({
message_type => MQTT_PUBLISH,
%p,
}, $cv, $expect);
return $cv;
}
my $handle = exists $p{handle} ? $p{handle} :
croak ref $self, '->publish requires "message" or "handle" parameter';
unless ($handle->isa('AnyEvent::Handle')) {
my @args = @{$p{handle_args}||[]};
print STDERR "publish: IO[$handle] => $topic @args\n" if DEBUG;
$handle = AnyEvent::Handle->new(fh => $handle, @args);
}
my $error_sub = $handle->{on_error}; # Hack: There is no accessor api
$handle->on_error(subname 'on_error_for_read_publish_'.$topic =>
sub {
my ($hdl, $fatal, $msg) = @_;
$error_sub->(@_) if ($error_sub);
$hdl->destroy;
undef $hdl;
$cv->send(1);
});
my $weak_self = $self;
weaken $weak_self;
my @push_read_args = @{$p{push_read_args}||['line']};
my $sub; $sub = subname 'push_read_cb_for_'.$topic => sub {
my ($hdl, $chunk, @args) = @_;
print STDERR "publish: $chunk => $topic\n" if DEBUG;
my $send_cv = AnyEvent->condvar;
print STDERR "publish: message[$chunk] => $topic\n" if DEBUG;
$weak_self->_send_with_ack({
message_type => MQTT_PUBLISH,
qos => $qos,
retain => $p{retain},
topic => $topic,
message => $chunk,
}, $send_cv, $expect);
$send_cv->cb(subname 'publish_ack_'.$topic =>
sub { $handle->push_read(@push_read_args => $sub ) });
return;
};
$handle->push_read(@push_read_args => $sub);
return $cv;
}
sub next_message_id {
my $self = shift;
my $res = $self->{message_id};
$self->{message_id}++;
$self->{message_id} = 1 if $self->{message_id} >= 65536;
$res;
}
sub _send_with_ack {
my ($self, $args, $cv, $expect, $dup) = @_;
if ($args->{qos}) {
unless (exists $args->{message_id}) {
$args->{message_id} = $self->next_message_id();
}
my $mid = $args->{message_id};
my $send_cv = AnyEvent->condvar;
$send_cv->cb(subname 'ack_cb_for_'.$mid => sub {
$self->{inflight}->{$mid} =
{
expect => $expect,
message => $args,
cv => $cv,
timeout =>
AnyEvent->timer(after => $self->{keep_alive_timer},
cb => subname 'ack_timeout_for_'.$mid =>
sub {
print ref $self, " timeout waiting for ",
message_type_string($expect), "\n" if DEBUG;
delete $self->{inflight}->{$mid};
$self->_send_with_ack($args, $cv, $expect, 1);
}),
};
});
$args->{cv} = $send_cv;
} else {
$args->{cv} = $cv;
}
$args->{dup} = 1 if ($dup);
lib/AnyEvent/MQTT.pm view on Meta::CPAN
}
delete $rec->{cv};
}
sub _confirm_unsubscribe {
my ($self, $mid) = @_;
my $topic = delete $self->{_unsub_pending_by_message_id}->{$mid};
unless (defined $topic) {
carp 'UnSubAck with no pending unsubscribe for message id: ', $mid, "\n";
return;
}
my $rec = delete $self->{_unsub_pending}->{$topic};
foreach my $cv (@{$rec->{cv}}) {
$cv->send(1);
}
}
sub _send {
my $self = shift;
my %p = @_;
my $cv = delete $p{cv};
my $msg = Net::MQTT::Message->new(%p);
$self->{connected} ?
$self->_queue_write($msg, $cv) : $self->connect($msg, $cv);
}
sub _queue_write {
my ($self, $msg, $cv) = @_;
my $queue = $self->{write_queue};
print STDERR 'Queuing: ', ($cv||'no cv'), ' ', $msg->string, "\n" if DEBUG;
push @{$queue}, [$msg, $cv];
$self->_write_now unless (defined $self->{_waiting});
$cv;
}
sub _write_now {
my $self = shift;
my ($msg, $cv);
undef $self->{_waiting};
if (@_) {
($msg, $cv) = @_;
} else {
my $args = shift @{$self->{write_queue}} or return;
($msg, $cv) = @$args;
}
$self->_reset_keep_alive_timer();
print STDERR "Sending: ", $msg->string, "\n" if DEBUG;
$self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback});
$self->{_waiting} = [$msg, $cv];
print ' ', (unpack 'H*', $msg->bytes), "\n" if DEBUG;
$self->{handle}->push_write($msg->bytes);
$cv;
}
sub _reset_keep_alive_timer {
my ($self, $wait) = @_;
undef $self->{_keep_alive_handle};
my $method = $wait ? '_keep_alive_timeout' : '_send_keep_alive';
$self->{_keep_alive_waiting} = $wait;
my $weak_self = $self;
weaken $weak_self;
$self->{_keep_alive_handle} =
AnyEvent->timer(after => $self->{keep_alive_timer},
cb => subname((substr $method, 1).'_cb' =>
sub { $weak_self->$method(@_) }));
}
sub _send_keep_alive {
my $self = shift;
print STDERR "Sending: keep alive\n" if DEBUG;
$self->_send(message_type => MQTT_PINGREQ);
$self->_reset_keep_alive_timer(1);
}
sub _keep_alive_timeout {
my $self = shift;
print STDERR "keep alive timeout\n" if DEBUG;
undef $self->{_keep_alive_waiting};
$self->{handle}->destroy;
$self->_error(0, 'keep alive timeout', 1);
}
sub _keep_alive_received {
my $self = shift;
print STDERR "keep alive received\n" if DEBUG;
return unless (defined $self->{_keep_alive_waiting});
$self->_reset_keep_alive_timer();
}
sub connect {
my ($self, $msg, $cv) = @_;
print STDERR "connect\n" if DEBUG;
$self->{_waiting} = 'connect';
if ($msg) {
$cv = AnyEvent->condvar unless ($cv);
$self->_queue_write($msg, $cv);
} else {
$self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv});
$cv = $self->{connect_cv};
}
return $cv if ($self->{handle});
my $weak_self = $self;
weaken $weak_self;
my $hd;
$hd = $self->{handle} =
AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}],
($self->{tls} ? (tls => "connect") : ()),
on_error => subname('on_error_cb' => sub {
my ($handle, $fatal, $message) = @_;
print STDERR "handle error $_[1]\n" if DEBUG;
$handle->destroy;
$weak_self->_error($fatal, 'Error: '.$message, 0);
}),
on_eof => subname('on_eof_cb' => sub {
my ($handle) = @_;
print STDERR "handle eof\n" if DEBUG;
$handle->destroy;
$weak_self->_error(1, 'EOF', 1);
}),
on_timeout => subname('on_timeout_cb' => sub {
$weak_self->_error(0, $weak_self->{wait}.' timeout', 1);
$weak_self->{wait} = 'nothing';
}),
on_connect => subname('on_connect_cb' => sub {
my ($handle, $host, $port, $retry) = @_;
print STDERR "TCP handshake complete\n" if DEBUG;
# call user-defined on_connect function.
$weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect};
my $msg =
Net::MQTT::Message->new(
message_type => MQTT_CONNECT,
keep_alive_timer => $weak_self->{keep_alive_timer},
client_id => $weak_self->{client_id},
clean_session => $weak_self->{clean_session},
will_topic => $weak_self->{will_topic},
will_qos => $weak_self->{will_qos},
will_retain => $weak_self->{will_retain},
will_message => $weak_self->{will_message},
user_name => $weak_self->{user_name},
password => $weak_self->{password},
);
$weak_self->_write_now($msg);
$handle->timeout($weak_self->{timeout});
$weak_self->{wait} = 'connack';
$handle->on_read(subname 'on_read_cb' => sub {
my ($hdl) = @_;
$hdl->push_read(ref $weak_self =>
subname 'reader_cb' => sub {
$weak_self->_handle_message(@_);
1;
});
});
}),
@{$self->{handle_args}},
);
return $cv
}
sub _reconnect {
my $self = shift;
print STDERR "reconnecting:\n" if DEBUG;
# must resubscribe everything
if ($self->{clean_session}) {
$self->{_sub_topics} = Net::MQTT::TopicStore->new();
$self->{_sub_reconnect} = delete $self->{_sub} || {};
}
$self->connect(@_);
}
sub _handle_message {
my $self = shift;
my ($handle, $msg, $error) = @_;
return $self->_error(0, $error, 1) if ($error);
$self->{message_log_callback}->('<', $msg) if ($self->{message_log_callback});
$self->_call_callback('before_msg_callback' => $msg) or return;
my $msg_type = lc ref $msg;
$msg_type =~ s/^.*:://;
$self->_call_callback('before_'.$msg_type.'_callback' => $msg) or return;
my $method = '_process_'.$msg_type;
unless ($self->can($method)) {
carp 'Unsupported message ', $msg->string(), "\n";
return;
}
my $res = $self->$method(@_);
$self->_call_callback('after_'.$msg_type.'_callback' => $msg, $res);
$res;
}
sub _call_callback {
my $self = shift;
my $cb_name = shift;
return 1 unless (exists $self->{$cb_name});
$self->{$cb_name}->(@_);
}
sub _process_connack {
my ($self, $handle, $msg, $error) = @_;
$handle->timeout(undef);
unless ($msg->return_code == MQTT_CONNECT_ACCEPTED) {
return $self->_error(1, 'Connection refused: '.$msg->string, 0);
}
print STDERR "Connection ready:\n", $msg->string(' '), "\n" if DEBUG;
$self->_write_now();
$self->{connected} = 1;
$self->{connect_cv}->send(1) if ($self->{connect_cv});
delete $self->{connect_cv};
my $weak_self = $self;
weaken $weak_self;
$handle->on_drain(subname 'on_drain_cb' => sub {
print STDERR "drained\n" if DEBUG;
my $w = $weak_self->{_waiting};
$w->[1]->send(1) if (ref $w && defined $w->[1]);
$weak_self->_write_now;
1;
});
# handle reconnect
while (my ($topic, $rec) = each %{$self->{_sub_reconnect}}) {
print STDERR "Resubscribing to '$topic':\n" if DEBUG;
for my $cb (values %{$rec->{cb}}) {
$self->subscribe(topic => $topic, callback => $cb, qos => $rec->{qos});
}
}
delete $self->{_sub_reconnect};
return
}
sub _process_pingresp {
shift->_keep_alive_received();
}
sub _process_suback {
my ($self, $handle, $msg, $error) = @_;
print STDERR "Confirmed subscription:\n", $msg->string(' '), "\n" if DEBUG;
$self->_confirm_subscription($msg->message_id, $msg->qos_levels->[0]);
return
}
sub _process_unsuback {
my ($self, $handle, $msg, $error) = @_;
print STDERR "Confirmed unsubscribe:\n", $msg->string(' '), "\n" if DEBUG;
$self->_confirm_unsubscribe($msg->message_id);
return
}
sub _publish_locally {
my ($self, $msg) = @_;
my $msg_topic = $msg->topic;
my $msg_data = $msg->message;
my $matches = $self->{_sub_topics}->values($msg_topic);
unless (scalar @$matches) {
carp "Unexpected publish:\n", $msg->string(' '), "\n";
return;
}
my %matched;
my $msg_retain = $msg->retain;
foreach my $topic (@$matches) {
my $rec = $self->{_sub}->{$topic};
if ($msg_retain) {
if ($msg_data eq '') {
delete $rec->{retained}->{$msg_topic};
print STDERR " retained cleared\n" if DEBUG;
} else {
$rec->{retained}->{$msg_topic} = $msg;
print STDERR " retained '", $msg_data, "'\n" if DEBUG;
}
}
lib/AnyEvent/MQTT.pm view on Meta::CPAN
}
my $exp_type = $self->{inflight}->{$mid}->{expect};
my $got_type = $msg->message_type;
unless ($got_type == $exp_type) {
carp 'Received ', message_type_string($got_type), ' but expected ',
message_type_string($exp_type), " for message id $mid\n";
return;
}
return delete $self->{inflight}->{$mid};
}
sub _process_puback {
my ($self, $handle, $msg, $error) = @_;
my $rec = $self->_inflight_record($msg) or return;
my $mid = $msg->message_id;
print STDERR 'PubAck: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
$rec->{cv}->send(1);
return 1;
}
sub _process_pubrec {
my ($self, $handle, $msg, $error) = @_;
my $rec = $self->_inflight_record($msg) or return;
my $mid = $msg->message_id;
print STDERR 'PubRec: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
$self->_send_with_ack({
message_type => MQTT_PUBREL,
qos => MQTT_QOS_AT_LEAST_ONCE,
message_id => $mid,
}, $rec->{cv}, MQTT_PUBCOMP);
}
sub _process_pubrel {
my ($self, $handle, $msg, $error) = @_;
my $mid = $msg->message_id;
print STDERR 'PubRel: ', $mid, "\n" if DEBUG;
my $pubmsg = delete $self->{messages}->{$mid};
unless ($pubmsg) {
carp "Unexpected message for message id $mid\n ".$msg->string;
return;
}
$self->_publish_locally($pubmsg);
$self->_send(message_type => MQTT_PUBCOMP, message_id => $mid);
}
sub _process_pubcomp {
my ($self, $handle, $msg, $error) = @_;
my $rec = $self->_inflight_record($msg) or return;
my $mid = $msg->message_id;
print STDERR 'PubComp: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
$rec->{cv}->send(1);
return 1;
}
sub anyevent_read_type {
my ($handle, $cb) = @_;
subname 'anyevent_read_type_reader' => sub {
my ($handle) = @_;
my $rbuf = \$handle->{rbuf};
weaken $rbuf;
return unless (defined $$rbuf);
while (1) {
my $msg = Net::MQTT::Message->new_from_bytes($$rbuf, 1);
last unless ($msg);
$cb->($handle, $msg);
}
return;
};
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
AnyEvent::MQTT - AnyEvent module for an MQTT client
=head1 VERSION
version 1.212810
=head1 SYNOPSIS
use AnyEvent::MQTT;
my $mqtt = AnyEvent::MQTT->new;
my $cv = $mqtt->subscribe(topic => '/topic',
callback => sub {
my ($topic, $message) = @_;
print $topic, ' ', $message, "\n"
});
my $qos = $cv->recv; # subscribed, negotiated QoS == $qos
# publish a simple message
$cv = $mqtt->publish(message => 'simple message',
topic => '/topic');
$cv->recv; # sent
# publish line-by-line from file handle
$cv = $mqtt->publish(handle => \*STDIN,
topic => '/topic');
$cv->recv; # sent
# publish from AnyEvent::Handle
$cv = $mqtt->publish(handle => AnyEvent::Handle->new(my %handle_args),
topic => '/topic');
$cv->recv; # sent
=head1 DESCRIPTION
AnyEvent module for MQTT client.
B<IMPORTANT:> This is an early release and the API is still subject to
change.
=head1 METHODS
( run in 1.754 second using v1.01-cache-2.11-cpan-ceb78f64989 )