Beekeeper

 view release on metacpan or  search on metacpan

t/lib/Tests/Mqtt.pm  view on Meta::CPAN

        payload      => '',
        $utf8_string => 'utf8_key',
    );

    $self->async_wait( 0.2 );

    is( scalar(@received), 4, "Received 4 messages from topic");

    ok( $received[0]->{payload}, 'Received string' );
    ok( $received[1]->{payload}, 'Received blob' );

    is( length($received[0]->{payload}), 1, 'Received string length is 1 char' );
    is( length($received[1]->{payload}), 3, 'Received blob length is 3 bytes' );

    is( $received[0]->{payload}, $utf8_string, "Got correct utf8 string");
    is( $received[1]->{payload}, $binary_blob, "Got correct binary blob");

    is( $received[2]->{properties}->{'utf8_value'}, $utf8_string, "Got utf8 property value");
    is( $received[3]->{properties}->{$utf8_string}, 'utf8_key',   "Got utf8 property key");

    # $DEBUG && diag Dumper \@received;

    $bus->disconnect;
}

sub test_07_big_message : Test(4) {
    my $self = shift;

    my $bus = Beekeeper::MQTT->new( %$bus_config );

    $bus->connect( blocking => 1 );

    my @received;

    $bus->subscribe(
        topic => 'msg/bar',
        on_publish => sub {
            my ($payload, $properties) = @_;
            push @received, {
                properties => { %$properties },
                payload    => $$payload,
            };
        },
    );

    my $data = 'X' x 1048576;

    $bus->publish(
        topic      => 'msg/bar',
        payload    => \$data,
    );

    $self->async_wait( 0.2 );

    is( scalar(@received), 1, "Received 1 message from topic");
    is( length( $received[0]->{payload} ), 1048576, "Got a 1 MiB message");


    if ($ENV{'PATH'} =~ m|^/home/david/|) {
        # The following test is important: it forces the event loop to send and
        # receive a message in multiple iterations, because it is bigger than OS 
        # buffers. So skip it only on selected smokers that are too slow to pass
        return "This test does not run reliably on constrained platforms";
    }

    $data = 'X' x 10485760;

    $bus->publish(
        topic      => 'msg/bar',
        payload    => \$data,
    );

    $self->async_wait( 1 );

    is( scalar(@received), 2, "Received 1 message from topic");
    is( length( $received[1]->{payload} ), 10485760, "Got a 10 MiB message");

    $bus->disconnect;
}

1;



( run in 0.647 second using v1.01-cache-2.11-cpan-71847e10f99 )