Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

package Beekeeper::MQTT;

use strict;
use warnings;

our $VERSION = '0.10';

use AnyEvent;
use AnyEvent::Handle;
use Time::HiRes;
use List::Util 'shuffle';
use Scalar::Util 'weaken';
use Exporter 'import';
use Carp;

our @EXPORT_OK;
our %EXPORT_TAGS;

our $DEBUG = 0;

# Build the export lists from whatever is present in this package's symbol
# table when the block runs: every MQTT_* name plus the _encode*/_decode*
# helpers. Both groups are exportable on request; the 'const' tag holds the
# constants and the 'decode' tag holds the encode/decode helpers.
EXPORT: {
    my @symbols = keys %Beekeeper::MQTT::;

    my @const  = grep { m/^MQTT_/ }          @symbols;
    my @encode = grep { m/^_(en|de)code/ }   @symbols;

    @EXPORT_OK   = (@const, @encode);
    %EXPORT_TAGS = (
        %EXPORT_TAGS,
        'const'  => \@const,
        'decode' => \@encode,
    );
}

# 2.1.2  Control Packet type

# Numeric control packet types. When a packet is built the type is shifted
# into the high nibble of the first header byte (as unsubscribe does with
# MQTT_UNSUBSCRIBE << 4).
use constant {
    MQTT_CONNECT     => 0x01,
    MQTT_CONNACK     => 0x02,
    MQTT_PUBLISH     => 0x03,
    MQTT_PUBACK      => 0x04,
    MQTT_PUBREC      => 0x05,
    MQTT_PUBREL      => 0x06,
    MQTT_PUBCOMP     => 0x07,
    MQTT_SUBSCRIBE   => 0x08,
    MQTT_SUBACK      => 0x09,
    MQTT_UNSUBSCRIBE => 0x0A,
    MQTT_UNSUBACK    => 0x0B,
    MQTT_PINGREQ     => 0x0C,
    MQTT_PINGRESP    => 0x0D,
    MQTT_DISCONNECT  => 0x0E,
    MQTT_AUTH        => 0x0F,
};

# 2.2.2.2  Properties

# Property identifiers. Each trailing comment gives the wire type of the
# property value followed by the packets in which the property may appear.
use constant {
    MQTT_PAYLOAD_FORMAT_INDICATOR     => 0x01,  # byte           PUBLISH, Will Properties
    MQTT_MESSAGE_EXPIRY_INTERVAL      => 0x02,  # long int       PUBLISH, Will Properties
    MQTT_CONTENT_TYPE                 => 0x03,  # utf8 string    PUBLISH, Will Properties
    MQTT_RESPONSE_TOPIC               => 0x08,  # utf8 string    PUBLISH, Will Properties
    MQTT_CORRELATION_DATA             => 0x09,  # binary data    PUBLISH, Will Properties
    MQTT_SUBSCRIPTION_IDENTIFIER      => 0x0B,  # variable int   PUBLISH, SUBSCRIBE
    MQTT_SESSION_EXPIRY_INTERVAL      => 0x11,  # long int       CONNECT, CONNACK, DISCONNECT
    MQTT_ASSIGNED_CLIENT_IDENTIFIER   => 0x12,  # utf8 string    CONNACK
    MQTT_SERVER_KEEP_ALIVE            => 0x13,  # short int      CONNACK
    MQTT_AUTHENTICATION_METHOD        => 0x15,  # utf8 string    CONNECT, CONNACK, AUTH
    MQTT_AUTHENTICATION_DATA          => 0x16,  # binary data    CONNECT, CONNACK, AUTH
    MQTT_REQUEST_PROBLEM_INFORMATION  => 0x17,  # byte           CONNECT
    MQTT_WILL_DELAY_INTERVAL          => 0x18,  # long int       Will Properties
    MQTT_REQUEST_RESPONSE_INFORMATION => 0x19,  # byte           CONNECT
    MQTT_RESPONSE_INFORMATION         => 0x1A,  # utf8 string    CONNACK
    MQTT_SERVER_REFERENCE             => 0x1C,  # utf8 string    CONNACK, DISCONNECT
    MQTT_REASON_STRING                => 0x1F,  # utf8 string    CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH
    MQTT_RECEIVE_MAXIMUM              => 0x21,  # short int      CONNECT, CONNACK
    MQTT_TOPIC_ALIAS_MAXIMUM          => 0x22,  # short int      CONNECT, CONNACK
    MQTT_TOPIC_ALIAS                  => 0x23,  # short int      PUBLISH
    MQTT_MAXIMUM_QOS                  => 0x24,  # byte           CONNACK
};

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

        handle          => undef,    # the socket
        hosts           => undef,    # list of all hosts in cluster
        is_connected    => undef,    # true once connected
        try_hosts       => undef,    # list of hosts to try to connect
        connect_err     => undef,    # count of connection errors
        timeout_tmr     => undef,    # timer used for connection timeout
        reconnect_tmr   => undef,    # timer used for connection retry
        connack_cb      => undef,    # connack callback
        error_cb        => undef,    # error callback
        client_id       => undef,    # client id
        server_prop     => {},       # server properties
        server_alias    => {},       # server topic aliases
        client_alias    => {},       # client topic aliases
        subscriptions   => {},       # topic subscriptions
        subscr_cb       => {},       # subscription callbacks
        packet_cb       => {},       # packet callbacks 
        buffers         => {},       # raw mqtt buffers
        packet_seq      => 1,        # sequence used for packet ids
        subscr_seq      => 1,        # sequence used for subscription ids
        alias_seq       => 1,        # sequence used for topic alias ids
        use_alias       => 0,        # topic alias enabled
        config          => \%args,
    };

    $self->{bus_id}   = delete $args{'bus_id'};
    $self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};
    $self->{error_cb} = delete $args{'on_error'};

    bless $self, $class;
    return $self;
}

# Read-only accessor for the bus_id stored on the object.
sub bus_id {
    my ($self) = @_;
    return $self->{bus_id};
}
# Read-only accessor for the bus_role stored on the object.
sub bus_role {
    my ($self) = @_;
    return $self->{bus_role};
}

# Report an unrecoverable error.
#
# If an error callback is set it is called with the error string and its
# result is returned; otherwise the error string is thrown with die,
# prefixed with the package name in parentheses.
sub _fatal {
    my $self   = shift;
    my $errstr = shift;

    my $on_error = $self->{error_cb};
    return $on_error->($errstr) if $on_error;

    die "(" . __PACKAGE__ . ") $errstr\n";
}

# Package-level variables, initially undef and 0 respectively.
# NOTE(review): neither is read or written in the visible part of this file;
# the names suggest busy-time accounting (start timestamp and accumulated
# time) - confirm against the code that uses them.
our $BUSY_SINCE = undef;
our $BUSY_TIME  = 0;

# Start connecting to the broker.
#
# Named arguments:
#   on_connack - callback stored as the connack callback
#   blocking   - when true, wait on the connect condvar before returning
#
# Returns the is_connected flag in blocking mode, and 1 otherwise.
sub connect {
    my $self = shift;
    my %args = @_;

    my $blocking = $args{'blocking'};

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    if ($blocking) {
        # Wait until something signals the condvar
        $self->{connect_cv}->recv;
    }

    # The condvar is discarded in both modes
    $self->{connect_cv} = undef;

    return 1 unless $blocking;
    return $self->{is_connected};
}

sub _connect {
    my ($self) = @_;
    weaken($self);

    my $config = $self->{config};

    my $timeout = $config->{'timeout'};
    $timeout = 30 unless defined $timeout;

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # Connection timeout handler
    if ($timeout && !$self->{timeout_tmr}) {
        $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub { 
            $self->_reset_connection;
            $self->{connect_cv}->send;
            $self->_fatal("Could not connect to MQTT broker after $timeout seconds");
        });
    }

    unless ($self->{hosts}) {
        # Initialize the list of cluster hosts
        my $hosts = $config->{'host'} || 'localhost';
        my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
        $self->{hosts} = [ shuffle @hosts ];
    }

    # Determine next host of cluster to connect to
    my $try_hosts = $self->{try_hosts} ||= [];
    @$try_hosts = @{$self->{hosts}} unless @$try_hosts;

    # TCP connection args
    my $host = shift @$try_hosts;
    my $tls  = $config->{'tls'}  || 0;
    my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );

    ($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
    ($port) = ($port =~ m/^([0-9]+)$/s);

    $self->{handle} = AnyEvent::Handle->new(
        connect    => [ $host, $port ],
        tls        => $tls ? 'connect' : undef,
        keepalive  => 1,
        no_delay   => 1,
        on_connect => sub {
            my ($fh, $host, $port) = @_;
            # Send CONNECT packet
            $self->{server_prop}->{host} = $host;
            $self->{server_prop}->{port} = $port;
            $self->_send_connect;
        },
        on_connect_error => sub {
            my ($fh, $errmsg) = @_;
            # Some error occurred while connection, such as an unresolved hostname
            # or connection refused. Try next host of cluster immediately, or retry
            # in few seconds if all hosts of the cluster are unresponsive
            $self->{connect_err}++;
            warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
            my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
            $self->{reconnect_tmr} = AnyEvent->timer(
                after => ($delay < 10 ? $delay : 10),
                cb    => sub { $self->_connect },

lib/Beekeeper/MQTT.pm  view on Meta::CPAN



# Queue an UNSUBSCRIBE packet for one or more topic filters.
#
# Named arguments:
#   topic       - a single topic filter to unsubscribe from
#   topics      - array ref of topic filters (may be combined with 'topic')
#   on_unsuback - callback stored under the packet id for the UNSUBACK (required)
#   any other   - sent as an MQTT User Property; entries with an undefined
#                 value are skipped
#
# Croaks when no topic was given, when any topic is undefined or empty, or
# when on_unsuback is missing. Returns 1 after the packet has been pushed to
# the handle's write queue.
#
# The caller's data is never modified: the 'topics' array is copied before
# the single 'topic' is appended and before topics are utf8-encoded.
sub unsubscribe {
    my ($self, %args) = @_;

    my $topic       = delete $args{'topic'};
    my $topics      = delete $args{'topics'};
    my $unsuback_cb = delete $args{'on_unsuback'};

    # Private copy: pushing onto or encoding the caller's array in place
    # would leak changes back to the caller (and double-encode on reuse)
    my @topics = defined $topics ? @$topics : ();
    push (@topics, $topic) if defined $topic;

    croak "Unsubscription topics were not specified" unless @topics;
    croak "on_unsuback callback is required" unless $unsuback_cb;

    foreach my $topic (@topics) {
        croak "Undefined unsubscription topic" unless defined $topic;
        croak "Empty unsubscription topic" unless length $topic;
    }

    # Packet ids come from a 16 bit sequence that wraps back to 1 after 0xFFFF
    my $packet_id = $self->{packet_seq}++;
    $self->{packet_seq} = 1 if $packet_id == 0xFFFF;

    # Set callback for UNSUBACK
    $self->{packet_cb}->{$packet_id} = {
        topics      => [ @topics ], # copy, holds the topics as given (not encoded)
        unsuback_cb => $unsuback_cb,
    };

    # 3.10.2.1.2  User Property  (utf8 string pair)
    my $raw_prop = '';
    foreach my $key (keys %args) {
        my $val = $args{$key};
        next unless defined $val;
        # $key and $val are copies, so encoding them does not touch %args
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }

    my $raw_mqtt = pack("n", $packet_id)             .  # 3.10.2    Packet identifier
                   _encode_var_int(length $raw_prop) .  # 3.10.2.1  Property Length
                   $raw_prop;                           # 3.10.2.1  Properties

    # 3.10.3  Payload
    foreach my $topic (@topics) {
        my $raw_topic = $topic;  # encode a copy, keep @topics untouched
        utf8::encode($raw_topic);
        $raw_mqtt .= pack("n/a*", $raw_topic);
    }

    $self->{handle}->push_write( 
        pack("C", MQTT_UNSUBSCRIBE << 4 | 0x02) .  # 3.10.1 Packet type 
        _encode_var_int(length $raw_mqtt)       .  # 3.10.1 Packet length
        $raw_mqtt
    );

    1;
}

sub _receive_unsuback {
    my ($self, $packet) = @_;
    weaken($self);

    # 3.11.2  Packet id  (short int)
    my $offs = 0;
    my $packet_id = _decode_int_16($packet, \$offs);

    # 3.11.2.1.1  Property Length  (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;
    my %prop;

    while ($offs < $prop_end) {

        my $prop_id = _decode_byte($packet, \$offs);

        if ($prop_id == MQTT_REASON_STRING) {
            # 3.11.2.1.2  Reason String  (utf8 string)
            $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.11.2.1.3  User Property  (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$offs);
            my $val = _decode_utf8_str($packet, \$offs);
            $prop{$key} = $val;
        }
        else {
            # Protocol error
            $self->_fatal("Received UNSUBACK with unexpected property $prop_id");
        }
    }

    # 3.11.3  Payload
    my @reason_codes = unpack("C*", substr($$packet, $offs));

    my $packet_cb = delete $self->{packet_cb}->{$packet_id};
    $self->_fatal("Received unexpected UNSUBACK") unless $packet_cb;

    my $topics = $packet_cb->{topics};
    my $unsuback_cb = $packet_cb->{unsuback_cb};

    my $success = 1;
    my @properties;

    foreach my $code (@reason_codes) {

        my $topic = shift @$topics;
        my $reason = $Reason_code{$code};

        if ($code == 0) {
            # Success
            my $subs = $self->{subscriptions};
            my $subscr_id = delete $subs->{$topic};
            if ($subscr_id) {
                # Free on_publish callback if not used by another subscription
                my @still_used = grep { $subs->{$_} == $subscr_id } keys %$subs;
                unless (@still_used) {
                    # But not right now, as broker may send some messages *after* unsubscription
                    $self->{_timers}->{"unsub-$subscr_id"} = AnyEvent->timer( after => 60, cb => sub {
                        delete $self->{_timers}->{"unsub-$subscr_id"};
                        delete $self->{subscr_cb}->{$subscr_id};
                    });



( run in 1.495 second using v1.01-cache-2.11-cpan-5b529ec07f3 )