view release on metacpan or search on metacpan
lib/AnyEvent/MQTT.pm view on Meta::CPAN
use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG};
use AnyEvent;
use AnyEvent::Handle;
use Net::MQTT::Constants;
use Net::MQTT::Message;
use Net::MQTT::TopicStore;
use Carp qw/croak carp/;
use Sub::Name;
use Scalar::Util qw/weaken/;
sub new {
my ($pkg, %p) = @_;
my $self =
bless {
socket => undef,
host => '127.0.0.1',
port => '1883',
timeout => 30,
lib/AnyEvent/MQTT.pm view on Meta::CPAN
$_[0]->cleanup;
}
# Tear down the connection state held by this object.
#
# If a handle is present, a DISCONNECT message is queued via _send together
# with a condvar whose callback destroys that handle.  Afterwards the
# handle, the connected flag, the wait slot and the keep-alive bookkeeping
# are removed from the object and the write queue is reset to an empty
# array reference (which is also the value this sub evaluates to).
sub cleanup {
  my $self = shift;
  print STDERR "cleanup\n" if DEBUG;
  if ($self->{handle}) {
    my $cv = AnyEvent->condvar;
    my $handle = $self->{handle};
    weaken $handle;
    # $handle is only a weak reference and $self->{handle} is deleted just
    # below, so the referent may already have been freed (leaving $handle
    # undef) by the time the condvar is signalled.  Guard the call so the
    # callback does not die with "Can't call method on an undefined value".
    $cv->cb(sub { $handle->destroy if ($handle) });
    $self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
  }
  delete $self->{handle};
  delete $self->{connected};
  delete $self->{wait};
  delete $self->{_keep_alive_handle};
  delete $self->{_keep_alive_waiting};
  $self->{write_queue} = [];
}
lib/AnyEvent/MQTT.pm view on Meta::CPAN
my $error_sub = $handle->{on_error}; # Hack: There is no accessor api
$handle->on_error(subname 'on_error_for_read_publish_'.$topic =>
sub {
my ($hdl, $fatal, $msg) = @_;
$error_sub->(@_) if ($error_sub);
$hdl->destroy;
undef $hdl;
$cv->send(1);
});
my $weak_self = $self;
weaken $weak_self;
my @push_read_args = @{$p{push_read_args}||['line']};
my $sub; $sub = subname 'push_read_cb_for_'.$topic => sub {
my ($hdl, $chunk, @args) = @_;
print STDERR "publish: $chunk => $topic\n" if DEBUG;
my $send_cv = AnyEvent->condvar;
print STDERR "publish: message[$chunk] => $topic\n" if DEBUG;
$weak_self->_send_with_ack({
message_type => MQTT_PUBLISH,
qos => $qos,
retain => $p{retain},
lib/AnyEvent/MQTT.pm view on Meta::CPAN
$self->{handle}->push_write($msg->bytes);
$cv;
}
# (Re)arm the keep-alive timer.
#
# $wait selects the method the timer invokes when it fires: a true value
# picks _keep_alive_timeout, a false value picks _send_keep_alive.  The flag
# itself is stored in $self->{_keep_alive_waiting}.  The delay is taken from
# $self->{keep_alive_timer}.  Evaluates to the new timer watcher, which is
# also stored in $self->{_keep_alive_handle}.
sub _reset_keep_alive_timer {
  my ($self, $wait) = @_;

  # Release any previously stored watcher before creating its replacement.
  undef $self->{_keep_alive_handle};
  $self->{_keep_alive_waiting} = $wait;

  my $method;
  if ($wait) {
    $method = '_keep_alive_timeout';
  } else {
    $method = '_send_keep_alive';
  }

  # The callback only holds a weak reference to the object, so the watcher
  # stored inside $self does not keep $self alive through its closure.
  weaken(my $weak_self = $self);

  # Name the closure after the method minus its leading underscore,
  # e.g. 'send_keep_alive_cb'.
  my $callback = subname(substr($method, 1).'_cb',
                         sub { $weak_self->$method(@_) });

  $self->{_keep_alive_handle} =
    AnyEvent->timer(after => $self->{keep_alive_timer}, cb => $callback);
}
sub _send_keep_alive {
my $self = shift;
print STDERR "Sending: keep alive\n" if DEBUG;
$self->_send(message_type => MQTT_PINGREQ);
lib/AnyEvent/MQTT.pm view on Meta::CPAN
if ($msg) {
$cv = AnyEvent->condvar unless ($cv);
$self->_queue_write($msg, $cv);
} else {
$self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv});
$cv = $self->{connect_cv};
}
return $cv if ($self->{handle});
my $weak_self = $self;
weaken $weak_self;
my $hd;
$hd = $self->{handle} =
AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}],
($self->{tls} ? (tls => "connect") : ()),
on_error => subname('on_error_cb' => sub {
my ($handle, $fatal, $message) = @_;
print STDERR "handle error $_[1]\n" if DEBUG;
$handle->destroy;
$weak_self->_error($fatal, 'Error: '.$message, 0);
lib/AnyEvent/MQTT.pm view on Meta::CPAN
unless ($msg->return_code == MQTT_CONNECT_ACCEPTED) {
return $self->_error(1, 'Connection refused: '.$msg->string, 0);
}
print STDERR "Connection ready:\n", $msg->string(' '), "\n" if DEBUG;
$self->_write_now();
$self->{connected} = 1;
$self->{connect_cv}->send(1) if ($self->{connect_cv});
delete $self->{connect_cv};
my $weak_self = $self;
weaken $weak_self;
$handle->on_drain(subname 'on_drain_cb' => sub {
print STDERR "drained\n" if DEBUG;
my $w = $weak_self->{_waiting};
$w->[1]->send(1) if (ref $w && defined $w->[1]);
$weak_self->_write_now;
1;
});
# handle reconnect
lib/AnyEvent/MQTT.pm view on Meta::CPAN
$rec->{cv}->send(1);
return 1;
}
# Build the reader closure for this package's AnyEvent::Handle read type.
#
# Arguments: a handle (unused here; the closure receives its own handle
# argument when invoked) and $cb, the callback to run for every decoded
# message.  Returns a code reference named 'anyevent_read_type_reader'.
#
# When invoked with a handle, the closure repeatedly asks
# Net::MQTT::Message->new_from_bytes to build a message from the handle's
# {rbuf} and calls $cb->($handle, $message) for each one, stopping as soon
# as no message is returned.  It always returns an empty/false value.
sub anyevent_read_type {
  my (undef, $cb) = @_;
  return subname('anyevent_read_type_reader', sub {
    my ($hdl) = @_;

    # Work on a weakened reference to the handle's read buffer scalar.
    my $buffer_ref = \$hdl->{rbuf};
    weaken $buffer_ref;
    return if (!defined $$buffer_ref);

    while (1) {
      # NOTE(review): the trailing 1 presumably tells new_from_bytes to
      # remove the bytes it consumed from the buffer -- confirm against
      # Net::MQTT::Message.
      my $message = Net::MQTT::Message->new_from_bytes($$buffer_ref, 1);
      last if (!$message);
      $cb->($hdl, $message);
    }
    return;
  });
}
t/01-close-connection.t view on Meta::CPAN
#!/usr/bin/perl
#
# Copyright (C) 2011 by Mark Hindess
use strict;
use constant {
DEBUG => $ENV{ANYEVENT_MQTT_TEST_DEBUG}
};
use Net::MQTT::Constants;
use Errno qw/EPIPE/;
use Scalar::Util qw/weaken/;
$|=1;
BEGIN {
require Test::More;
$ENV{PERL_ANYEVENT_MODEL} = 'Perl' unless ($ENV{PERL_ANYEVENT_MODEL});
eval { require AnyEvent; import AnyEvent;
require AnyEvent::Socket; import AnyEvent::Socket };
if ($@) {
import Test::More skip_all => 'No AnyEvent::Socket module installed: $@';
t/01-errors.t view on Meta::CPAN
#!/usr/bin/perl
#
# Copyright (C) 2011 by Mark Hindess
use strict;
use constant {
DEBUG => $ENV{ANYEVENT_MQTT_TEST_DEBUG}
};
use Net::MQTT::Constants;
use Scalar::Util qw/weaken/;
$|=1;
BEGIN {
require Test::More;
$ENV{PERL_ANYEVENT_MODEL} = 'Perl' unless ($ENV{PERL_ANYEVENT_MODEL});
eval { require AnyEvent; import AnyEvent;
require AnyEvent::Socket; import AnyEvent::Socket };
if ($@) {
import Test::More skip_all => 'No AnyEvent::Socket module installed: $@';
t/01-publish.t view on Meta::CPAN
#
# Copyright (C) 2011 by Mark Hindess
use strict;
use constant {
DEBUG => $ENV{ANYEVENT_MQTT_TEST_DEBUG}
};
use File::Temp qw/tempfile/;
use Net::MQTT::Constants;
use Errno qw/EPIPE/;
use Scalar::Util qw/weaken/;
$|=1;
BEGIN {
require Test::More;
$ENV{PERL_ANYEVENT_MODEL} = 'Perl' unless ($ENV{PERL_ANYEVENT_MODEL});
eval { require AnyEvent; import AnyEvent;
require AnyEvent::Socket; import AnyEvent::Socket };
if ($@) {
import Test::More skip_all => 'No AnyEvent::Socket module installed: $@';
t/01-publish.t view on Meta::CPAN
ok($cv, 'simple message publish');
is($cv->recv, 1, '... client complete');
is($published->recv, 1, '... server complete');
# Publish from a filehandle rather than a literal message: write one
# newline-terminated payload to a temp file and rewind it to the start.
my $fh = tempfile();
syswrite $fh, "message2\n";
sysseek $fh, 0, 0;
$published = AnyEvent->condvar;
# $eof is signalled from the on_error callback below; the callback closes
# over a weakened copy so the closure holds no strong reference to it.
my $eof = AnyEvent->condvar;
my $weak_eof = $eof; weaken $weak_eof;
# NOTE(review): handle_args are presumably passed on to the AnyEvent::Handle
# that publish() creates around $fh -- confirm in publish().
my $pcv =
$mqtt->publish(handle => $fh, topic => '/topic',
qos => MQTT_QOS_AT_MOST_ONCE,
handle_args => [ on_error => sub {
my ($hdl, $fatal, $msg) = @_;
# error on fh close as
# readers are waiting
# Report whether errno was EPIPE, then tear the handle down.
$weak_eof->send($!{EPIPE});
$hdl->destroy;
}]);
t/01-publish.t view on Meta::CPAN
ok($eof->recv, '... expected broken pipe');
ok($pcv->recv, '... client complete');
is($published->recv, 2, '... server complete');
# Reuse the temp file: rewind, write a NUL-terminated payload, and rewind
# again so it can be read from the start.
sysseek $fh, 0, 0;
syswrite $fh, "message3\0";
sysseek $fh, 0, 0;
# Fresh condvars for this case.
$published = AnyEvent->condvar;
$eof = AnyEvent->condvar;
$weak_eof = $eof; weaken $weak_eof;
# This time the test constructs the AnyEvent::Handle itself.
my $handle;
$handle = AnyEvent::Handle->new(fh => $fh,
on_error => sub {
my ($hdl, $fatal, $msg) = @_;
# error on fh close as
# readers are waiting
# NOTE(review): this callback signals $eof directly even though
# $weak_eof was just refreshed above -- confirm that is intended.
$eof->send($!{EPIPE});
$hdl->destroy;
});
$pcv = $mqtt->publish(handle => $handle, topic => '/topic',