AnyEvent-MQTT

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

      The server host. Defaults to 127.0.0.1.

    port

      The server port. Defaults to 1883.

    timeout

      The timeout for responses from the server.

    keep_alive_timer

      The keep alive timer.

    user_name

      The user name for the MQTT broker.

    password

      The password for the MQTT broker.

    tls

bin/anyevent-mqtt-monitor  view on Meta::CPAN


my $xpl;
my $help;
my $man;
my $verbose = 0;
my $retain = 1;
my $history_size = 20;
my $host = '127.0.0.1';
my $port = 1883;
my $qos = MQTT_QOS_AT_MOST_ONCE;
my $keep_alive_timer = 120;
GetOptions('help|?' => \$help,
           'man' => \$man,
           'verbose+' => \$verbose,
           'retain!' => \$retain,
           'history-size=i' => \$history_size,
           'host=s' => \$host,
           'port=i' => \$port,
           'qos=i' => \$qos,
           'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
pod2usage(1) if ($help);
pod2usage(-exitstatus => 0, -verbose => 2) if $man;

my $mqtt =
  AnyEvent::MQTT->new(host => $host, port => $port,
                      keep_alive_timer => $keep_alive_timer,
                      on_error => sub {
                        my ($fatal, $message) = @_;
                        if ($fatal) {
                          die $message, "\n";
                        } else {
                          warn $message, "\n";
                        }
                      });

foreach my $topic (scalar @ARGV ? @ARGV : '#') {

bin/anyevent-mqtt-monitor  view on Meta::CPAN


The QoS level for the published message.  The default is
0 (C<MQTT_QOS_AT_MOST_ONCE>).

=item B<-verbose>

Include more verbose output.

=item B<-keepalive NNN>

The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
it is also currently used as the connection/subscription timeout.

=item B<--history-size NNN>

Number of messages to keep for each topic.  Defaults to keeping 20 messages.

=item B<--no-retain>

Ignore retained messages.  That is, wait for new messages rather than
processing existing retained messages.

bin/anyevent-mqtt-pub  view on Meta::CPAN

use AnyEvent::MQTT;
use Getopt::Long;
use Pod::Usage;

my $help;
my $man;
my $verbose = 0;
my $host = '127.0.0.1';
my $port = 1883;
my $qos = MQTT_QOS_AT_MOST_ONCE;
my $keep_alive_timer = 120;
my $retain;
my $client_id;
GetOptions('help|?' => \$help,
           'man' => \$man,
           'verbose+' => \$verbose,
           'host=s' => \$host,
           'port=i' => \$port,
           'qos=i' => \$qos,
           'keepalive=i' => \$keep_alive_timer,
           'retain' => \$retain,
           'client_id|client-id|C=s' => \$client_id,
          ) or pod2usage(2);
pod2usage(1) if ($help);
pod2usage(-exitstatus => 0, -verbose => 2) if $man;
my $topic = shift || pod2usage(2); # need a topic

my @args;
push @args, client_id => $client_id if (defined $client_id);
my $mqtt =
  AnyEvent::MQTT->new(host => $host, port => $port,
                      keep_alive_timer => $keep_alive_timer,
                      @args,
                      on_error => sub {
                        my ($fatal, $message) = @_;
                        if ($fatal) {
                          die $message, "\n";
                        } else {
                          warn $message, "\n";
                        }
                      });

bin/anyevent-mqtt-pub  view on Meta::CPAN

The QoS level for the published message.  The default is
0 (C<MQTT_QOS_AT_MOST_ONCE>).

=item B<-verbose>

Include more verbose output.  Without this option the script only
outputs errors

=item B<-keepalive NNN>

The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
it is also currently used as the connection timeout.

=item B<-retain>

Set the retain flag on the message.  Default is not set.

=back

=head1 SEE ALSO

bin/anyevent-mqtt-sub  view on Meta::CPAN

use Pod::Usage;

my $help;
my $man;
my $verbose = 0;
my $host = '127.0.0.1';
my $port = 1883;
my $retain = 1;
my $qos = MQTT_QOS_AT_MOST_ONCE;
my $count;
my $keep_alive_timer = 120;
my $client_id;
my $code;
GetOptions('help|?' => \$help,
           'man' => \$man,
           'verbose+' => \$verbose,
           'host=s' => \$host,
           'port=i' => \$port,
           'retain!' => \$retain,
           'qos=i' => \$qos,
           'count=i' => \$count,
           'one|1' => sub { $count = 1 },
           'keepalive=i' => \$keep_alive_timer,
           'client_id|client-id|C=s' => \$client_id,
           'code|e=s' => \$code) or pod2usage(2);
pod2usage(1) if ($help);
pod2usage(-exitstatus => 0, -verbose => 2) if $man;
pod2usage(2) unless (@ARGV); # need a topic

my @args;
push @args, client_id => $client_id if (defined $client_id);
my $mqtt =
  AnyEvent::MQTT->new(host => $host, port => $port,
                      keep_alive_timer => $keep_alive_timer,
                      @args,
                      on_error => sub {
                        my ($fatal, $message) = @_;
                        if ($fatal) {
                          die $message, "\n";
                        } else {
                          warn $message, "\n";
                        }
                      });

bin/anyevent-mqtt-sub  view on Meta::CPAN


With one B<-verbose> options, publish messages are printed in a form
of a summary of the header fields and the payload in hex dump and text
form.

With two B<-verbose> options, summaries are printed for all messages
sent and received.

=item B<-keepalive NNN>

The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
it is also currently used as the connection/subscription timeout.

=item B<-count NNN>

Read the specificed number of MQTT messages and then exit.  Default
is 0 - read forever.

=item B<-one> or B<-1>

Short for B<-count 1>.  Read one message and exit.

lib/AnyEvent/MQTT.pm  view on Meta::CPAN


sub new {
  my ($pkg, %p) = @_;
  my $self =
    bless {
           socket => undef,
           host => '127.0.0.1',
           port => '1883',
           timeout => 30,
           wait => 'nothing',
           keep_alive_timer => 120,
           qos => MQTT_QOS_AT_MOST_ONCE,
           message_id => 1,
           user_name => undef,
           password => undef,
           tls => undef,
           will_topic => undef,
           will_qos => MQTT_QOS_AT_MOST_ONCE,
           will_retain => 0,
           will_message => '',
           client_id => undef,

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

    }
    my $mid = $args->{message_id};
    my $send_cv = AnyEvent->condvar;
    $send_cv->cb(subname 'ack_cb_for_'.$mid => sub {
                   $self->{inflight}->{$mid} =
                     {
                      expect => $expect,
                      message => $args,
                      cv => $cv,
                      timeout =>
                        AnyEvent->timer(after => $self->{keep_alive_timer},
                                        cb => subname 'ack_timeout_for_'.$mid =>
                                        sub {
                          print ref $self, " timeout waiting for ",
                            message_type_string($expect), "\n" if DEBUG;
                          delete $self->{inflight}->{$mid};
                          $self->_send_with_ack($args, $cv, $expect, 1);
                        }),
                     };
                   });
    $args->{cv} = $send_cv;

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

sub _write_now {
  my $self = shift;
  my ($msg, $cv);
  undef $self->{_waiting};
  if (@_) {
    ($msg, $cv) = @_;
  } else {
    my $args = shift @{$self->{write_queue}} or return;
    ($msg, $cv) = @$args;
  }
  $self->_reset_keep_alive_timer();
  print STDERR "Sending: ", $msg->string, "\n" if DEBUG;
  $self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback});
  $self->{_waiting} = [$msg, $cv];
  print '  ', (unpack 'H*', $msg->bytes), "\n" if DEBUG;
  $self->{handle}->push_write($msg->bytes);
  $cv;
}

sub _reset_keep_alive_timer {
  my ($self, $wait) = @_;
  undef $self->{_keep_alive_handle};
  my $method = $wait ? '_keep_alive_timeout' : '_send_keep_alive';
  $self->{_keep_alive_waiting} = $wait;
  my $weak_self = $self;
  weaken $weak_self;
  $self->{_keep_alive_handle} =
    AnyEvent->timer(after => $self->{keep_alive_timer},
                    cb => subname((substr $method, 1).'_cb' =>
                                  sub { $weak_self->$method(@_) }));
}

sub _send_keep_alive {
  my $self = shift;
  print STDERR "Sending: keep alive\n" if DEBUG;
  $self->_send(message_type => MQTT_PINGREQ);
  $self->_reset_keep_alive_timer(1);
}

sub _keep_alive_timeout {
  my $self = shift;
  print STDERR "keep alive timeout\n" if DEBUG;
  undef $self->{_keep_alive_waiting};
  $self->{handle}->destroy;
  $self->_error(0, 'keep alive timeout', 1);
}

sub _keep_alive_received {
  my $self = shift;
  print STDERR "keep alive received\n" if DEBUG;
  return unless (defined $self->{_keep_alive_waiting});
  $self->_reset_keep_alive_timer();
}


sub connect {
  my ($self, $msg, $cv) = @_;
  print STDERR "connect\n" if DEBUG;
  $self->{_waiting} = 'connect';
  if ($msg) {
    $cv = AnyEvent->condvar unless ($cv);
    $self->_queue_write($msg, $cv);

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

                            $weak_self->{wait} = 'nothing';
                          }),
                          on_connect => subname('on_connect_cb' => sub {
                            my ($handle, $host, $port, $retry) = @_;
                            print STDERR "TCP handshake complete\n" if DEBUG;
                            # call user-defined on_connect function.
                            $weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect};
                            my $msg =
                              Net::MQTT::Message->new(
                                message_type => MQTT_CONNECT,
                                keep_alive_timer => $weak_self->{keep_alive_timer},
                                client_id => $weak_self->{client_id},
                                clean_session => $weak_self->{clean_session},
                                will_topic => $weak_self->{will_topic},
                                will_qos => $weak_self->{will_qos},
                                will_retain => $weak_self->{will_retain},
                                will_message => $weak_self->{will_message},
                                user_name => $weak_self->{user_name},
                                password => $weak_self->{password},
                              );
                            $weak_self->_write_now($msg);

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

The server host.  Defaults to C<127.0.0.1>.

=item C<port>

The server port.  Defaults to C<1883>.

=item C<timeout>

The timeout for responses from the server.

=item C<keep_alive_timer>

The keep alive timer.

=item C<user_name>

The user name for the MQTT broker.

=item C<password>

The password for the MQTT broker.

=item C<tls>

misc/t/08-pub-topic-wildcard-hash.t  view on Meta::CPAN

#!/usr/bin/perl
use warnings;
use strict;
use Test::More tests => 9;
use AnyEvent::MQTT;

my $timeout = AnyEvent->timer(after => 5, cb => sub { die "timeout\n" });
my ($test) = ($0 =~ m!([^/]+)$!);
my $topic = '/zqk/test';
my @messages;
my $mqtt = AnyEvent::MQTT->new(host => $ENV{ANYEVENT_MQTT_SERVER},
                               on_error => sub {
                                 warn $_[1], "\n"; die "\n" if ($_[0])
                               },
                               client_id => $test,
                               message_log_callback => sub {
                                 my $str = $_[1]->string;

misc/t/08-pub-topic-wildcard-plus-retain.t  view on Meta::CPAN

#!/usr/bin/perl
use warnings;
use strict;
use Test::More tests => 11;
use AnyEvent::MQTT;

my $timeout = AnyEvent->timer(after => 5, cb => sub { die "timeout\n" });
my ($test) = ($0 =~ m!([^/]+)$!);
my $topic = '/zqk/test';
my @messages;
my $mqtt = AnyEvent::MQTT->new(host => $ENV{ANYEVENT_MQTT_SERVER},
                               on_error => sub {
                                 warn $_[1], "\n"; die "\n" if ($_[0])
                               },
                               client_id => $test,
                               message_log_callback => sub {
                                 my $str = $_[1]->string;

misc/t/08-pub-topic-wildcard-plus.t  view on Meta::CPAN

#!/usr/bin/perl
use warnings;
use strict;
use Test::More tests => 9;
use AnyEvent::MQTT;

my $timeout = AnyEvent->timer(after => 5, cb => sub { die "timeout\n" });
my ($test) = ($0 =~ m!([^/]+)$!);
my $topic = '/zqk/test';
my @messages;
my $mqtt = AnyEvent::MQTT->new(host => $ENV{ANYEVENT_MQTT_SERVER},
                               on_error => sub {
                                 warn $_[1], "\n"; die "\n" if ($_[0])
                               },
                               client_id => $test,
                               message_log_callback => sub {
                                 my $str = $_[1]->string;

misc/t/08-pub-topic-wildcard.t  view on Meta::CPAN

#!/usr/bin/perl
use warnings;
use strict;
use Test::More tests => 9;
use AnyEvent::MQTT;

my $timeout = AnyEvent->timer(after => 5, cb => sub { die "timeout\n" });
my ($test) = ($0 =~ m!([^/]+)$!);
my $topic = '/zqk/test';
my @messages;
my $mqtt = AnyEvent::MQTT->new(host => $ENV{ANYEVENT_MQTT_SERVER},
                               on_error => sub {
                                 warn $_[1], "\n"; die "\n" if ($_[0])
                               },
                               client_id => $test,
                               message_log_callback => sub {
                                 my $str = $_[1]->string;

misc/t/Tester.pm  view on Meta::CPAN

  push @$new, @$logs foreach (1..$conf->{jobs});
  $logs = $new;

  $conf->{topic} ||= '/zqk/test';
  $conf->{host} ||= SERVER;
  $conf->{repeat} ||= REPEAT;
  $conf->{timeout} ||= TIMEOUT * $conf->{repeat} * $conf->{jobs};
  my ($test) = ($0 =~ m!([^/]+)\.t$!);
  $conf->{testname} ||= $test;

  my $timeout = AnyEvent->timer(after => $conf->{timeout},
                                cb => sub { die "timeout\n" });

  foreach my $n (0..($conf->{repeat}-1)) {

    my @pids;
    foreach my $i (0..(@$streams-1)) {
      my $pid = fork;
      die "Fork failed\n" unless (defined $pid);
      if ($pid) {
        push @pids, $pid;

misc/t/Tester.pm  view on Meta::CPAN

    }
  }
  done_testing();
}

sub run_stream {
  my ($conf, $stream, $log) = @_;
  my $cv;
  my $mqtt;
  my %cv = ();
  my %timer = ();
  my $index = 0;
  foreach my $index (0..-1+@$stream) {
    $conf->{index} = $index;
    my $rec = $stream->[$index];
    my $name =
      $rec->{name} || $index.':'.($rec->{action}||'item').'/'.$conf->{pid};
    my $args = $rec->{arguments} || {};
    $_ = replace_conf($_, $conf) foreach (values %$args);
    if ($rec->{action} eq 'connect') {
      $mqtt = AnyEvent::MQTT->new(host => $conf->{host},

misc/t/Tester.pm  view on Meta::CPAN

          is($msg->$k, replace_conf($result->{$k}, $conf),
             '...result '.$k.' - '.$name);
        }
      } else {
        is($msg, replace_conf($result, $conf),
           '...result '.$result.' - '.$name);
      }
    } elsif ($rec->{action} eq 'timeout') {
      my $cvname = $rec->{cvname}||$name;
      $cv{$cvname} = AnyEvent->condvar unless (exists $cv{$cvname});
      $timer{$name} = AnyEvent->timer(after => $rec->{timeout},
                      cb => sub { $cv{$cvname}->send("timeout") });
    } elsif ($rec->{action} eq 'send') {
      ok($cv = $mqtt->_send(%$args, cv => AnyEvent->condvar),
         '...send - '.$name);
      ok($cv->recv, '...sent - '.$name);
      my $cvname = $rec->{cvname}||$name;
      $cv{$cvname} = AnyEvent->condvar;
      my $callback = 'before_'.($rec->{response}||'msg').'_callback';
      $mqtt->{$callback} =
        sub {

t/01-keep-alive.t  view on Meta::CPAN


use_ok('AnyEvent::MQTT');

my $mqtt =
  AnyEvent::MQTT->new(host => $host, port => $port, client_id => 'acme_mqtt');

ok($mqtt, 'instantiate AnyEvent::MQTT object');
my $cv = $mqtt->connect();

is($cv->recv, 1, '... connection handshake complete');
$mqtt->{keep_alive_timer} = 0.2; # hack keep alive timer to avoid long test
$mqtt->_reset_keep_alive_timer(); # reset it
$cv = AnyEvent->condvar;
my $timer = AnyEvent->timer(after => 0.4, cb => sub { $cv->send(1); });
$cv->recv;

$cv = AnyEvent->condvar;
$timer = AnyEvent->timer(after => 0.8, cb => sub { $cv->send(0,'oops'); });
$mqtt->{on_error} = sub { $cv->send(@_); };
my ($fatal, $err) = $cv->recv;
is($fatal, 0, 'keep alive timeout error - non-fatal');
is($err, 'keep alive timeout', 'keep alive timeout error - message');

$mqtt->{keep_alive_timer} = 120; # hack keep alive timer back to default
$cv = AnyEvent->condvar;
my $sub_cv = $mqtt->subscribe(topic => '/t1',
                              callback => sub {
                                my ($topic, $message) = @_;
                                $cv->send($topic.' '.$message);
                              });
is($sub_cv->recv, 0, 'subscribe after reconnect');
is($cv->recv, '/t1 message1', '... received message');

t/03-pub-qos-1.t  view on Meta::CPAN


ok($mqtt, 'instantiate AnyEvent::MQTT object');

$published = AnyEvent->condvar;
my $cv = $mqtt->publish(message => 'message1', topic => '/topic',
                        qos => MQTT_QOS_AT_LEAST_ONCE);
ok($cv, 'simple message publish');
is($cv->recv, 1, '... client complete');
is($published->recv, 1, '... server complete');

$mqtt->{keep_alive_timer} = 0.1;
$published = AnyEvent->condvar;
$cv = $mqtt->publish(message => 'message2', topic => '/topic',
                     qos => MQTT_QOS_AT_LEAST_ONCE);
$mqtt->{keep_alive_timer} = 120;
ok($cv, 'message publish timeout and re-publish');
my $res;
is(test_warn(sub { $res = $cv->recv }),
   'Received PubRec but expected PubAck for message id 2',
   '... unexpected pubrec');
is($res, 1, '... client complete');
is($published->recv, 1, '... server complete');



( run in 1.224 second using v1.01-cache-2.11-cpan-49f99fa48dc )