AnyEvent-MQTT

 view release on metacpan or  search on metacpan

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

use strict;
use warnings;
package AnyEvent::MQTT;
$AnyEvent::MQTT::VERSION = '1.212810';
# ABSTRACT: AnyEvent module for an MQTT client


use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG};
use AnyEvent;
use AnyEvent::Handle;
use Net::MQTT::Constants;
use Net::MQTT::Message;
use Net::MQTT::TopicStore;
use Carp qw/croak carp/;
use Sub::Name;
use Scalar::Util qw/weaken/;


sub new {
  my ($pkg, %p) = @_;
  my $self =
    bless {
           socket => undef,
           host => '127.0.0.1',
           port => '1883',
           timeout => 30,
           wait => 'nothing',
           keep_alive_timer => 120,
           qos => MQTT_QOS_AT_MOST_ONCE,
           message_id => 1,
           user_name => undef,
           password => undef,
           tls => undef,
           will_topic => undef,
           will_qos => MQTT_QOS_AT_MOST_ONCE,
           will_retain => 0,
           will_message => '',
           client_id => undef,
           clean_session => 1,
           handle_args => [],
           write_queue => [],
           inflight => {},
           _sub_topics => Net::MQTT::TopicStore->new(),
           %p,
          }, $pkg;
}

sub DESTROY {
  $_[0]->cleanup;
}


sub cleanup {
  my $self = shift;
  print STDERR "cleanup\n" if DEBUG;
  if ($self->{handle}) {
    my $cv = AnyEvent->condvar;
    my $handle = $self->{handle};
    weaken $handle;
    $cv->cb(sub { $handle->destroy });
    $self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
  }
  delete $self->{handle};
  delete $self->{connected};
  delete $self->{wait};
  delete $self->{_keep_alive_handle};
  delete $self->{_keep_alive_waiting};
  $self->{write_queue} = [];
}

sub _error {
  my ($self, $fatal, $message, $reconnect) = @_;
  $self->cleanup($message);
  $self->{on_error}->($fatal, $message) if ($self->{on_error});
  $self->_reconnect() if ($reconnect);
}


sub publish {
  my ($self, %p) = @_;
  my $topic = exists $p{topic} ? $p{topic} :
    croak ref $self, '->publish requires "topic" parameter';
  my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
  my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
  my $expect;
  if ($qos) {
    $expect = ($qos == MQTT_QOS_AT_LEAST_ONCE ? MQTT_PUBACK : MQTT_PUBREC);
  }

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

    my @args = @{$p{handle_args}||[]};
    print STDERR "publish: IO[$handle] => $topic @args\n" if DEBUG;
    $handle = AnyEvent::Handle->new(fh => $handle, @args);
  }
  my $error_sub = $handle->{on_error}; # Hack: There is no accessor api
  $handle->on_error(subname 'on_error_for_read_publish_'.$topic =>
                    sub {
                      my ($hdl, $fatal, $msg) = @_;
                      $error_sub->(@_) if ($error_sub);
                      $hdl->destroy;
                      undef $hdl;
                      $cv->send(1);
                    });
  my $weak_self = $self;
  weaken $weak_self;
  my @push_read_args = @{$p{push_read_args}||['line']};
  my $sub; $sub = subname 'push_read_cb_for_'.$topic => sub {
    my ($hdl, $chunk, @args) = @_;
    print STDERR "publish: $chunk => $topic\n" if DEBUG;
    my $send_cv = AnyEvent->condvar;
    print STDERR "publish: message[$chunk] => $topic\n" if DEBUG;
    $weak_self->_send_with_ack({
                           message_type => MQTT_PUBLISH,
                           qos => $qos,
                           retain => $p{retain},
                           topic => $topic,
                           message => $chunk,
                          }, $send_cv, $expect);
    $send_cv->cb(subname 'publish_ack_'.$topic =>
                 sub { $handle->push_read(@push_read_args => $sub ) });
    return;
  };
  $handle->push_read(@push_read_args => $sub);
  return $cv;
}


sub next_message_id {
  my $self = shift;
  my $res = $self->{message_id};
  $self->{message_id}++;
  $self->{message_id} = 1 if $self->{message_id} >= 65536;
  $res;
}

sub _send_with_ack {
  my ($self, $args, $cv, $expect, $dup) = @_;
  if ($args->{qos}) {
    unless (exists $args->{message_id}) {
      $args->{message_id} = $self->next_message_id();
    }
    my $mid = $args->{message_id};
    my $send_cv = AnyEvent->condvar;
    $send_cv->cb(subname 'ack_cb_for_'.$mid => sub {
                   $self->{inflight}->{$mid} =
                     {
                      expect => $expect,
                      message => $args,
                      cv => $cv,
                      timeout =>
                        AnyEvent->timer(after => $self->{keep_alive_timer},
                                        cb => subname 'ack_timeout_for_'.$mid =>
                                        sub {
                          print ref $self, " timeout waiting for ",
                            message_type_string($expect), "\n" if DEBUG;
                          delete $self->{inflight}->{$mid};
                          $self->_send_with_ack($args, $cv, $expect, 1);
                        }),
                     };
                   });
    $args->{cv} = $send_cv;
  } else {
    $args->{cv} = $cv;
  }
  $args->{dup} = 1 if ($dup);
  return $self->_send(%$args);
}


sub subscribe {
  my ($self, %p) = @_;
  my $topic = exists $p{topic} ? $p{topic} :
    croak ref $self, '->subscribe requires "topic" parameter';
  my $sub = exists $p{callback} ? $p{callback} :
    croak ref $self, '->subscribe requires "callback" parameter';
  my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
  my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
  my $mid = $self->_add_subscription($topic, $cv, $sub);
  if (defined $mid) { # not already subscribed/subscribing
    $self->_send(message_type => MQTT_SUBSCRIBE,
                 message_id => $mid,
                 topics => [[$topic, $qos]]);
  }
  $cv
}


sub unsubscribe {
  my ($self, %p) = @_;
  my $topic = exists $p{topic} ? $p{topic} :
    croak ref $self, '->unsubscribe requires "topic" parameter';
  my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
  my $mid = $self->_remove_subscription($topic, $cv, $p{callback});
  if (defined $mid) { # not already subscribed/subscribing
    $self->_send(message_type => MQTT_UNSUBSCRIBE,
                 message_id => $mid,
                 topics => [$topic]);
  }
  $cv
}

sub _add_subscription {
  my ($self, $topic, $cv, $sub) = @_;
  my $rec = $self->{_sub}->{$topic};
  if ($rec) {
    print STDERR "Add $sub to existing $topic subscription\n" if DEBUG;
    $rec->{cb}->{$sub} = $sub;
    $cv->send($rec->{qos});
    foreach my $msg (values %{$rec->{retained}}) {
      $sub->($msg->topic, $msg->message, $msg);
    }

lib/AnyEvent/MQTT.pm  view on Meta::CPAN


  foreach my $cv (@{$rec->{cv}}) {
    $cv->send($qos);
  }

  # publish any matching queued QoS messages
  if (!$self->{clean_session} && $qos && $self->{_qos_msg_cache}) {
    my $cache = $self->{_qos_msg_cache};
    my $ts = Net::MQTT::TopicStore->new($topic);
    for my $i (grep { $ts->values($cache->[$_]->topic) } reverse(0..$#$cache)) {
      my $msg = delete $cache->[$i];
      print STDERR "Processing cached message for topic '", $msg->topic, "' with subscription to topic '$topic'\n" if DEBUG;
      $self->_process_publish($self->{handle}, $msg);
    }
    delete $self->{_qos_msg_cache} unless @$cache;
  }
  delete $rec->{cv};
}

sub _confirm_unsubscribe {
  my ($self, $mid) = @_;
  my $topic = delete $self->{_unsub_pending_by_message_id}->{$mid};
  unless (defined $topic) {
    carp 'UnSubAck with no pending unsubscribe for message id: ', $mid, "\n";
    return;
  }
  my $rec = delete $self->{_unsub_pending}->{$topic};
  foreach my $cv (@{$rec->{cv}}) {
    $cv->send(1);
  }
}

sub _send {
  my $self = shift;
  my %p = @_;
  my $cv = delete $p{cv};
  my $msg = Net::MQTT::Message->new(%p);
  $self->{connected} ?
    $self->_queue_write($msg, $cv) : $self->connect($msg, $cv);
}

sub _queue_write {
  my ($self, $msg, $cv) = @_;
  my $queue = $self->{write_queue};
  print STDERR 'Queuing: ', ($cv||'no cv'), ' ', $msg->string, "\n" if DEBUG;
  push @{$queue}, [$msg, $cv];
  $self->_write_now unless (defined $self->{_waiting});
  $cv;
}

sub _write_now {
  my $self = shift;
  my ($msg, $cv);
  undef $self->{_waiting};
  if (@_) {
    ($msg, $cv) = @_;
  } else {
    my $args = shift @{$self->{write_queue}} or return;
    ($msg, $cv) = @$args;
  }
  $self->_reset_keep_alive_timer();
  print STDERR "Sending: ", $msg->string, "\n" if DEBUG;
  $self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback});
  $self->{_waiting} = [$msg, $cv];
  print '  ', (unpack 'H*', $msg->bytes), "\n" if DEBUG;
  $self->{handle}->push_write($msg->bytes);
  $cv;
}

sub _reset_keep_alive_timer {
  my ($self, $wait) = @_;
  undef $self->{_keep_alive_handle};
  my $method = $wait ? '_keep_alive_timeout' : '_send_keep_alive';
  $self->{_keep_alive_waiting} = $wait;
  my $weak_self = $self;
  weaken $weak_self;
  $self->{_keep_alive_handle} =
    AnyEvent->timer(after => $self->{keep_alive_timer},
                    cb => subname((substr $method, 1).'_cb' =>
                                  sub { $weak_self->$method(@_) }));
}

sub _send_keep_alive {
  my $self = shift;
  print STDERR "Sending: keep alive\n" if DEBUG;
  $self->_send(message_type => MQTT_PINGREQ);
  $self->_reset_keep_alive_timer(1);
}

sub _keep_alive_timeout {
  my $self = shift;
  print STDERR "keep alive timeout\n" if DEBUG;
  undef $self->{_keep_alive_waiting};
  $self->{handle}->destroy;
  $self->_error(0, 'keep alive timeout', 1);
}

sub _keep_alive_received {
  my $self = shift;
  print STDERR "keep alive received\n" if DEBUG;
  return unless (defined $self->{_keep_alive_waiting});
  $self->_reset_keep_alive_timer();
}


sub connect {
  my ($self, $msg, $cv) = @_;
  print STDERR "connect\n" if DEBUG;
  $self->{_waiting} = 'connect';
  if ($msg) {
    $cv = AnyEvent->condvar unless ($cv);
    $self->_queue_write($msg, $cv);
  } else {
    $self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv});
    $cv = $self->{connect_cv};
  }
  return $cv if ($self->{handle});

  my $weak_self = $self;
  weaken $weak_self;

  my $hd;
  $hd = $self->{handle} =
    AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}],
                          ($self->{tls} ? (tls => "connect") : ()),
                          on_error => subname('on_error_cb' => sub {
                            my ($handle, $fatal, $message) = @_;
                            print STDERR "handle error $_[1]\n" if DEBUG;
                            $handle->destroy;
                            $weak_self->_error($fatal, 'Error: '.$message, 0);
                          }),
                          on_eof => subname('on_eof_cb' => sub {
                            my ($handle) = @_;
                            print STDERR "handle eof\n" if DEBUG;
                            $handle->destroy;
                            $weak_self->_error(1, 'EOF', 1);
                          }),
                          on_timeout => subname('on_timeout_cb' => sub {
                            $weak_self->_error(0, $weak_self->{wait}.' timeout', 1);
                            $weak_self->{wait} = 'nothing';
                          }),
                          on_connect => subname('on_connect_cb' => sub {
                            my ($handle, $host, $port, $retry) = @_;
                            print STDERR "TCP handshake complete\n" if DEBUG;
                            # call user-defined on_connect function.
                            $weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect};
                            my $msg =
                              Net::MQTT::Message->new(
                                message_type => MQTT_CONNECT,
                                keep_alive_timer => $weak_self->{keep_alive_timer},
                                client_id => $weak_self->{client_id},
                                clean_session => $weak_self->{clean_session},
                                will_topic => $weak_self->{will_topic},
                                will_qos => $weak_self->{will_qos},
                                will_retain => $weak_self->{will_retain},
                                will_message => $weak_self->{will_message},
                                user_name => $weak_self->{user_name},
                                password => $weak_self->{password},
                              );
                            $weak_self->_write_now($msg);
                            $handle->timeout($weak_self->{timeout});
                            $weak_self->{wait} = 'connack';
                            $handle->on_read(subname 'on_read_cb' => sub {
                              my ($hdl) = @_;
                              $hdl->push_read(ref $weak_self =>
                                              subname 'reader_cb' => sub {
                                                $weak_self->_handle_message(@_);
                                                1;
                                              });
                            });
                          }),
                          @{$self->{handle_args}},
                         );
  return $cv
}

sub _reconnect {
  my $self = shift;
  print STDERR "reconnecting:\n" if DEBUG;

  # must resubscribe everything
  if ($self->{clean_session}) {
    $self->{_sub_topics} = Net::MQTT::TopicStore->new();
    $self->{_sub_reconnect} = delete $self->{_sub} || {};
  }

  $self->connect(@_);
}

sub _handle_message {
  my $self = shift;
  my ($handle, $msg, $error) = @_;
  return $self->_error(0, $error, 1) if ($error);
  $self->{message_log_callback}->('<', $msg) if ($self->{message_log_callback});
  $self->_call_callback('before_msg_callback' => $msg) or return;
  my $msg_type = lc ref $msg;
  $msg_type =~ s/^.*:://;
  $self->_call_callback('before_'.$msg_type.'_callback' => $msg) or return;
  my $method = '_process_'.$msg_type;
  unless ($self->can($method)) {
    carp 'Unsupported message ', $msg->string(), "\n";
    return;
  }
  my $res = $self->$method(@_);
  $self->_call_callback('after_'.$msg_type.'_callback' => $msg, $res);
  $res;
}

sub _call_callback {
  my $self = shift;

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

AnyEvent::MQTT - AnyEvent module for an MQTT client

=head1 VERSION

version 1.212810

=head1 SYNOPSIS

  use AnyEvent::MQTT;
  my $mqtt = AnyEvent::MQTT->new;
  my $cv = $mqtt->subscribe(topic => '/topic',
                            callback => sub {
                                 my ($topic, $message) = @_;
                                 print $topic, ' ', $message, "\n"
                               });
  my $qos = $cv->recv; # subscribed, negotiated QoS == $qos

  # publish a simple message
  $cv = $mqtt->publish(message => 'simple message',
                          topic => '/topic');
  $cv->recv; # sent

  # publish line-by-line from file handle
  $cv =  $mqtt->publish(handle => \*STDIN,
                        topic => '/topic');
  $cv->recv; # sent

  # publish from AnyEvent::Handle
  $cv = $mqtt->publish(handle => AnyEvent::Handle->new(my %handle_args),
                       topic => '/topic');
  $cv->recv; # sent

=head1 DESCRIPTION

AnyEvent module for MQTT client.

B<IMPORTANT:> This is an early release and the API is still subject to
change.

=head1 METHODS

=head2 C<new(%params)>

Constructs a new C<AnyEvent::MQTT> object.  The supported parameters
are:

=over

=item C<host>

The server host.  Defaults to C<127.0.0.1>.

=item C<port>

The server port.  Defaults to C<1883>.

=item C<timeout>

The timeout for responses from the server.

=item C<keep_alive_timer>

The keep alive timer.

=item C<user_name>

The user name for the MQTT broker.

=item C<password>

The password for the MQTT broker.

=item C<tls>

Set flag to enable TLS encryption, Default is no encryption.

=item C<will_topic>

Set topic for will message.  Default is undef which means no will
message will be configured.

=item C<will_qos>

Set QoS for will message.  Default is 'at-most-once'.

=item C<will_retain>

Set retain flag for will message.  Default is 0.

=item C<will_message>

Set message for will message.  Default is the empty message.

=item C<clean_session>

Set clean session flag for connect message.  Default is 1.

=item C<client_id>

Sets the client id for the client overriding the default which
is C<NetMQTTpmNNNNN> where NNNNN is the current process id.

=item C<message_log_callback>

Defines a callback to call on every message.

=item C<on_error>

Defines a callback to call when some error occurs.

Two parameters are passed to the callback.

    $on_error->($fatal, $message)

where C<$fatal> is a boolean flag and C<$message> is the error message.
If the error is fatal, C<$fatal> is true.

=item C<handle_args>

  a reference to a list to pass as arguments to the
  L<AnyEvent::Handle> constructor (defaults to
  an empty list reference).



( run in 1.203 second using v1.01-cache-2.11-cpan-39bf76dae61 )