Beekeeper

 view release on metacpan or  search on metacpan

t/lib/Tests/Mqtt.pm  view on Meta::CPAN

package Tests::Mqtt;

use strict;
use warnings;

use base 'Tests::Service::Base';
use Tests::Service::Config;
use Beekeeper::MQTT;

use Test::More;
use Time::HiRes 'sleep';
use Data::Dumper;

my $DEBUG = 0;

my $bus_config;

sub read_bus_config : Test(startup => 1) {
    my $self = shift;

    $bus_config = Beekeeper::Config->get_bus_config( bus_id => 'test' );

    ok( $bus_config->{host}, "Read bus config, connecting to " . $bus_config->{host});
}

sub async_wait {
    my ($self, $time) = @_;
    $time *= 10 if $self->automated_testing;
    my $cv = AnyEvent->condvar; 
    my $tmr = AnyEvent->timer( after => $time, cb => $cv ); 
    $cv->recv;
}

sub test_01_topic : Test(3) {
    my $self = shift;

    my $bus1 = Beekeeper::MQTT->new( %$bus_config );
    my $bus2 = Beekeeper::MQTT->new( %$bus_config );

    $bus1->connect( blocking => 1 );
    $bus2->connect( blocking => 1 );

    my ($cv, $tmr);
    my @received;

    $bus1->subscribe(
        topic => 'msg/bar',
        on_publish => sub {
            my ($payload, $properties) = @_;
            push @received, {
                bus        => 1,
                properties => { %$properties },
                payload    => $$payload,
            };
        },
    );

    $bus2->subscribe(
        topic => 'msg/bar',
        on_publish => sub {
            my ($payload, $properties) = @_;
            push @received, {
                bus        => 2,
                properties => { %$properties },
                payload    => $$payload,
            };
        },
    );

    $self->async_wait( 0.2 );

    $bus1->publish(
        topic   => 'msg/bar',
        payload => 'Hello 1',
    );

    $self->async_wait( 0.2 );

    is( scalar(@received), 2, "Received 2 messages from topic");
    is( $received[0]->{payload}, 'Hello 1', "got message");
    is( $received[1]->{payload}, 'Hello 1', "got message");

    # $DEBUG && diag Dumper \@received;

    $bus1->disconnect;
    $bus2->disconnect;
}

sub test_02_topic_wildcard : Test(7) {
    my $self = shift;



( run in 2.049 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )