Beekeeper
view release on metacpan or search on metacpan
t/lib/Tests/Mqtt.pm view on Meta::CPAN
package Tests::Mqtt;
use strict;
use warnings;
use base 'Tests::Service::Base';
use Tests::Service::Config;
use Beekeeper::MQTT;
use Test::More;
use Time::HiRes 'sleep';
use Data::Dumper;
# Debug switch. In the visible part of this file it is only referenced by a
# commented-out diag() call, so leaving it at 0 has no effect there.
my $DEBUG = 0;
# Bus connection parameters. Assigned by read_bus_config() and expanded into
# the Beekeeper::MQTT constructor arguments by the test methods.
my $bus_config;
# Startup fixture: fetch the configuration of the bus with id 'test' and
# store it in the file-level $bus_config for the test methods to use.
# Accounts for one test, which passes when the configuration has a true
# 'host' entry.
sub read_bus_config : Test(startup => 1) {
    my ($self) = @_;

    $bus_config = Beekeeper::Config->get_bus_config( bus_id => 'test' );

    my $host = $bus_config->{host};
    ok( $host, "Read bus config, connecting to " . $host );
}
# Wait on an AnyEvent condition variable that a timer fires after $time
# seconds. The delay is multiplied by ten when $self->automated_testing
# returns a true value.
#
#   $self->async_wait( $seconds );
sub async_wait {
    my $self = shift;
    my $time = shift;

    my $delay = $self->automated_testing ? $time * 10 : $time;

    my $done = AnyEvent->condvar;

    # The watcher must stay referenced until recv() returns: AnyEvent cancels
    # a timer as soon as its watcher object is destroyed.
    my $timer = AnyEvent->timer( after => $delay, cb => $done );

    $done->recv;
}
# Two independent clients subscribe to the same topic; a single message
# published by one of them is expected to be delivered to both.
#
# Runs 3 tests: the total number of deliveries, and the payload of each
# delivery. Every delivery is recorded in @received as a hash holding the
# number of the receiving bus, a shallow copy of the message properties and
# the payload.
sub test_01_topic : Test(3) {
    my $self = shift;

    my $bus1 = Beekeeper::MQTT->new( %$bus_config );
    my $bus2 = Beekeeper::MQTT->new( %$bus_config );

    $bus1->connect( blocking => 1 );
    $bus2->connect( blocking => 1 );

    my @received;

    # Builds the on_publish callback for a given bus number, so both
    # subscriptions share one implementation instead of a copy-pasted one.
    my $recorder_for = sub {
        my ($bus_number) = @_;

        return sub {
            # The payload arrives as a scalar reference, the properties as
            # a hash reference; both are copied before being stored.
            my ($payload, $properties) = @_;

            push @received, {
                bus        => $bus_number,
                properties => { %$properties },
                payload    => $$payload,
            };
        };
    };

    $bus1->subscribe(
        topic      => 'msg/bar',
        on_publish => $recorder_for->(1),
    );

    $bus2->subscribe(
        topic      => 'msg/bar',
        on_publish => $recorder_for->(2),
    );

    # Wait before publishing, presumably so both subscriptions are in place.
    $self->async_wait( 0.2 );

    $bus1->publish(
        topic   => 'msg/bar',
        payload => 'Hello 1',
    );

    # Wait again so the deliveries can arrive before asserting.
    $self->async_wait( 0.2 );

    is( scalar(@received), 2, "Received 2 messages from topic");
    is( $received[0]->{payload}, 'Hello 1', "got message");
    is( $received[1]->{payload}, 'Hello 1', "got message");

    # $DEBUG && diag Dumper \@received;

    $bus1->disconnect;
    $bus2->disconnect;
}
sub test_02_topic_wildcard : Test(7) {
my $self = shift;
( run in 2.049 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )