AnyEvent-MQTT

 view release on metacpan or  search on metacpan

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

use strict;
use warnings;
package AnyEvent::MQTT;
$AnyEvent::MQTT::VERSION = '1.212810';
# ABSTRACT: AnyEvent module for an MQTT client


use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG};
use AnyEvent;
use AnyEvent::Handle;
use Net::MQTT::Constants;
use Net::MQTT::Message;
use Net::MQTT::TopicStore;
use Carp qw/croak carp/;
use Sub::Name;
use Scalar::Util qw/weaken/;


# Constructor.
#
# Builds the client object from a set of built-in defaults; any key/value
# pairs supplied by the caller are merged in afterwards and therefore take
# precedence over the defaults.
#
# Parameters: the class name followed by a flat list of option => value pairs.
# Returns: the new blessed hash reference.
sub new {
  my ($pkg, %args) = @_;
  my %defaults =
    (
     socket => undef,
     host => '127.0.0.1',
     port => '1883',
     timeout => 30,
     wait => 'nothing',
     keep_alive_timer => 120,
     qos => MQTT_QOS_AT_MOST_ONCE,
     message_id => 1,
     user_name => undef,
     password => undef,
     tls => undef,
     will_topic => undef,
     will_qos => MQTT_QOS_AT_MOST_ONCE,
     will_retain => 0,
     will_message => '',
     client_id => undef,
     clean_session => 1,
     handle_args => [],
     write_queue => [],
     inflight => {},
     _sub_topics => Net::MQTT::TopicStore->new(),
    );
  # Caller-supplied options are listed last so that they win.
  my $self = bless { %defaults, %args }, $pkg;
  return $self;
}

# Destructor hook: runs the regular cleanup() teardown when the object is
# released.
sub DESTROY {
  my ($self) = @_;
  $self->cleanup;
}


# Tear down the current connection state.
#
# When a handle exists, a DISCONNECT message is handed to _send() together
# with a condvar whose callback destroys the handle once it fires.  The
# per-connection state (handle, connected flag, wait state, keep alive
# timer and keep alive waiting flag) is then removed and the write queue is
# replaced by an empty one.
#
# Any arguments after the invocant are ignored.
sub cleanup {
  my $self = shift;
  print STDERR "cleanup\n" if DEBUG;
  if ($self->{handle}) {
    my $cv = AnyEvent->condvar;
    my $handle = $self->{handle};
    # Capture only a weak reference so that the callback does not keep the
    # handle alive by itself.
    weaken $handle;
    # The strong reference in $self->{handle} is deleted just below, so the
    # weak copy may already be undef by the time the condvar fires; guard
    # the call instead of dying on an undefined value.
    $cv->cb(sub { $handle->destroy if (defined $handle) });
    $self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
  }
  delete $self->{handle};
  delete $self->{connected};
  delete $self->{wait};
  delete $self->{_keep_alive_handle};
  delete $self->{_keep_alive_waiting};
  $self->{write_queue} = [];
}

# Common error path.
#
# Parameters:
#   $fatal     - flag passed through to the user's on_error callback
#   $message   - error text passed to cleanup() and to on_error
#   $reconnect - when true, _reconnect() is called after reporting
#
# The connection state is cleaned up first, then the user's on_error
# callback (if one is set) is invoked with ($fatal, $message).
sub _error {
  my ($self, $fatal, $message, $reconnect) = @_;
  $self->cleanup($message);
  if ($self->{on_error}) {
    $self->{on_error}->($fatal, $message);
  }
  $self->_reconnect() if ($reconnect);
}


# Publish to a topic: either a single message, or a stream of chunks read
# from a handle.
#
# Named parameters:
#   topic          - required; croaks when the key is absent.
#   qos            - QoS level; MQTT_QOS_AT_MOST_ONCE when the key is absent.
#   cv             - condvar used as the result; a new one is created when
#                    the key is absent.
#   message        - payload.  When defined, a single PUBLISH built from all
#                    remaining parameters is sent and the method returns.
#   handle         - consulted only when "message" is not defined; croaks
#                    when the key is absent.  Anything that is not an
#                    AnyEvent::Handle is wrapped in one (passed as "fh").
#   handle_args    - array ref of extra arguments for that wrapping
#                    AnyEvent::Handle->new call.
#   push_read_args - array ref of push_read arguments; ['line'] by default.
#   retain         - passed through on messages published from a handle.
#
# Returns the condvar.
sub publish {
  my ($self, %p) = @_;
  croak ref $self, '->publish requires "topic" parameter'
    unless (exists $p{topic});
  my $topic = $p{topic};
  my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
  my $cv;
  if (exists $p{cv}) {
    $cv = delete $p{cv};
  } else {
    $cv = AnyEvent->condvar;
  }
  # Which acknowledgement to wait for; left undefined for QoS 0.
  my $expect;
  $expect = ($qos == MQTT_QOS_AT_LEAST_ONCE ? MQTT_PUBACK : MQTT_PUBREC)
    if ($qos);

  # Simple case: a single message supplied directly.
  my $message = $p{message};
  if (defined $message) {
    print STDERR "publish: message[$message] => $topic\n" if DEBUG;
    my %publish = (message_type => MQTT_PUBLISH, %p);
    $self->_send_with_ack(\%publish, $cv, $expect);
    return $cv;
  }

  # Streaming case: publish every chunk read from a handle.
  croak ref $self, '->publish requires "message" or "handle" parameter'
    unless (exists $p{handle});
  my $handle = $p{handle};
  if (!$handle->isa('AnyEvent::Handle')) {
    my @args = @{$p{handle_args}||[]};
    print STDERR "publish: IO[$handle] => $topic @args\n" if DEBUG;
    $handle = AnyEvent::Handle->new(fh => $handle, @args);
  }

  # Hack: AnyEvent::Handle has no accessor for the current on_error callback,
  # so it is read straight out of the object.
  my $previous_on_error = $handle->{on_error};
  my $on_error = sub {
    my ($hdl, $fatal, $msg) = @_;
    # Chain to whatever callback was installed before, then finish up.
    $previous_on_error->(@_) if ($previous_on_error);
    $hdl->destroy;
    undef $hdl;
    $cv->send(1);
  };
  $handle->on_error(subname 'on_error_for_read_publish_'.$topic => $on_error);

  my $weak_self = $self;
  weaken $weak_self;
  my @push_read_args = @{$p{push_read_args}||['line']};
  my $reader;
  $reader = subname 'push_read_cb_for_'.$topic => sub {
    my ($hdl, $chunk, @args) = @_;
    print STDERR "publish: $chunk => $topic\n" if DEBUG;
    my $ack_cv = AnyEvent->condvar;
    print STDERR "publish: message[$chunk] => $topic\n" if DEBUG;
    my %publish =
      (
       message_type => MQTT_PUBLISH,
       qos => $qos,
       retain => $p{retain},
       topic => $topic,
       message => $chunk,
      );
    $weak_self->_send_with_ack(\%publish, $ack_cv, $expect);
    # Queue the next read only once this chunk's condvar has fired.
    $ack_cv->cb(subname 'publish_ack_'.$topic =>
                sub { $handle->push_read(@push_read_args => $reader ) });
    return;
  };
  $handle->push_read(@push_read_args => $reader);
  return $cv;
}


# Hand out the current message id and advance the counter.
#
# The stored counter is incremented after the current value is taken and
# is reset to 1 as soon as it reaches 65536.
#
# Returns the message id that was current on entry.
sub next_message_id {
  my ($self) = @_;
  my $issued = $self->{message_id};
  if (++$self->{message_id} >= 65536) {
    $self->{message_id} = 1;
  }
  return $issued;
}

sub _send_with_ack {
  my ($self, $args, $cv, $expect, $dup) = @_;
  if ($args->{qos}) {
    unless (exists $args->{message_id}) {
      $args->{message_id} = $self->next_message_id();
    }
    my $mid = $args->{message_id};
    my $send_cv = AnyEvent->condvar;
    $send_cv->cb(subname 'ack_cb_for_'.$mid => sub {
                   $self->{inflight}->{$mid} =
                     {
                      expect => $expect,
                      message => $args,
                      cv => $cv,
                      timeout =>
                        AnyEvent->timer(after => $self->{keep_alive_timer},
                                        cb => subname 'ack_timeout_for_'.$mid =>
                                        sub {
                          print ref $self, " timeout waiting for ",
                            message_type_string($expect), "\n" if DEBUG;
                          delete $self->{inflight}->{$mid};
                          $self->_send_with_ack($args, $cv, $expect, 1);
                        }),
                     };
                   });
    $args->{cv} = $send_cv;
  } else {
    $args->{cv} = $cv;
  }
  $args->{dup} = 1 if ($dup);

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  }
  delete $rec->{cv};
}

# Complete a pending unsubscribe identified by message id.
#
# The topic registered for $mid is removed from the by-message-id table; if
# there is none a warning is issued and nothing else happens.  Otherwise the
# pending record for that topic is removed and every condvar stored in its
# "cv" list is sent a true value.
sub _confirm_unsubscribe {
  my ($self, $mid) = @_;
  my $topic = delete $self->{_unsub_pending_by_message_id}->{$mid};
  if (!defined $topic) {
    carp 'UnSubAck with no pending unsubscribe for message id: ', $mid, "\n";
    return;
  }
  my $pending = delete $self->{_unsub_pending}->{$topic};
  foreach my $waiter (@{$pending->{cv}}) {
    $waiter->send(1);
  }
}

# Build a message from the given named parameters and dispatch it.
#
# The optional "cv" parameter is removed before the remaining parameters
# are passed to Net::MQTT::Message->new.  When already connected the
# message is queued for writing; otherwise connect() is called with the
# message and condvar.
#
# Returns whatever the chosen method returns.
sub _send {
  my ($self, %p) = @_;
  my $cv = delete $p{cv};
  my $msg = Net::MQTT::Message->new(%p);
  return $self->_queue_write($msg, $cv) if ($self->{connected});
  return $self->connect($msg, $cv);
}

# Append a [message, condvar] pair to the write queue and start writing
# straight away unless something is already marked as waiting.
#
# Returns the condvar that was passed in.
sub _queue_write {
  my ($self, $msg, $cv) = @_;
  my $pending = $self->{write_queue};
  print STDERR 'Queuing: ', ($cv||'no cv'), ' ', $msg->string, "\n" if DEBUG;
  push @{$pending}, [$msg, $cv];
  if (!defined $self->{_waiting}) {
    $self->_write_now;
  }
  return $cv;
}

# Write a single message to the handle.
#
# Called with ($msg, $cv) that message is written; called without
# arguments the next entry is taken from the write queue, and the method
# returns without doing anything further when the queue is empty.
#
# Before writing, the keep alive timer is reset and the user's
# message_log_callback (if set) is invoked with '>' and the message.  The
# message and its condvar are recorded in $self->{_waiting}.
#
# Returns the condvar belonging to the written message (may be undef).
sub _write_now {
  my $self = shift;
  my ($msg, $cv);
  # Clear the waiting marker first: it stays cleared when the queue turns
  # out to be empty.
  undef $self->{_waiting};
  if (@_) {
    ($msg, $cv) = @_;
  } else {
    my $args = shift @{$self->{write_queue}} or return;
    ($msg, $cv) = @$args;
  }
  $self->_reset_keep_alive_timer();
  print STDERR "Sending: ", $msg->string, "\n" if DEBUG;
  $self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback});
  $self->{_waiting} = [$msg, $cv];
  # Debug hex dump goes to STDERR like every other debug line in this file.
  print STDERR '  ', (unpack 'H*', $msg->bytes), "\n" if DEBUG;
  $self->{handle}->push_write($msg->bytes);
  $cv;
}

# (Re)arm the keep alive timer.
#
# Parameters:
#   $wait - when true the timer fires _keep_alive_timeout, otherwise it
#           fires _send_keep_alive.  The value is also stored in
#           $self->{_keep_alive_waiting}.
#
# Any existing timer is dropped before the new one is created; the new
# timer fires after $self->{keep_alive_timer} seconds.
sub _reset_keep_alive_timer {
  my ($self, $wait) = @_;
  undef $self->{_keep_alive_handle};
  $self->{_keep_alive_waiting} = $wait;
  my $method = '_send_keep_alive';
  $method = '_keep_alive_timeout' if ($wait);
  # The callback is named after the method without its leading underscore.
  my $cb_name = (substr $method, 1).'_cb';
  my $weak_self = $self;
  weaken $weak_self;
  my $on_expiry = subname($cb_name => sub { $weak_self->$method(@_) });
  $self->{_keep_alive_handle} =
    AnyEvent->timer(after => $self->{keep_alive_timer}, cb => $on_expiry);
}

# Timer callback: send a PINGREQ and re-arm the timer in "waiting" mode so
# that it fires _keep_alive_timeout next.
sub _send_keep_alive {
  my ($self) = @_;
  print STDERR "Sending: keep alive\n" if DEBUG;
  $self->_send(message_type => MQTT_PINGREQ);
  return $self->_reset_keep_alive_timer(1);
}

# Timer callback used while waiting for a keep alive response: clears the
# waiting flag, destroys the handle and reports a non-fatal
# 'keep alive timeout' error with reconnection requested.
sub _keep_alive_timeout {
  my ($self) = @_;
  print STDERR "keep alive timeout\n" if DEBUG;
  undef $self->{_keep_alive_waiting};
  $self->{handle}->destroy;
  return $self->_error(0, 'keep alive timeout', 1);
}

# Called when a keep alive response arrives.  The timer is re-armed only
# if a response was actually being waited for; otherwise nothing happens.
sub _keep_alive_received {
  my ($self) = @_;
  print STDERR "keep alive received\n" if DEBUG;
  if (defined $self->{_keep_alive_waiting}) {
    return $self->_reset_keep_alive_timer();
  }
  return;
}


# Start connecting to the broker (unless a handle already exists).
#
# Parameters:
#   $msg - optional message to queue for writing
#   $cv  - optional condvar for that message; one is created when a
#          message is given without one
#
# Returns the message's condvar when a message was given, otherwise the
# shared $self->{connect_cv} condvar (created on first use).
sub connect {
  my ($self, $msg, $cv) = @_;
  print STDERR "connect\n" if DEBUG;
  # _queue_write only starts writing when _waiting is undefined, so this
  # marker keeps queued messages from being written for now.
  $self->{_waiting} = 'connect';
  if ($msg) {
    $cv = AnyEvent->condvar unless ($cv);
    $self->_queue_write($msg, $cv);
  } else {
    $self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv});
    $cv = $self->{connect_cv};
  }
  # A handle already exists: nothing more to set up.
  return $cv if ($self->{handle});

  # The handle callbacks below only capture a weak reference to this object.
  my $weak_self = $self;
  weaken $weak_self;

  my $hd;
  $hd = $self->{handle} =
    AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}],
                          ($self->{tls} ? (tls => "connect") : ()),
                          # Handle error: destroy the handle and report the
                          # error without requesting a reconnect.
                          on_error => subname('on_error_cb' => sub {
                            my ($handle, $fatal, $message) = @_;
                            print STDERR "handle error $_[1]\n" if DEBUG;
                            $handle->destroy;
                            $weak_self->_error($fatal, 'Error: '.$message, 0);
                          }),
                          # EOF: destroy the handle and report a fatal 'EOF'
                          # error with reconnection requested.
                          on_eof => subname('on_eof_cb' => sub {
                            my ($handle) = @_;
                            print STDERR "handle eof\n" if DEBUG;
                            $handle->destroy;
                            $weak_self->_error(1, 'EOF', 1);
                          }),
                          # Timeout: report "<wait state> timeout" with
                          # reconnection requested, then reset the wait state.
                          on_timeout => subname('on_timeout_cb' => sub {
                            $weak_self->_error(0, $weak_self->{wait}.' timeout', 1);
                            $weak_self->{wait} = 'nothing';
                          }),
                          on_connect => subname('on_connect_cb' => sub {
                            my ($handle, $host, $port, $retry) = @_;
                            print STDERR "TCP handshake complete\n" if DEBUG;
                            # call user-defined on_connect function.
                            $weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect};
                            # Build the CONNECT message from the object's
                            # settings.
                            my $msg =
                              Net::MQTT::Message->new(
                                message_type => MQTT_CONNECT,
                                keep_alive_timer => $weak_self->{keep_alive_timer},
                                client_id => $weak_self->{client_id},
                                clean_session => $weak_self->{clean_session},
                                will_topic => $weak_self->{will_topic},
                                will_qos => $weak_self->{will_qos},
                                will_retain => $weak_self->{will_retain},
                                will_message => $weak_self->{will_message},
                                user_name => $weak_self->{user_name},
                                password => $weak_self->{password},
                              );
                            # Written directly, bypassing the write queue.
                            $weak_self->_write_now($msg);
                            # Arm the handle timeout and record what is being
                            # waited for (used in the on_timeout message).
                            $handle->timeout($weak_self->{timeout});
                            $weak_self->{wait} = 'connack';
                            # Each read event pushes a read request whose type
                            # is this object's class name and whose callback
                            # passes its arguments to _handle_message.
                            # NOTE(review): presumably AnyEvent::Handle
                            # resolves that type through the class's
                            # anyevent_read_type function - confirm.
                            $handle->on_read(subname 'on_read_cb' => sub {
                              my ($hdl) = @_;
                              $hdl->push_read(ref $weak_self =>
                                              subname 'reader_cb' => sub {
                                                $weak_self->_handle_message(@_);
                                                1;
                                              });
                            });
                          }),
                          # User-supplied handle arguments come last.
                          # NOTE(review): presumably so that they can override
                          # the settings above - confirm against
                          # AnyEvent::Handle->new.
                          @{$self->{handle_args}},
                         );
  return $cv
}

# Re-establish the connection.
#
# With clean_session set, the current subscriptions are moved aside into
# $self->{_sub_reconnect} and the topic store is replaced by a fresh one
# before connecting.  Any extra arguments are passed on to connect().
#
# Returns whatever connect() returns.
sub _reconnect {
  my ($self, @connect_args) = @_;
  print STDERR "reconnecting:\n" if DEBUG;

  # must resubscribe everything
  if ($self->{clean_session}) {
    $self->{_sub_topics} = Net::MQTT::TopicStore->new();
    $self->{_sub_reconnect} = delete $self->{_sub} || {};
  }

  return $self->connect(@connect_args);
}

# Dispatch one incoming message.
#
# Arguments after the invocant: ($handle, $msg, $error).
#
# A true $error is reported through _error() with reconnection requested.
# Otherwise the message is passed to message_log_callback (if set), then to
# the before_msg_callback and before_<type>_callback hooks; either hook can
# stop processing by returning false.  <type> is the lower-cased last
# component of the message's class name.  The message is then handled by
# the matching _process_<type> method (a warning is issued when there is
# none) and the after_<type>_callback hook receives the message and result.
#
# Returns the result of the _process_<type> method.
sub _handle_message {
  my $self = shift;
  my ($handle, $msg, $error) = @_;
  if ($error) {
    return $self->_error(0, $error, 1);
  }
  if ($self->{message_log_callback}) {
    $self->{message_log_callback}->('<', $msg);
  }
  return unless ($self->_call_callback('before_msg_callback' => $msg));
  (my $msg_type = lc ref $msg) =~ s/^.*:://;
  return
    unless ($self->_call_callback('before_'.$msg_type.'_callback' => $msg));
  my $method = '_process_'.$msg_type;
  if (!$self->can($method)) {
    carp 'Unsupported message ', $msg->string(), "\n";
    return;
  }
  my $res = $self->$method(@_);
  $self->_call_callback('after_'.$msg_type.'_callback' => $msg, $res);
  return $res;
}

# Invoke an optional hook stored on the object.
#
# Parameters: the name of the hook, followed by the arguments to pass to it.
# Returns 1 when no hook of that name exists, otherwise whatever the hook
# returns.
sub _call_callback {
  my $self = shift;
  my $hook = shift;
  if (exists $self->{$hook}) {
    return $self->{$hook}->(@_);
  }
  return 1;
}

# Handle a CONNACK message.
#
# The handle timeout is cleared.  A return code other than
# MQTT_CONNECT_ACCEPTED is reported as a fatal error without reconnecting.
# Otherwise the next queued message is written, the object is marked as
# connected, the connect condvar (if any) is fired and removed, an on_drain
# callback is installed on the handle, and every subscription saved in
# $self->{_sub_reconnect} is subscribed again.
sub _process_connack {
  my ($self, $handle, $msg, $error) = @_;
  $handle->timeout(undef);
  my $accepted = ($msg->return_code == MQTT_CONNECT_ACCEPTED);
  return $self->_error(1, 'Connection refused: '.$msg->string, 0)
    unless ($accepted);
  print STDERR "Connection ready:\n", $msg->string('  '), "\n" if DEBUG;
  $self->_write_now();
  $self->{connected} = 1;
  if ($self->{connect_cv}) {
    $self->{connect_cv}->send(1);
  }
  delete $self->{connect_cv};

  my $weak_self = $self;
  weaken $weak_self;

  # Each time the write buffer drains: fire the condvar of the message that
  # was being written, then write the next queued message.
  my $on_drain = sub {
    print STDERR "drained\n" if DEBUG;
    my $waiting = $weak_self->{_waiting};
    $waiting->[1]->send(1) if (ref $waiting && defined $waiting->[1]);
    $weak_self->_write_now;
    1;
  };
  $handle->on_drain(subname 'on_drain_cb' => $on_drain);

  # handle reconnect
  while (my ($topic, $rec) = each %{$self->{_sub_reconnect}}) {
    print STDERR "Resubscribing to '$topic':\n" if DEBUG;
    foreach my $callback (values %{$rec->{cb}}) {
      $self->subscribe(topic => $topic, callback => $callback,
                       qos => $rec->{qos});
    }
  }
  delete $self->{_sub_reconnect};
  return;
}

# Handle a PINGRESP message by treating it as a received keep alive.
sub _process_pingresp {
  my ($self) = @_;
  return $self->_keep_alive_received();
}

# Handle a SUBACK message: confirm the subscription for the message id
# using the first entry of the message's QoS levels.  Returns nothing.
sub _process_suback {
  my ($self, undef, $msg) = @_;
  if (DEBUG) {
    print STDERR "Confirmed subscription:\n", $msg->string('  '), "\n";
  }
  $self->_confirm_subscription($msg->message_id, $msg->qos_levels->[0]);
  return;
}

# Handle an UNSUBACK message: confirm the pending unsubscribe for the
# message id.  Returns nothing.
sub _process_unsuback {
  my ($self, undef, $msg) = @_;
  if (DEBUG) {
    print STDERR "Confirmed unsubscribe:\n", $msg->string('  '), "\n";
  }
  $self->_confirm_unsubscribe($msg->message_id);
  return;
}

sub _publish_locally {
  my ($self, $msg) = @_;
  my $msg_topic = $msg->topic;
  my $msg_data = $msg->message;
  my $matches = $self->{_sub_topics}->values($msg_topic);
  unless (scalar @$matches) {
    carp "Unexpected publish:\n", $msg->string('  '), "\n";
    return;
  }
  my %matched;
  my $msg_retain = $msg->retain;
  foreach my $topic (@$matches) {
    my $rec = $self->{_sub}->{$topic};
    if ($msg_retain) {
      if ($msg_data eq '') {
        delete $rec->{retained}->{$msg_topic};
        print STDERR "  retained cleared\n" if DEBUG;
      } else {
        $rec->{retained}->{$msg_topic} = $msg;
        print STDERR "  retained '", $msg_data, "'\n" if DEBUG;
      }
    }

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  }
  my $exp_type = $self->{inflight}->{$mid}->{expect};
  my $got_type = $msg->message_type;
  unless ($got_type == $exp_type) {
    carp 'Received ', message_type_string($got_type), ' but expected ',
      message_type_string($exp_type), " for message id $mid\n";
    return;
  }
  return delete $self->{inflight}->{$mid};
}

# Handle a PUBACK message: look up the matching in-flight record and fire
# its condvar.  Returns 1 on success, nothing when no record is returned.
sub _process_puback {
  my ($self, undef, $msg) = @_;
  my $rec = $self->_inflight_record($msg);
  return unless ($rec);
  my $mid = $msg->message_id;
  print STDERR 'PubAck: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
  $rec->{cv}->send(1);
  return 1;
}

# Handle a PUBREC message: look up the matching in-flight record and answer
# with a PUBREL for the same message id, expecting a PUBCOMP in return.
# The record's condvar is carried over to the PUBREL exchange.
sub _process_pubrec {
  my ($self, undef, $msg) = @_;
  my $rec = $self->_inflight_record($msg);
  return unless ($rec);
  my $mid = $msg->message_id;
  print STDERR 'PubRec: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
  my %pubrel =
    (
     message_type => MQTT_PUBREL,
     qos => MQTT_QOS_AT_LEAST_ONCE,
     message_id => $mid,
    );
  return $self->_send_with_ack(\%pubrel, $rec->{cv}, MQTT_PUBCOMP);
}

# Handle a PUBREL message: take the message stored under its message id,
# publish it locally and reply with a PUBCOMP.  A warning is issued when
# no message is stored for that id.
sub _process_pubrel {
  my ($self, undef, $msg) = @_;
  my $mid = $msg->message_id;
  print STDERR 'PubRel: ', $mid, "\n" if DEBUG;
  my $pubmsg = delete $self->{messages}->{$mid};
  if (!$pubmsg) {
    carp "Unexpected message for message id $mid\n  ".$msg->string;
    return;
  }
  $self->_publish_locally($pubmsg);
  return $self->_send(message_type => MQTT_PUBCOMP, message_id => $mid);
}

# Handle a PUBCOMP message: look up the matching in-flight record and fire
# its condvar.  Returns 1 on success, nothing when no record is returned.
sub _process_pubcomp {
  my ($self, undef, $msg) = @_;
  my $rec = $self->_inflight_record($msg);
  return unless ($rec);
  my $mid = $msg->message_id;
  print STDERR 'PubComp: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
  $rec->{cv}->send(1);
  return 1;
}


# Build a reader callback for a handle.
#
# Parameters: ($handle, $cb).
# Returns a code ref that, given a handle, repeatedly builds a message from
# the handle's read buffer with Net::MQTT::Message->new_from_bytes and
# passes each one to $cb as ($handle, $msg), stopping when no further
# message is returned.  The code ref itself always returns nothing.
sub anyevent_read_type {
  my ($handle, $cb) = @_;
  my $reader = sub {
    my ($hdl) = @_;
    my $rbuf = \$hdl->{rbuf};
    weaken $rbuf;
    return unless (defined $$rbuf);
    while (my $msg = Net::MQTT::Message->new_from_bytes($$rbuf, 1)) {
      $cb->($hdl, $msg);
    }
    return;
  };
  return subname 'anyevent_read_type_reader' => $reader;
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

AnyEvent::MQTT - AnyEvent module for an MQTT client

=head1 VERSION

version 1.212810

=head1 SYNOPSIS

  use AnyEvent::MQTT;
  my $mqtt = AnyEvent::MQTT->new;
  my $cv = $mqtt->subscribe(topic => '/topic',
                            callback => sub {
                                 my ($topic, $message) = @_;
                                 print $topic, ' ', $message, "\n"
                               });
  my $qos = $cv->recv; # subscribed, negotiated QoS == $qos

  # publish a simple message
  $cv = $mqtt->publish(message => 'simple message',
                          topic => '/topic');
  $cv->recv; # sent

  # publish line-by-line from file handle
  $cv =  $mqtt->publish(handle => \*STDIN,
                        topic => '/topic');
  $cv->recv; # sent

  # publish from AnyEvent::Handle
  $cv = $mqtt->publish(handle => AnyEvent::Handle->new(my %handle_args),
                       topic => '/topic');
  $cv->recv; # sent

=head1 DESCRIPTION

AnyEvent module for an MQTT client.

B<IMPORTANT:> This is an early release and the API is still subject to
change.

=head1 METHODS



( run in 1.754 second using v1.01-cache-2.11-cpan-ceb78f64989 )