Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

our $BUSY_SINCE = undef;
our $BUSY_TIME  = 0;

sub connect {
    my ($self, %args) = @_;

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    $self->{connect_cv}->recv if $args{'blocking'};
    $self->{connect_cv} = undef;

    return $args{'blocking'} ? $self->{is_connected} : 1;
}

sub _connect {
    my ($self) = @_;
    weaken($self);

    my $config = $self->{config};

    my $timeout = $config->{'timeout'};
    $timeout = 30 unless defined $timeout;

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # Connection timeout handler
    if ($timeout && !$self->{timeout_tmr}) {
        $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub { 
            $self->_reset_connection;
            $self->{connect_cv}->send;
            $self->_fatal("Could not connect to MQTT broker after $timeout seconds");
        });
    }

    unless ($self->{hosts}) {
        # Initialize the list of cluster hosts
        my $hosts = $config->{'host'} || 'localhost';
        my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
        $self->{hosts} = [ shuffle @hosts ];
    }

    # Determine next host of cluster to connect to
    my $try_hosts = $self->{try_hosts} ||= [];
    @$try_hosts = @{$self->{hosts}} unless @$try_hosts;

    # TCP connection args
    my $host = shift @$try_hosts;
    my $tls  = $config->{'tls'}  || 0;
    my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );

    ($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
    ($port) = ($port =~ m/^([0-9]+)$/s);

    $self->{handle} = AnyEvent::Handle->new(
        connect    => [ $host, $port ],
        tls        => $tls ? 'connect' : undef,
        keepalive  => 1,
        no_delay   => 1,
        on_connect => sub {
            my ($fh, $host, $port) = @_;
            # Send CONNECT packet
            $self->{server_prop}->{host} = $host;
            $self->{server_prop}->{port} = $port;
            $self->_send_connect;
        },
        on_connect_error => sub {
            my ($fh, $errmsg) = @_;
            # Some error occurred while connection, such as an unresolved hostname
            # or connection refused. Try next host of cluster immediately, or retry
            # in few seconds if all hosts of the cluster are unresponsive
            $self->{connect_err}++;
            warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
            my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
            $self->{reconnect_tmr} = AnyEvent->timer(
                after => ($delay < 10 ? $delay : 10),
                cb    => sub { $self->_connect },
            );
        },
        on_error => sub {
            my ($fh, $fatal, $errmsg) = @_;
            # Some error occurred, such as a read error
            $self->_reset_connection;
            $self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
        },
        on_eof => sub {
            my ($fh) = @_;
            # The server has closed the connection without sending DISCONNECT
            $self->_reset_connection;
            $self->_fatal("MQTT broker at $host:$port has gone away");
        },
        on_read => sub {
            my ($fh) = @_;

            my $packet_type;
            my $packet_flags;

            my $rbuff_len;
            my $packet_len;

            my $mult;
            my $offs;
            my $byte;

            my $timing_packets;

            unless (defined $BUSY_SINCE) {
                # Measure time elapsed while processing incoming packets
                $BUSY_SINCE = Time::HiRes::time;
                $timing_packets = 1; 
            }

            PARSE_PACKET: {

                $rbuff_len = length $fh->{rbuf};

                last PARSE_PACKET unless $rbuff_len >= 2;

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

                }
                elsif ($packet_type == MQTT_PINGREQ) {

                    $self->pingresp;
                }
                elsif ($packet_type == MQTT_PINGRESP) {

                    # Client takes no action on receiving PINGRESP
                }
                elsif ($packet_type == MQTT_SUBACK) {

                    $self->_receive_suback(\$packet);
                }
                elsif ($packet_type == MQTT_UNSUBACK) {

                    $self->_receive_unsuback(\$packet);
                }
                elsif ($packet_type == MQTT_CONNACK) {

                    $self->_receive_connack(\$packet);
                }
                elsif ($packet_type == MQTT_DISCONNECT) {

                    $self->_receive_disconnect(\$packet);
                }
                elsif ($packet_type == MQTT_AUTH) {

                    $self->_receive_auth(\$packet);
                }
                else {
                    # Protocol error
                    $self->_fatal("Received packet with unknown type $packet_type");
                }

                # Prepare for next frame
                undef $packet_type;

                # Handle could have been destroyed at this point
                redo PARSE_PACKET if defined $fh->{rbuf};
            }

            if (defined $timing_packets) {
                $BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
                undef $BUSY_SINCE;
            }
        },
    );

    1;
}

sub _send_connect {
    my ($self) = @_;

    my %prop = %{$self->{config}};

    my $username    = delete $prop{'username'};
    my $password    = delete $prop{'password'};
    my $client_id   = delete $prop{'client_id'};
    my $clean_start = delete $prop{'clean_start'};
    my $keep_alive  = delete $prop{'keep_alive'};
    my $will        = delete $prop{'will'};

    unless ($client_id) {
        $client_id = '';
        $client_id .= ('0'..'9','a'..'z','A'..'Z')[rand 62] for (1..22);
    }

    $self->{client_id} = $client_id;


    # 3.1.2.11  Properties

    my $raw_prop = '';

    if (exists $prop{'session_expiry_interval'}) {
        # 3.1.2.11.2  Session Expiry Interval  (long int)
        $raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $prop{'session_expiry_interval'});
    }

    if (exists $prop{'receive_maximum'}) {
        # 3.1.2.11.3  Receive Maximum  (short int)
        $raw_prop .= pack("C n", MQTT_RECEIVE_MAXIMUM, delete $prop{'receive_maximum'});
    }

    if (exists $prop{'maximum_packet_size'}) {
        # 3.1.2.11.4  Maximum Packet Size  (long int)
        $raw_prop .= pack("C N", MQTT_MAXIMUM_PACKET_SIZE, delete $prop{'maximum_packet_size'});
    }

    if (exists $prop{'topic_alias_maximum'}) {
        # 3.1.2.11.5  Topic Alias Maximum  (short int)
        $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS_MAXIMUM, delete $prop{'topic_alias_maximum'});
    }

    if (exists $prop{'request_response_information'}) {
        # 3.1.2.11.6  Request Response Information  (byte)  
        $raw_prop .= pack("C C", MQTT_REQUEST_RESPONSE_INFORMATION, delete $prop{'request_response_information'});
    }

    if (exists $prop{'request_problem_information'}) {
        # 3.1.2.11.7  Request Problem Information  (byte)
        $raw_prop .= pack("C C", MQTT_REQUEST_PROBLEM_INFORMATION, delete $prop{'request_problem_information'});
    }

    if (exists $prop{'authentication_method'}) {
        # 3.1.2.11.9  Authentication Method  (utf8 string)
        utf8::encode( $prop{'authentication_method'} );
        $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $prop{'authentication_method'});
    }

    if (exists $prop{'authentication_data'}) {
        # 3.1.2.11.10  Authentication Data  (binary data)
        $raw_prop .= pack("C n", MQTT_AUTHENTICATION_DATA, delete $prop{'authentication_data'});
    }

    foreach my $key (keys %prop) {
        # 3.1.2.11.8  User Property  (utf8 string pair)
        my $val = $prop{$key};
        next unless defined $val;
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }


    # 3.1.2  Variable Header

    # 3.1.2.1  Protocol Name  (utf8 string)
    my $raw_mqtt = pack("n/a*", "MQTT");

    # 3.1.2.2  Protocol Version  (byte)
    $raw_mqtt .= pack("C", 5);

    # 3.1.2.3  Connect Flags  (byte)
    my $flags = 0;
    $flags |= 0x02 if $clean_start;           # 3.1.2.4  Clean Start
    $flags |= 0x80 if defined $username;      # 3.1.2.8  User Name Flag
    $flags |= 0x40 if defined $password;      # 3.1.2.9  Password Flag

    if ($will) {
        $flags |= 0x04;                       # 3.1.2.5  Will Flag
        $flags |= $will->{'qos'} << 3;        # 3.1.2.6  Will QoS
        $flags |= 0x20 if $will->{'retain'};  # 3.1.2.7  Will Retain
    }

    $raw_mqtt .= pack("C", $flags);

    # 3.1.2.10  Keep Alive  (short int)
    $raw_mqtt .= pack("n", $keep_alive || 0);   

    # 3.1.2.11  Properties
    $raw_mqtt .= _encode_var_int(length $raw_prop);
    $raw_mqtt .= $raw_prop;


    # 3.1.3  Payload

    # 3.1.3.1  Client Identifier  (utf8 string)
    $raw_mqtt .= pack("n/a*", $client_id);

    if ($will) {

        #TODO: 3.1.3.2  Will Properties
        my $will_prop = '';

        $raw_mqtt .= _encode_var_int(length $will_prop);
        $raw_mqtt .= $will_prop;

        # 3.1.3.3  Will Topic  (utf8 string)
        utf8::encode( $will->{'topic'});
        $raw_mqtt .= pack("n/a*", $will->{'topic'});

        # 3.1.3.4  Will Payload  (binary data)
        $raw_mqtt .= pack("n/a*", $will->{'payload'});
    }

    if (defined $username) {
        # 3.1.3.5  Username  (utf8 string)
        utf8::encode( $username );
        $raw_mqtt .= pack("n/a*", $username);
    }
    
    if (defined $password) {
        # 3.1.3.6  Password  (binary data)
        $raw_mqtt .= pack("n/a*", $password);
    }

    $self->{handle}->push_write( 
        pack("C", MQTT_CONNECT << 4)      .  # 3.1.1  Packet type 
        _encode_var_int(length $raw_mqtt) .  # 3.1.1  Packet length
        $raw_mqtt
    );
}

sub _receive_connack {
    my ($self, $packet) = @_;

    my $prop = $self->{server_prop};
    my $offs = 0;

    # 3.2.2.1  Acknowledge flags  (byte)
    my $ack_flags = _decode_byte($packet, \$offs);
    $prop->{'session_present'} = $ack_flags & 0x01;

    # 3.2.2.2  Reason code  (byte)
    my $reason_code = _decode_byte($packet, \$offs);
    $prop->{'reason_code'} = $reason_code;
    $prop->{'reason'} = $Reason_code{$reason_code};
    
    # 3.2.2.3.1  Properties Length  (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;

    while ($offs < $prop_end) {

        my $prop_id = _decode_byte($packet, \$offs);

        if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
            # 3.2.2.3.2  Session Expiry Interval  (long int)
            $prop->{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
        }
        elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) {
            # 3.2.2.3.3  Receive Maximum  (short int)
            $prop->{'receive_maximum'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_MAXIMUM_QOS) {
            # 3.2.2.3.4  Maximum QoS  (byte)
            $prop->{'maximum_qos'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_RETAIN_AVAILABLE) {
            # 3.2.2.3.5  Retain Available  (byte)
            $prop->{'retain_available'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) {
            # 3.2.2.3.6  Maximum Packet Size  (long int)
            $prop->{'maximum_packet_size'} = _decode_int_32($packet, \$offs);
        }
        elsif ($prop_id == MQTT_ASSIGNED_CLIENT_IDENTIFIER) {
            # 3.2.2.3.7  Assigned Client Identifier  (utf8 string)
            $prop->{'assigned_client_identifier'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) {
            # 3.2.2.3.8  Topic Alias Maximum  (short int)
            $prop->{'topic_alias_maximum'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_REASON_STRING) {
            # 3.2.2.3.9  Reason String  (utf8 string)
            $prop->{'reason_string'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.2.2.3.10  User Property  (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$offs);
            my $val = _decode_utf8_str($packet, \$offs);
            $prop->{$key} = $val;
        }
        elsif ($prop_id == MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE) {
            # 3.2.2.3.11  Wildcard Subscription Available  (byte)
            $prop->{'wildcard_subscription_available'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE) {
            # 3.2.2.3.12  Subscription Identifiers Available  (byte)
            $prop->{'subscription_identifier_available'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_SHARED_SUBSCRIPTION_AVAILABLE) {
            # 3.2.2.3.13  Shared Subscription Available  (byte)
            $prop->{'shared_subscription_available'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_SERVER_KEEP_ALIVE) {
            # 3.2.2.3.14  Server Keep Alive  (short int)
            $prop->{'server_keep_alive'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_RESPONSE_INFORMATION) {
            # 3.2.2.3.15  Response Information  (utf8 string)
            $prop->{'response_information'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_SERVER_REFERENCE) {
            # 3.2.2.3.16  Server Reference  (utf8 string)
            $prop->{'server_reference'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_AUTHENTICATION_METHOD) {
            # 3.2.2.3.17  Authentication Method  (utf8 string)
            $prop->{'authentication_method'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_AUTHENTICATION_DATA) {
            # 3.2.2.3.18 Authentication Data  (binary data)
            $prop->{'authentication_data'} = _decode_binary_data($packet, \$offs);
        }
        else {
            # Protocol error
            $self->_fatal("Received CONNACK with unknown property $prop_id"); 
        }
    }

    my $success = ($reason_code == 0x00);

    unless ( $success ) {
        # Server will close the connection
        # warn "Served refused CONNACK: $reason";
        #TODO: handle
    }

    $self->{is_connected}  = 1;
    $self->{timeout_tmr}   = undef;
    $self->{reconnect_tmr} = undef;
    $self->{connect_err}   = undef;

    #TODO: ... blocking connection
    $self->{connect_cv}->send if $self->{connect_cv};

    # Execute CONNACK callback
    my $connack_cb = $self->{connack_cb};
    $connack_cb->($success, $prop) if $connack_cb;
}


sub disconnect {
    my ($self, %args) = @_;

    unless (defined $self->{handle}) {
        carp "Already disconnected from MQTT broker";
        return;
    }

    my $reason_code = delete $args{'reason_code'};

    # 3.14.2.2  Properties

    my $raw_prop = '';

    if (exists $args{'session_expiry_interval'}) {



( run in 1.018 second using v1.01-cache-2.11-cpan-df04353d9ac )