Net-MQTT-Simple

 view release on metacpan or  search on metacpan

lib/Net/MQTT/Simple.pm  view on Meta::CPAN

package Net::MQTT::Simple;

use strict;
use warnings;

use IO::Socket::IP;
use Socket ();

our $VERSION = '1.33';

# Please note that these are not documented and are subject to change:
our $KEEPALIVE_INTERVAL = 60;
our $PING_TIMEOUT = 10;
our $RECONNECT_INTERVAL = 5;
our $MAX_LENGTH = 2097152;    # 2 MB
our $READ_BYTES = 16 * 1024;  # 16 kB per IO::Socket::SSL recommendation
our $WRITE_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL maximum
our $PROTOCOL_LEVEL = 0x04;   # 0x03 in v3.1, 0x04 in v3.1.1
our $PROTOCOL_NAME = "MQTT";  # MQIsdp in v3.1, MQTT in v3.1.1

my $global;

sub _default_port { 1883 }
sub _socket_class { 'IO::Socket::IP' }
sub _socket_error { "$@" }
sub _secure { 0 }

sub _client_identifier { my ($class) = @_; return "Net::MQTT::Simple[" . $class->{random_id} . "]"; }

# Carp might not be available either.
sub _croak {
    die sprintf "%s at %s line %d.\n", "@_", (caller 1)[1, 2];
}

sub filter_as_regex {
    my ($filter) = @_;

    return "^(?!\\\$)" if $filter eq '#';   # Match everything except /^\$/
    return "^/"        if $filter eq '/#';  # Parent (empty topic) is invalid

    $filter = quotemeta $filter;

    $filter =~ s{ \z (?<! \\ \/ \\ \# ) } {\\z}x;       # Anchor unless /#$/
    $filter =~ s{ \\ \/ \\ \#           } {}x;
    $filter =~ s{ \\ \+                 } {[^/]*+}xg;
    $filter =~ s{ ^ (?= \[ \^ / \] \* ) } {(?!\\\$)}x;  # No /^\$/ if /^\+/

    return "^$filter";
}

sub import {
    my ($class, $server) = @_;
    @_ <= 2 or _croak "Too many arguments for use " . __PACKAGE__;

    $server or return;

    $global = $class->new($server);

    no strict 'refs';
    *{ (caller)[0] . "::publish" }  = \&publish;
    *{ (caller)[0] . "::retain"  }  = \&retain;
    *{ (caller)[0] . "::mqtt_get" } = \&get;
}

sub new {
    my ($class, $server, $sockopts) = @_;
    @_ == 2 or @_ == 3 or _croak "Wrong number of arguments for $class->new";

    my $port = $class->_default_port;

    # Add port for bare IPv6 address
    $server = "[$server]:$port" if $server =~ /:.*:/ and not $server =~ /\[/;

    # Add port for bare IPv4 address or bracketed IPv6 address
    $server .= ":$port" if $server !~ /:/ or $server =~ /^\[.*\]$/;

    # Create a random ID for the instance of the object
    my $random_id = join "", map chr 65 + int rand 26, 1 .. 10;

    return bless {
        server       => $server,
        last_connect => 0,
        sockopts     => $sockopts // {},
        random_id    => $random_id
    }, $class;
}

sub last_will {
    my ($self, $topic, $message, $retain) = @_;

    my %old;
    %old = %{ $self->{will} } if $self->{will};

    _croak "Wrong number of arguments for last_will" if @_ > 4;

    if (@_ >= 2) {
        if (not defined $topic and not defined $message) {
            delete $self->{will};
            delete $self->{encoded_will};

            return;
        } else {
            $self->{will} = {
                topic   => $topic    // $old{topic}   // '',
                message => $message  // $old{message} // '',
                retain  => !!$retain // $old{retain}  // 0,
            };
            _croak("Topic is empty") if not length $self->{will}->{topic};

            my $e = $self->{encoded_will} = { %{ $self->{will} } };
            utf8::encode($e->{topic});
            utf8::downgrade($e->{message}, 1) or do {
                my ($file, $line, $method) = (caller 1)[1, 2, 3];
                warn "Wide character in $method at $file line $line.\n";
                utf8::encode($e->{message});
            };
        }
    }

    return @{ $self->{will} }{qw/topic message retain/};
}



( run in 1.875 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )