Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/MQTT.pm  view on Meta::CPAN


    return $int;
}

sub _decode_utf8_str {
    my ($packet, $offs) = @_;

    my $str = unpack("n/a", substr($$packet, $$offs));
    $$offs += 2 + length($str);
    utf8::decode($str);

    return $str;
}

sub _decode_binary_data {
    my ($packet, $offs) = @_;

    my $data = unpack("n/a", substr($$packet, $$offs));
    $$offs += 2 + length($data);

    return $data;
}

sub _decode_var_int {
    my ($packet, $offs) = @_;

    my $int = 0;
    my $mult = 1;
    my $byte;

    do {
        $byte = unpack("C", substr($$packet, $$offs, 1));
        $int += ($byte & 0x7F) * $mult;
        $mult *= 128;
        $$offs++;
    } while ($byte & 0x80);

    return $int;
}

sub _encode_var_int {
    return pack("C", $_[0]) if ($_[0] < 128);
    my @a = unpack("C*", pack("w", $_[0]));
    $a[0]  &= 0x7F;
    $a[-1] |= 0x80;
    return pack("C*", reverse @a);
}


sub new {
    my ($class, %args) = @_;

    my $self = {
        bus_id          => undef,
        bus_role        => undef,
        handle          => undef,    # the socket
        hosts           => undef,    # list of all hosts in cluster
        is_connected    => undef,    # true once connected
        try_hosts       => undef,    # list of hosts to try to connect
        connect_err     => undef,    # count of connection errors
        timeout_tmr     => undef,    # timer used for connection timeout
        reconnect_tmr   => undef,    # timer used for connection retry
        connack_cb      => undef,    # connack callback
        error_cb        => undef,    # error callback
        client_id       => undef,    # client id
        server_prop     => {},       # server properties
        server_alias    => {},       # server topic aliases
        client_alias    => {},       # client topic aliases
        subscriptions   => {},       # topic subscriptions
        subscr_cb       => {},       # subscription callbacks
        packet_cb       => {},       # packet callbacks 
        buffers         => {},       # raw mqtt buffers
        packet_seq      => 1,        # sequence used for packet ids
        subscr_seq      => 1,        # sequence used for subscription ids
        alias_seq       => 1,        # sequence used for topic alias ids
        use_alias       => 0,        # topic alias enabled
        config          => \%args,
    };

    $self->{bus_id}   = delete $args{'bus_id'};
    $self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};
    $self->{error_cb} = delete $args{'on_error'};

    bless $self, $class;
    return $self;
}

sub bus_id   { $_[0]->{bus_id}   }
sub bus_role { $_[0]->{bus_role} }

sub _fatal {
    my ($self, $errstr) = @_;
    die "(" . __PACKAGE__ . ") $errstr\n" unless $self->{error_cb};
    $self->{error_cb}->($errstr);
}

our $BUSY_SINCE = undef;
our $BUSY_TIME  = 0;

sub connect {
    my ($self, %args) = @_;

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    $self->{connect_cv}->recv if $args{'blocking'};
    $self->{connect_cv} = undef;

    return $args{'blocking'} ? $self->{is_connected} : 1;
}

sub _connect {
    my ($self) = @_;
    weaken($self);

    my $config = $self->{config};

    my $timeout = $config->{'timeout'};
    $timeout = 30 unless defined $timeout;

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # Connection timeout handler
    if ($timeout && !$self->{timeout_tmr}) {
        $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub { 
            $self->_reset_connection;
            $self->{connect_cv}->send;
            $self->_fatal("Could not connect to MQTT broker after $timeout seconds");
        });
    }

    unless ($self->{hosts}) {
        # Initialize the list of cluster hosts
        my $hosts = $config->{'host'} || 'localhost';
        my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
        $self->{hosts} = [ shuffle @hosts ];
    }

    # Determine next host of cluster to connect to
    my $try_hosts = $self->{try_hosts} ||= [];
    @$try_hosts = @{$self->{hosts}} unless @$try_hosts;

    # TCP connection args
    my $host = shift @$try_hosts;
    my $tls  = $config->{'tls'}  || 0;
    my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );

    ($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
    ($port) = ($port =~ m/^([0-9]+)$/s);

    $self->{handle} = AnyEvent::Handle->new(
        connect    => [ $host, $port ],
        tls        => $tls ? 'connect' : undef,
        keepalive  => 1,
        no_delay   => 1,
        on_connect => sub {
            my ($fh, $host, $port) = @_;
            # Send CONNECT packet
            $self->{server_prop}->{host} = $host;
            $self->{server_prop}->{port} = $port;
            $self->_send_connect;
        },
        on_connect_error => sub {
            my ($fh, $errmsg) = @_;
            # Some error occurred while connection, such as an unresolved hostname
            # or connection refused. Try next host of cluster immediately, or retry
            # in few seconds if all hosts of the cluster are unresponsive
            $self->{connect_err}++;
            warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
            my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
            $self->{reconnect_tmr} = AnyEvent->timer(
                after => ($delay < 10 ? $delay : 10),
                cb    => sub { $self->_connect },
            );
        },
        on_error => sub {
            my ($fh, $fatal, $errmsg) = @_;
            # Some error occurred, such as a read error
            $self->_reset_connection;
            $self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
        },
        on_eof => sub {
            my ($fh) = @_;
            # The server has closed the connection without sending DISCONNECT
            $self->_reset_connection;
            $self->_fatal("MQTT broker at $host:$port has gone away");
        },
        on_read => sub {
            my ($fh) = @_;

            my $packet_type;
            my $packet_flags;

            my $rbuff_len;
            my $packet_len;

            my $mult;
            my $offs;
            my $byte;

            my $timing_packets;

            unless (defined $BUSY_SINCE) {
                # Measure time elapsed while processing incoming packets
                $BUSY_SINCE = Time::HiRes::time;
                $timing_packets = 1; 
            }

            PARSE_PACKET: {

                $rbuff_len = length $fh->{rbuf};

                last PARSE_PACKET unless $rbuff_len >= 2;

                unless ($packet_type) {

                    $packet_len = 0;
                    $mult = 1;
                    $offs = 1;

                    PARSE_LEN: {
                        $byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
                        $packet_len += ($byte & 0x7f) * $mult;
                        last unless ($byte & 0x80);
                        last PARSE_PACKET if ($offs >= $rbuff_len); # Not enough data
                        $mult *= 128;
                        redo if ($offs < 5);
                    }

                    #TODO: Check max packet size

lib/Beekeeper/MQTT.pm  view on Meta::CPAN


sub _receive_unsuback {
    my ($self, $packet) = @_;
    weaken($self);

    # 3.11.2  Packet id  (short int)
    my $offs = 0;
    my $packet_id = _decode_int_16($packet, \$offs);

    # 3.11.2.1.1  Property Length  (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;
    my %prop;

    while ($offs < $prop_end) {

        my $prop_id = _decode_byte($packet, \$offs);

        if ($prop_id == MQTT_REASON_STRING) {
            # 3.11.2.1.2  Reason String  (utf8 string)
            $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.11.2.1.3  User Property  (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$offs);
            my $val = _decode_utf8_str($packet, \$offs);
            $prop{$key} = $val;
        }
        else {
            # Protocol error
            $self->_fatal("Received UNSUBACK with unexpected property $prop_id");
        }
    }

    # 3.11.3  Payload
    my @reason_codes = unpack("C*", substr($$packet, $offs));

    my $packet_cb = delete $self->{packet_cb}->{$packet_id};
    $self->_fatal("Received unexpected UNSUBACK") unless $packet_cb;

    my $topics = $packet_cb->{topics};
    my $unsuback_cb = $packet_cb->{unsuback_cb};

    my $success = 1;
    my @properties;

    foreach my $code (@reason_codes) {

        my $topic = shift @$topics;
        my $reason = $Reason_code{$code};

        if ($code == 0) {
            # Success
            my $subs = $self->{subscriptions};
            my $subscr_id = delete $subs->{$topic};
            if ($subscr_id) {
                # Free on_publish callback if not used by another subscription
                my @still_used = grep { $subs->{$_} == $subscr_id } keys %$subs;
                unless (@still_used) {
                    # But not right now, as broker may send some messages *after* unsubscription
                    $self->{_timers}->{"unsub-$subscr_id"} = AnyEvent->timer( after => 60, cb => sub {
                        delete $self->{_timers}->{"unsub-$subscr_id"};
                        delete $self->{subscr_cb}->{$subscr_id};
                    });
                }
            }
        }
        else {
            # Failure
            $success = 0;
            unless ($unsuback_cb) {
                $self->_fatal("Unsubscription to topic '$topic' failed: $reason");
            }
        }

        push @properties, {
            topic       => $topic,
            reason_code => $code,
            reason      => $reason,
            %prop
        };
    }

    $unsuback_cb->($success, @properties) if $unsuback_cb;
}

our $AE_WAITING;

sub publish {
    my ($self, %args) = @_;

    my $topic     = delete $args{'topic'};
    my $payload   = delete $args{'payload'};
    my $qos       = delete $args{'qos'};
    my $dup       = delete $args{'duplicate'};
    my $retain    = delete $args{'retain'};
    my $on_puback = delete $args{'on_puback'};
    my $buffer_id = delete $args{'buffer_id'};

    croak "Message topic was not specified" unless defined $topic;

    $DEBUG && warn "Sent message to: $topic\n";

    $payload = '' unless defined $payload;
    my $payload_ref = (ref $payload eq 'SCALAR') ? $payload : \$payload;

    # 3.3.2.3.4  Topic Alias
    my $topic_alias;
    if ($self->{use_alias}) {
        $topic_alias = $self->{client_alias}->{$topic};
        if ($topic_alias) {
            # Send topic alias only
            $topic = '';
        }
        elsif ($self->{server_prop}->{'topic_alias_maximum'}) {
            #TODO: Honor maximum
            $topic_alias = $self->{alias_seq}++;
            $self->{client_alias}->{$topic} = $topic_alias;
        }
    }

    # 3.3.1.2  QoS level



( run in 2.266 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )