AnyEvent-MQTT

 view release on metacpan or  search on metacpan

lib/AnyEvent/MQTT.pm  view on Meta::CPAN



use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG};
use AnyEvent;
use AnyEvent::Handle;
use Net::MQTT::Constants;
use Net::MQTT::Message;
use Net::MQTT::TopicStore;
use Carp qw/croak carp/;
use Sub::Name;
use Scalar::Util qw/weaken/;


sub new {
  my ($pkg, %p) = @_;
  my $self =
    bless {
           socket => undef,
           host => '127.0.0.1',
           port => '1883',
           timeout => 30,

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  $_[0]->cleanup;
}


# Tear down the connection state held by this object.
#
# When a handle is present, an MQTT DISCONNECT message is handed to
# _send together with a condvar whose callback destroys that handle.
# Afterwards the handle, the connected flag, the wait entry and the
# keep-alive timer/flag are removed from the object and the write queue
# is replaced with an empty one.
sub cleanup {
  my $self = shift;
  print STDERR "cleanup\n" if DEBUG;
  if ($self->{handle}) {
    my $cv = AnyEvent->condvar;
    my $handle = $self->{handle};
    # The callback only holds a weak reference, so it does not keep the
    # handle alive by itself.
    weaken $handle;
    # $self->{handle} is deleted just below; if that was the last strong
    # reference the weak copy is undef by the time the callback runs, so
    # guard the method call instead of dying on an undefined value.
    $cv->cb(sub { $handle->destroy if (defined $handle) });
    $self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
  }
  delete $self->{handle};
  delete $self->{connected};
  delete $self->{wait};
  # Dropping the timer watcher also stops any pending keep-alive callback.
  delete $self->{_keep_alive_handle};
  delete $self->{_keep_alive_waiting};
  $self->{write_queue} = [];
}

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  my $error_sub = $handle->{on_error}; # Hack: There is no accessor api
  $handle->on_error(subname 'on_error_for_read_publish_'.$topic =>
                    sub {
                      my ($hdl, $fatal, $msg) = @_;
                      $error_sub->(@_) if ($error_sub);
                      $hdl->destroy;
                      undef $hdl;
                      $cv->send(1);
                    });
  my $weak_self = $self;
  weaken $weak_self;
  my @push_read_args = @{$p{push_read_args}||['line']};
  my $sub; $sub = subname 'push_read_cb_for_'.$topic => sub {
    my ($hdl, $chunk, @args) = @_;
    print STDERR "publish: $chunk => $topic\n" if DEBUG;
    my $send_cv = AnyEvent->condvar;
    print STDERR "publish: message[$chunk] => $topic\n" if DEBUG;
    $weak_self->_send_with_ack({
                           message_type => MQTT_PUBLISH,
                           qos => $qos,
                           retain => $p{retain},

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  $self->{handle}->push_write($msg->bytes);
  $cv;
}

# (Re)arm the keep-alive timer.
#
# Arguments:
#   $wait - when true the timer invokes _keep_alive_timeout, otherwise
#           it invokes _send_keep_alive.  The value is also recorded in
#           $self->{_keep_alive_waiting}.
#
# Any previous timer watcher is dropped first; the new one fires after
# $self->{keep_alive_timer} and is stored in $self->{_keep_alive_handle}.
sub _reset_keep_alive_timer {
  my ($self, $wait) = @_;

  # Release the old watcher before creating its replacement.
  undef $self->{_keep_alive_handle};

  my $method = '_send_keep_alive';
  $method = '_keep_alive_timeout' if ($wait);
  $self->{_keep_alive_waiting} = $wait;

  # The watcher lives inside $self, so its callback must only hold a weak
  # reference back to $self to avoid a reference cycle.
  weaken(my $weak_self = $self);

  # Callback is named after the method without its leading underscore,
  # e.g. 'send_keep_alive_cb'.
  my $cb_name = substr($method, 1).'_cb';
  my $on_timer = subname($cb_name => sub { $weak_self->$method(@_) });

  $self->{_keep_alive_handle} =
    AnyEvent->timer(after => $self->{keep_alive_timer}, cb => $on_timer);
}

sub _send_keep_alive {
  my $self = shift;
  print STDERR "Sending: keep alive\n" if DEBUG;
  $self->_send(message_type => MQTT_PINGREQ);

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  if ($msg) {
    $cv = AnyEvent->condvar unless ($cv);
    $self->_queue_write($msg, $cv);
  } else {
    $self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv});
    $cv = $self->{connect_cv};
  }
  return $cv if ($self->{handle});

  my $weak_self = $self;
  weaken $weak_self;

  my $hd;
  $hd = $self->{handle} =
    AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}],
                          ($self->{tls} ? (tls => "connect") : ()),
                          on_error => subname('on_error_cb' => sub {
                            my ($handle, $fatal, $message) = @_;
                            print STDERR "handle error $_[1]\n" if DEBUG;
                            $handle->destroy;
                            $weak_self->_error($fatal, 'Error: '.$message, 0);

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  unless ($msg->return_code == MQTT_CONNECT_ACCEPTED) {
    return $self->_error(1, 'Connection refused: '.$msg->string, 0);
  }
  print STDERR "Connection ready:\n", $msg->string('  '), "\n" if DEBUG;
  $self->_write_now();
  $self->{connected} = 1;
  $self->{connect_cv}->send(1) if ($self->{connect_cv});
  delete $self->{connect_cv};

  my $weak_self = $self;
  weaken $weak_self;

  $handle->on_drain(subname 'on_drain_cb' => sub {
                      print STDERR "drained\n" if DEBUG;
                      my $w = $weak_self->{_waiting};
                      $w->[1]->send(1) if (ref $w && defined $w->[1]);
                      $weak_self->_write_now;
                      1;
                    });

  # handle reconnect

lib/AnyEvent/MQTT.pm  view on Meta::CPAN

  $rec->{cv}->send(1);
  return 1;
}


# Build a reader callback that extracts MQTT messages from a handle's
# read buffer.
#
# Arguments:
#   $handle - not used by the generated reader, which works on the handle
#             it is invoked with
#   $cb     - called as $cb->($hdl, $message) for every message parsed
#
# Returns a named code reference.  Each time it is called it parses as
# many messages as Net::MQTT::Message->new_from_bytes yields from the
# buffer and then returns nothing.
sub anyevent_read_type {
  my ($handle, $cb) = @_;
  my $reader = sub {
    my ($hdl) = @_;
    # Work on the buffer through a reference; it is weakened so this
    # closure call does not itself keep the buffer scalar alive.
    my $buffer_ref = \$hdl->{rbuf};
    weaken $buffer_ref;
    defined $$buffer_ref or return;
    for (;;) {
      # Stop as soon as no further complete message can be parsed.
      my $message = Net::MQTT::Message->new_from_bytes($$buffer_ref, 1)
        or last;
      $cb->($hdl, $message);
    }
    return;
  };
  subname 'anyevent_read_type_reader' => $reader;
}

t/01-close-connection.t  view on Meta::CPAN

#!/usr/bin/perl
#
# Copyright (C) 2011 by Mark Hindess

use strict;
use constant {
  DEBUG => $ENV{ANYEVENT_MQTT_TEST_DEBUG}
};
use Net::MQTT::Constants;
use Errno qw/EPIPE/;
use Scalar::Util qw/weaken/;

$|=1;

BEGIN {
  require Test::More;
  $ENV{PERL_ANYEVENT_MODEL} = 'Perl' unless ($ENV{PERL_ANYEVENT_MODEL});
  eval { require AnyEvent; import AnyEvent;
         require AnyEvent::Socket; import AnyEvent::Socket };
  if ($@) {
    import Test::More skip_all => 'No AnyEvent::Socket module installed: $@';

t/01-errors.t  view on Meta::CPAN

#!/usr/bin/perl
#
# Copyright (C) 2011 by Mark Hindess

use strict;
use constant {
  DEBUG => $ENV{ANYEVENT_MQTT_TEST_DEBUG}
};
use Net::MQTT::Constants;
use Scalar::Util qw/weaken/;

$|=1;

BEGIN {
  require Test::More;
  $ENV{PERL_ANYEVENT_MODEL} = 'Perl' unless ($ENV{PERL_ANYEVENT_MODEL});
  eval { require AnyEvent; import AnyEvent;
         require AnyEvent::Socket; import AnyEvent::Socket };
  if ($@) {
    import Test::More skip_all => 'No AnyEvent::Socket module installed: $@';

t/01-publish.t  view on Meta::CPAN

#
# Copyright (C) 2011 by Mark Hindess

use strict;
use constant {
  DEBUG => $ENV{ANYEVENT_MQTT_TEST_DEBUG}
};
use File::Temp qw/tempfile/;
use Net::MQTT::Constants;
use Errno qw/EPIPE/;
use Scalar::Util qw/weaken/;

$|=1;

BEGIN {
  require Test::More;
  $ENV{PERL_ANYEVENT_MODEL} = 'Perl' unless ($ENV{PERL_ANYEVENT_MODEL});
  eval { require AnyEvent; import AnyEvent;
         require AnyEvent::Socket; import AnyEvent::Socket };
  if ($@) {
    import Test::More skip_all => 'No AnyEvent::Socket module installed: $@';

t/01-publish.t  view on Meta::CPAN

ok($cv, 'simple message publish');
is($cv->recv, 1, '... client complete');
is($published->recv, 1, '... server complete');

# Publish from a raw filehandle: put one newline-terminated message into
# a temporary file and rewind it so it can be read from the start.
my $fh = tempfile();
syswrite $fh, "message2\n";
sysseek $fh, 0, 0;

$published = AnyEvent->condvar;
# $eof receives the result from the on_error handler below; the handler
# closes over a weakened copy so it holds no strong reference to the
# condvar.
my $eof = AnyEvent->condvar;
my $weak_eof = $eof; weaken $weak_eof;
# The filehandle is passed straight to publish; handle_args supplies the
# on_error callback for it.
my $pcv =
  $mqtt->publish(handle => $fh, topic => '/topic',
                 qos => MQTT_QOS_AT_MOST_ONCE,
                 handle_args => [ on_error => sub {
                                    my ($hdl, $fatal, $msg) = @_;
                                    # error on fh close as
                                    # readers are waiting
                                    # Send whether $! is EPIPE, then
                                    # tear the handle down.
                                    $weak_eof->send($!{EPIPE});
                                    $hdl->destroy;
                                  }]);

t/01-publish.t  view on Meta::CPAN

ok($eof->recv, '... expected broken pipe');
ok($pcv->recv, '... client complete');
is($published->recv, 2, '... server complete');

# Reuse the temporary file: rewind, write a NUL-terminated message over
# the start of the file, and rewind again for reading.
sysseek $fh, 0, 0;
syswrite $fh, "message3\0";
sysseek $fh, 0, 0;

$published = AnyEvent->condvar;
$eof = AnyEvent->condvar;
# NOTE(review): $weak_eof is refreshed here, but the on_error handler
# below sends on $eof directly rather than on the weak copy - confirm
# whether that is intended.
$weak_eof = $eof; weaken $weak_eof;
# This time the AnyEvent::Handle is constructed by the test itself.
my $handle;
$handle = AnyEvent::Handle->new(fh => $fh,
                                on_error => sub {
                                  my ($hdl, $fatal, $msg) = @_;
                                  # error on fh close as
                                  # readers are waiting
                                  # Send whether $! is EPIPE, then
                                  # tear the handle down.
                                  $eof->send($!{EPIPE});
                                  $hdl->destroy;
                                });
$pcv = $mqtt->publish(handle => $handle, topic => '/topic',



( run in 0.303 second using v1.01-cache-2.11-cpan-65fba6d93b7 )