Net-MQTT-Simple
view release on metacpan or search on metacpan
lib/Net/MQTT/Simple.pm view on Meta::CPAN
package Net::MQTT::Simple;
use strict;
use warnings;
use IO::Socket::IP;
use Socket ();
our $VERSION = '1.33';
# Please note that these are not documented and are subject to change:
our $KEEPALIVE_INTERVAL = 60;
our $PING_TIMEOUT = 10;
our $RECONNECT_INTERVAL = 5;
our $MAX_LENGTH = 2097152; # 2 MB
our $READ_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL recommendation
our $WRITE_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL maximum
our $PROTOCOL_LEVEL = 0x04; # 0x03 in v3.1, 0x04 in v3.1.1
our $PROTOCOL_NAME = "MQTT"; # MQIsdp in v3.1, MQTT in v3.1.1
my $global;
sub _default_port { 1883 }
sub _socket_class { 'IO::Socket::IP' }
sub _socket_error { "$@" }
sub _secure { 0 }
sub _client_identifier { my ($class) = @_; return "Net::MQTT::Simple[" . $class->{random_id} . "]"; }
# Carp might not be available either.
sub _croak {
die sprintf "%s at %s line %d.\n", "@_", (caller 1)[1, 2];
}
sub filter_as_regex {
my ($filter) = @_;
return "^(?!\\\$)" if $filter eq '#'; # Match everything except /^\$/
return "^/" if $filter eq '/#'; # Parent (empty topic) is invalid
$filter = quotemeta $filter;
$filter =~ s{ \z (?<! \\ \/ \\ \# ) } {\\z}x; # Anchor unless /#$/
$filter =~ s{ \\ \/ \\ \# } {}x;
$filter =~ s{ \\ \+ } {[^/]*+}xg;
$filter =~ s{ ^ (?= \[ \^ / \] \* ) } {(?!\\\$)}x; # No /^\$/ if /^\+/
return "^$filter";
}
sub import {
my ($class, $server) = @_;
@_ <= 2 or _croak "Too many arguments for use " . __PACKAGE__;
$server or return;
$global = $class->new($server);
no strict 'refs';
*{ (caller)[0] . "::publish" } = \&publish;
*{ (caller)[0] . "::retain" } = \&retain;
*{ (caller)[0] . "::mqtt_get" } = \&get;
}
sub new {
my ($class, $server, $sockopts) = @_;
@_ == 2 or @_ == 3 or _croak "Wrong number of arguments for $class->new";
my $port = $class->_default_port;
# Add port for bare IPv6 address
$server = "[$server]:$port" if $server =~ /:.*:/ and not $server =~ /\[/;
# Add port for bare IPv4 address or bracketed IPv6 address
$server .= ":$port" if $server !~ /:/ or $server =~ /^\[.*\]$/;
# Create a random ID for the instance of the object
my $random_id = join "", map chr 65 + int rand 26, 1 .. 10;
return bless {
server => $server,
last_connect => 0,
sockopts => $sockopts // {},
random_id => $random_id
}, $class;
}
sub last_will {
my ($self, $topic, $message, $retain) = @_;
my %old;
%old = %{ $self->{will} } if $self->{will};
_croak "Wrong number of arguments for last_will" if @_ > 4;
if (@_ >= 2) {
if (not defined $topic and not defined $message) {
delete $self->{will};
delete $self->{encoded_will};
return;
} else {
$self->{will} = {
topic => $topic // $old{topic} // '',
message => $message // $old{message} // '',
retain => !!$retain // $old{retain} // 0,
};
_croak("Topic is empty") if not length $self->{will}->{topic};
my $e = $self->{encoded_will} = { %{ $self->{will} } };
utf8::encode($e->{topic});
utf8::downgrade($e->{message}, 1) or do {
my ($file, $line, $method) = (caller 1)[1, 2, 3];
warn "Wide character in $method at $file line $line.\n";
utf8::encode($e->{message});
};
}
}
return @{ $self->{will} }{qw/topic message retain/};
}
( run in 1.875 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )