AnyEvent-MQTT

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

misc/t/08-pub-topic-wildcard-hash.t
misc/t/08-pub-topic-wildcard-plus-retain.t
misc/t/08-pub-topic-wildcard-plus.t
misc/t/08-pub-topic-wildcard.t
misc/t/09-ping.t
misc/t/Tester.pm
misc/test-server
t/01-close-connection.t
t/01-connect-error.t
t/01-errors.t
t/01-keep-alive.t
t/01-publish.t
t/01-subscribe.t
t/01-timeout.t
t/01-unexpected.t
t/02-dup-sub-callback.t
t/02-sub-wildcard.t
t/03-pub-qos-1.t
t/03-pub-qos-2.t
t/03-sub-qos-1.t
t/03-sub-qos-2.t

README  view on Meta::CPAN

      The server host. Defaults to 127.0.0.1.

    port

      The server port. Defaults to 1883.

    timeout

      The timeout for responses from the server.

    keep_alive_timer

      The keep alive timer in seconds. Defaults to 120.

    user_name

      The user name for the MQTT broker.

    password

      The password for the MQTT broker.

    tls

bin/anyevent-mqtt-monitor  view on Meta::CPAN


# Option variables, initialised to the defaults that apply when the
# corresponding switch is not given on the command line.
# NOTE(review): $xpl is not referenced in the visible part of this
# script - confirm whether it is still needed.
my $xpl;
my $help;
my $man;
my $verbose = 0;
my $retain = 1;
my $history_size = 20;
my $host = '127.0.0.1';
my $port = 1883;
my $qos = MQTT_QOS_AT_MOST_ONCE;
my $keep_alive_timer = 120;
# Parse the command line.  'verbose+' counts repeated switches and
# 'retain!' is negatable (--no-retain).  A parse failure shows the usage
# message via pod2usage(2).
GetOptions('help|?' => \$help,
           'man' => \$man,
           'verbose+' => \$verbose,
           'retain!' => \$retain,
           'history-size=i' => \$history_size,
           'host=s' => \$host,
           'port=i' => \$port,
           'qos=i' => \$qos,
           'keepalive=i' => \$keep_alive_timer) or pod2usage(2);
# -help shows the usage summary; -man shows the full manual and exits 0.
pod2usage(1) if ($help);
pod2usage(-exitstatus => 0, -verbose => 2) if $man;

# Create the MQTT client.  The on_error callback receives a fatal flag
# and a message: fatal errors die (ending the script), non-fatal errors
# are only reported with warn.
my $mqtt =
  AnyEvent::MQTT->new(host => $host, port => $port,
                      keep_alive_timer => $keep_alive_timer,
                      on_error => sub {
                        my ($fatal, $message) = @_;
                        if ($fatal) {
                          die $message, "\n";
                        } else {
                          warn $message, "\n";
                        }
                      });

foreach my $topic (scalar @ARGV ? @ARGV : '#') {

bin/anyevent-mqtt-monitor  view on Meta::CPAN


=item B<-qos N>

The QoS level for the published message.  The default is
0 (C<MQTT_QOS_AT_MOST_ONCE>).

=item B<-verbose>

Include more verbose output.

=item B<-keepalive NNN>

The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
it is also currently used as the connection/subscription timeout.

=item B<--history-size NNN>

Number of messages to keep for each topic.  Defaults to keeping 20 messages.

=item B<--no-retain>

Ignore retained messages.  That is, wait for new messages rather than
processing existing retained messages.

bin/anyevent-mqtt-pub  view on Meta::CPAN

use AnyEvent::MQTT;
use Getopt::Long;
use Pod::Usage;

# Option variables, initialised to the defaults that apply when the
# corresponding switch is not given on the command line.
my $help;
my $man;
my $verbose = 0;
my $host = '127.0.0.1';
my $port = 1883;
my $qos = MQTT_QOS_AT_MOST_ONCE;
my $keep_alive_timer = 120;
my $retain;
my $client_id;
# Parse the command line.  'verbose+' counts repeated switches; the
# client id may be given as --client_id, --client-id or -C.  A parse
# failure shows the usage message via pod2usage(2).
GetOptions('help|?' => \$help,
           'man' => \$man,
           'verbose+' => \$verbose,
           'host=s' => \$host,
           'port=i' => \$port,
           'qos=i' => \$qos,
           'keepalive=i' => \$keep_alive_timer,
           'retain' => \$retain,
           'client_id|client-id|C=s' => \$client_id,
          ) or pod2usage(2);
# -help shows the usage summary; -man shows the full manual and exits 0.
pod2usage(1) if ($help);
pod2usage(-exitstatus => 0, -verbose => 2) if $man;
# The first remaining argument is the topic and it is mandatory.  Test
# for presence rather than truth so that a topic of "0" is not mistaken
# for a missing argument; an empty string is still rejected.
my $topic = shift;
pod2usage(2) unless (defined $topic && length $topic);

# Only pass client_id to the constructor when one was supplied, so that
# the module's own default applies otherwise.
my @args;
push @args, client_id => $client_id if (defined $client_id);
# Create the MQTT client.  The on_error callback receives a fatal flag
# and a message: fatal errors die (ending the script), non-fatal errors
# are only reported with warn.
my $mqtt =
  AnyEvent::MQTT->new(host => $host, port => $port,
                      keep_alive_timer => $keep_alive_timer,
                      @args,
                      on_error => sub {
                        my ($fatal, $message) = @_;
                        if ($fatal) {
                          die $message, "\n";
                        } else {
                          warn $message, "\n";
                        }
                      });

bin/anyevent-mqtt-pub  view on Meta::CPAN

=item B<-qos N>

The QoS level for the published message.  The default is
0 (C<MQTT_QOS_AT_MOST_ONCE>).

=item B<-verbose>

Include more verbose output.  Without this option the script only
outputs errors.

=item B<-keepalive NNN>

The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
it is also currently used as the connection timeout.

=item B<-retain>

Set the retain flag on the message.  Default is not set.

=back

=head1 SEE ALSO

bin/anyevent-mqtt-sub  view on Meta::CPAN

use Pod::Usage;

# Option variables, initialised to the defaults that apply when the
# corresponding switch is not given on the command line.
my $help;
my $man;
my $verbose = 0;
my $host = '127.0.0.1';
my $port = 1883;
my $retain = 1;
my $qos = MQTT_QOS_AT_MOST_ONCE;
my $count;
my $keep_alive_timer = 120;
my $client_id;
# NOTE(review): $code is filled from --code/-e; how it is used lies
# outside the visible part of this script.
my $code;
# Parse the command line.  'verbose+' counts repeated switches,
# 'retain!' is negatable (--no-retain) and --one/-1 is a shortcut that
# sets $count to 1.  A parse failure shows the usage message via
# pod2usage(2).
GetOptions('help|?' => \$help,
           'man' => \$man,
           'verbose+' => \$verbose,
           'host=s' => \$host,
           'port=i' => \$port,
           'retain!' => \$retain,
           'qos=i' => \$qos,
           'count=i' => \$count,
           'one|1' => sub { $count = 1 },
           'keepalive=i' => \$keep_alive_timer,
           'client_id|client-id|C=s' => \$client_id,
           'code|e=s' => \$code) or pod2usage(2);
# -help shows the usage summary; -man shows the full manual and exits 0.
pod2usage(1) if ($help);
pod2usage(-exitstatus => 0, -verbose => 2) if $man;
pod2usage(2) unless (@ARGV); # need a topic

# Only pass client_id to the constructor when one was supplied.
my @args;
push @args, client_id => $client_id if (defined $client_id);
# Create the MQTT client.  The on_error callback receives a fatal flag
# and a message: fatal errors die (ending the script), non-fatal errors
# are only reported with warn.
my $mqtt =
  AnyEvent::MQTT->new(host => $host, port => $port,
                      keep_alive_timer => $keep_alive_timer,
                      @args,
                      on_error => sub {
                        my ($fatal, $message) = @_;
                        if ($fatal) {
                          die $message, "\n";
                        } else {
                          warn $message, "\n";
                        }
                      });

bin/anyevent-mqtt-sub  view on Meta::CPAN


  topic message

With one B<-verbose> option, publish messages are printed in a form
of a summary of the header fields and the payload in hex dump and text
form.

With two B<-verbose> options, summaries are printed for all messages
sent and received.

=item B<-keepalive NNN>

The keep alive timer value.  Defaults to 120 seconds.  For simplicity,
it is also currently used as the connection/subscription timeout.

=item B<-count NNN>

Read the specified number of MQTT messages and then exit.  Default
is 0 - read forever.

=item B<-one> or B<-1>

Short for B<-count 1>.  Read one message and exit.

lib/AnyEvent/MQTT.pm  view on Meta::CPAN


sub new {
  my ($pkg, %p) = @_;
  my $self =
    bless {
           socket => undef,
           host => '127.0.0.1',
           port => '1883',
           timeout => 30,
           wait => 'nothing',
           keep_alive_timer => 120,
           qos => MQTT_QOS_AT_MOST_ONCE,
           message_id => 1,
           user_name => undef,
           password => undef,
           tls => undef,
           will_topic => undef,
           will_qos => MQTT_QOS_AT_MOST_ONCE,
           will_retain => 0,
           will_message => '',
           client_id => undef,

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  if ($self->{handle}) {
    my $cv = AnyEvent->condvar;
    my $handle = $self->{handle};
    weaken $handle;
    $cv->cb(sub { $handle->destroy });
    $self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
  }
  delete $self->{handle};
  delete $self->{connected};
  delete $self->{wait};
  delete $self->{_keep_alive_handle};
  delete $self->{_keep_alive_waiting};
  $self->{write_queue} = [];
}

# Report an error: run cleanup with the message, hand ($fatal, $message)
# to the user's on_error callback when one is set, and finally start a
# reconnect when $reconnect is true.
sub _error {
  my $self = shift;
  my ($fatal, $message, $reconnect) = @_;
  $self->cleanup($message);
  # Look the callback up only after cleanup has run, as before.
  my $on_error = $self->{on_error};
  $on_error->($fatal, $message) if ($on_error);
  $self->_reconnect() if ($reconnect);
}

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

    }
    my $mid = $args->{message_id};
    my $send_cv = AnyEvent->condvar;
    $send_cv->cb(subname 'ack_cb_for_'.$mid => sub {
                   $self->{inflight}->{$mid} =
                     {
                      expect => $expect,
                      message => $args,
                      cv => $cv,
                      timeout =>
                        AnyEvent->timer(after => $self->{keep_alive_timer},
                                        cb => subname 'ack_timeout_for_'.$mid =>
                                        sub {
                          print ref $self, " timeout waiting for ",
                            message_type_string($expect), "\n" if DEBUG;
                          delete $self->{inflight}->{$mid};
                          $self->_send_with_ack($args, $cv, $expect, 1);
                        }),
                     };
                   });
    $args->{cv} = $send_cv;

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

# Write a message to the connection handle immediately.
#
# Called either with an explicit ($msg, $cv) pair or with no arguments,
# in which case the next [$msg, $cv] entry is taken from the write
# queue (returning early if the queue is empty).  The keep alive timer
# is restarted, the optional message_log_callback is told about the
# outgoing message, the pair being written is recorded in
# $self->{_waiting} and the message bytes are pushed to the handle.
# Returns the condvar that belongs to the message (may be undef).
sub _write_now {
  my $self = shift;
  my ($msg, $cv);
  undef $self->{_waiting};
  if (@_) {
    ($msg, $cv) = @_;
  } else {
    my $args = shift @{$self->{write_queue}} or return;
    ($msg, $cv) = @$args;
  }
  $self->_reset_keep_alive_timer();
  print STDERR "Sending: ", $msg->string, "\n" if DEBUG;
  $self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback});
  $self->{_waiting} = [$msg, $cv];
  # The hex dump belongs to the "Sending:" trace above, so it goes to
  # STDERR as well instead of leaking into the program's STDOUT.
  print STDERR '  ', (unpack 'H*', $msg->bytes), "\n" if DEBUG;
  $self->{handle}->push_write($msg->bytes);
  $cv;
}

# (Re)start the keep alive timer.
#
# Any previously armed timer is dropped first.  With a false $wait the
# new timer calls _send_keep_alive when it expires; with a true $wait it
# calls _keep_alive_timeout.  $wait is remembered in
# $self->{_keep_alive_waiting} either way.
#
# A keep_alive_timer of 0 (or undef) means the keep alive mechanism is
# switched off, as in the MQTT protocol, so no timer is armed at all.
# Arming one with a zero delay would send a ping and then report a
# keep alive timeout straight away.
sub _reset_keep_alive_timer {
  my ($self, $wait) = @_;
  undef $self->{_keep_alive_handle};
  my $method = $wait ? '_keep_alive_timeout' : '_send_keep_alive';
  $self->{_keep_alive_waiting} = $wait;
  my $interval = $self->{keep_alive_timer};
  return unless ($interval);
  # The timer is stored in $self, so the callback only keeps a weak
  # reference to $self to avoid a reference cycle.
  my $weak_self = $self;
  weaken $weak_self;
  $self->{_keep_alive_handle} =
    AnyEvent->timer(after => $interval,
                    cb => subname((substr $method, 1).'_cb' =>
                                  sub { $weak_self->$method(@_) }));
}

# Keep alive timer callback: send a PINGREQ and restart the timer in
# "waiting" mode, so that its next expiry counts as a timeout.
sub _send_keep_alive {
  my ($self) = @_;
  if (DEBUG) {
    print STDERR "Sending: keep alive\n";
  }
  $self->_send(message_type => MQTT_PINGREQ);
  $self->_reset_keep_alive_timer(1);
}

# Keep alive timer callback used while a ping is outstanding: clear the
# waiting flag, destroy the connection handle and raise a non-fatal
# 'keep alive timeout' error with the reconnect flag set.
sub _keep_alive_timeout {
  my ($self) = @_;
  if (DEBUG) {
    print STDERR "keep alive timeout\n";
  }
  $self->{_keep_alive_waiting} = undef;
  $self->{handle}->destroy;
  my ($fatal, $reconnect) = (0, 1);
  $self->_error($fatal, 'keep alive timeout', $reconnect);
}

# Called when a ping response arrives.  The keep alive timer is only
# restarted (in normal, non-waiting mode) when the waiting flag is
# defined; otherwise nothing is done.
sub _keep_alive_received {
  my ($self) = @_;
  if (DEBUG) {
    print STDERR "keep alive received\n";
  }
  if (defined $self->{_keep_alive_waiting}) {
    return $self->_reset_keep_alive_timer();
  }
  return;
}


sub connect {
  my ($self, $msg, $cv) = @_;
  print STDERR "connect\n" if DEBUG;
  $self->{_waiting} = 'connect';
  if ($msg) {
    $cv = AnyEvent->condvar unless ($cv);
    $self->_queue_write($msg, $cv);

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

                            $weak_self->{wait} = 'nothing';
                          }),
                          on_connect => subname('on_connect_cb' => sub {
                            my ($handle, $host, $port, $retry) = @_;
                            print STDERR "TCP handshake complete\n" if DEBUG;
                            # call user-defined on_connect function.
                            $weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect};
                            my $msg =
                              Net::MQTT::Message->new(
                                message_type => MQTT_CONNECT,
                                keep_alive_timer => $weak_self->{keep_alive_timer},
                                client_id => $weak_self->{client_id},
                                clean_session => $weak_self->{clean_session},
                                will_topic => $weak_self->{will_topic},
                                will_qos => $weak_self->{will_qos},
                                will_retain => $weak_self->{will_retain},
                                will_message => $weak_self->{will_message},
                                user_name => $weak_self->{user_name},
                                password => $weak_self->{password},
                              );
                            $weak_self->_write_now($msg);

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

    print STDERR "Resubscribing to '$topic':\n" if DEBUG;
    for my $cb (values %{$rec->{cb}}) {
      $self->subscribe(topic => $topic, callback => $cb, qos => $rec->{qos});
    }
  }
  delete $self->{_sub_reconnect};
  return
}

# Handle a PINGRESP by delegating to _keep_alive_received; any further
# arguments are not passed on.
sub _process_pingresp {
  my $self = shift;
  return $self->_keep_alive_received();
}

# Handle a SUBACK: confirm the subscription for the acknowledged
# message id, passing on the first QoS level listed in the message.
# The handle and error arguments are not used here.
sub _process_suback {
  my $self = shift;
  my (undef, $msg) = @_;
  if (DEBUG) {
    print STDERR "Confirmed subscription:\n", $msg->string('  '), "\n";
  }
  $self->_confirm_subscription($msg->message_id, $msg->qos_levels->[0]);
  return;
}

sub _process_unsuback {

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

The server host.  Defaults to C<127.0.0.1>.

=item C<port>

The server port.  Defaults to C<1883>.

=item C<timeout>

The timeout for responses from the server.  Defaults to C<30>.

=item C<keep_alive_timer>

The keep alive timer in seconds.  Defaults to C<120>.

=item C<user_name>

The user name for the MQTT broker.

=item C<password>

The password for the MQTT broker.

=item C<tls>

t/01-keep-alive.t  view on Meta::CPAN


use_ok('AnyEvent::MQTT');

# $host and $port are set up earlier in this test file (not shown here).
my $mqtt =
  AnyEvent::MQTT->new(host => $host, port => $port, client_id => 'acme_mqtt');

ok($mqtt, 'instantiate AnyEvent::MQTT object');
my $cv = $mqtt->connect();

is($cv->recv, 1, '... connection handshake complete');
$mqtt->{keep_alive_timer} = 0.2; # hack keep alive timer to avoid long test
$mqtt->_reset_keep_alive_timer(); # reset it
# Let the event loop run for 0.4s, i.e. longer than the shortened 0.2s
# keep alive period, so that the keep alive timer can expire.
$cv = AnyEvent->condvar;
my $timer = AnyEvent->timer(after => 0.4, cb => sub { $cv->send(1); });
$cv->recv;

# Now wait for the client to report an error through on_error.  The 0.8s
# timer is a safety net: if no error arrives it sends (0, 'oops'), so the
# message check below fails instead of the test hanging.
$cv = AnyEvent->condvar;
$timer = AnyEvent->timer(after => 0.8, cb => sub { $cv->send(0,'oops'); });
$mqtt->{on_error} = sub { $cv->send(@_); };
my ($fatal, $err) = $cv->recv;
is($fatal, 0, 'keep alive timeout error - non-fatal');
is($err, 'keep alive timeout', 'keep alive timeout error - message');

# After the timeout the client is expected to have reconnected, so a new
# subscription must succeed and deliver a message.
$mqtt->{keep_alive_timer} = 120; # hack keep alive timer back to default
$cv = AnyEvent->condvar;
my $sub_cv = $mqtt->subscribe(topic => '/t1',
                              callback => sub {
                                my ($topic, $message) = @_;
                                $cv->send($topic.' '.$message);
                              });
is($sub_cv->recv, 0, 'subscribe after reconnect');
is($cv->recv, '/t1 message1', '... received message');

t/03-pub-qos-1.t  view on Meta::CPAN

   [
    [ packrecv => '10 17 00 06  4D 51 49 73   64 70 03 02  00 78 00 09
                   61 63 6D 65  5F 6D 71 74   74', q{connect} ],
    [ packsend => '20 02 00 00', q{connack} ],
    [ packrecv => '32 12 00 06  2F 74 6F 70   69 63 00 01  6D 65 73 73
                   61 67 65 31', q{publish} ],
    [ packsend => '40 02 00 01', q{puback} ],
    [ code => sub { $published->send(1) }, q{puback done} ],
    [ packrecv => '32 12 00 06  2F 74 6F 70   69 63 00 02  6D 65 73 73
                   61 67 65 32', q{publish} ],
    [ packrecv => 'C0 00', q{keepalive - pingreq} ],
    [ packsend => 'D0 00', q{keepalive - pingresp} ],
    [ packrecv => '3A 12 00 06  2F 74 6F 70   69 63 00 02  6D 65 73 73
                   61 67 65 32', q{publish} ],
    [ packsend => '50 02 00 02', q{pubrec} ],
    [ packsend => '40 02 00 02', q{puback} ],
    [ code => sub { $published->send(1) }, q{pubrec} ],
   ],
  );

my $server;
eval { $server = AnyEvent::MockTCPServer->new(connections => \@connections); };

t/03-pub-qos-1.t  view on Meta::CPAN


ok($mqtt, 'instantiate AnyEvent::MQTT object');

# First publish at QoS 1: $cv completes on the client side, while
# $published is sent by a code step of the mock server script.
$published = AnyEvent->condvar;
my $cv = $mqtt->publish(message => 'message1', topic => '/topic',
                        qos => MQTT_QOS_AT_LEAST_ONCE);
ok($cv, 'simple message publish');
is($cv->recv, 1, '... client complete');
is($published->recv, 1, '... server complete');

# Second publish: keep_alive_timer is shortened to 0.1 only around the
# publish call and restored right after.  Per the test description below
# this is meant to make the wait for the ack time out quickly so that
# the message is published again.
$mqtt->{keep_alive_timer} = 0.1;
$published = AnyEvent->condvar;
$cv = $mqtt->publish(message => 'message2', topic => '/topic',
                     qos => MQTT_QOS_AT_LEAST_ONCE);
$mqtt->{keep_alive_timer} = 120;
ok($cv, 'message publish timeout and re-publish');
my $res;
# The client is expected to warn about an unexpected PubRec while waiting
# for the PubAck.
# NOTE(review): test_warn is defined outside this excerpt; presumably it
# returns the warning text raised while running the code ref - confirm.
is(test_warn(sub { $res = $cv->recv }),
   'Received PubRec but expected PubAck for message id 2',
   '... unexpected pubrec');
is($res, 1, '... client complete');
is($published->recv, 1, '... server complete');

t/author-eol.t  view on Meta::CPAN

use Test::EOL;

my @files = (
    'bin/anyevent-mqtt-monitor',
    'bin/anyevent-mqtt-pub',
    'bin/anyevent-mqtt-sub',
    'lib/AnyEvent/MQTT.pm',
    't/01-close-connection.t',
    't/01-connect-error.t',
    't/01-errors.t',
    't/01-keep-alive.t',
    't/01-publish.t',
    't/01-subscribe.t',
    't/01-timeout.t',
    't/01-unexpected.t',
    't/02-dup-sub-callback.t',
    't/02-sub-wildcard.t',
    't/03-pub-qos-1.t',
    't/03-pub-qos-2.t',
    't/03-sub-qos-1.t',
    't/03-sub-qos-2.t',

t/author-no-tabs.t  view on Meta::CPAN

use Test::NoTabs;

my @files = (
    'bin/anyevent-mqtt-monitor',
    'bin/anyevent-mqtt-pub',
    'bin/anyevent-mqtt-sub',
    'lib/AnyEvent/MQTT.pm',
    't/01-close-connection.t',
    't/01-connect-error.t',
    't/01-errors.t',
    't/01-keep-alive.t',
    't/01-publish.t',
    't/01-subscribe.t',
    't/01-timeout.t',
    't/01-unexpected.t',
    't/02-dup-sub-callback.t',
    't/02-sub-wildcard.t',
    't/03-pub-qos-1.t',
    't/03-pub-qos-2.t',
    't/03-sub-qos-1.t',
    't/03-sub-qos-2.t',



( run in 1.622 second using v1.01-cache-2.11-cpan-df04353d9ac )