Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN

}

sub authorize_request {
    my ($self, $req) = @_;

    return BKPR_REQUEST_AUTHORIZED;
}

sub start_broker {
    my ($self) = @_;

    $self->{connections} = {};
    $self->{clients}     = {};
    $self->{topics}      = {};
    $self->{users}       = {};

    my $config = Beekeeper::Config->read_config_file( 'toybroker.config.json' );

    # Start a default listener if no config found
    $config = [ {} ] unless defined $config;

    foreach my $listener (@$config) {

        if ($listener->{users}) {
            %{$self->{users}} = ( %{$self->{users}}, %{$listener->{users}} );
        }

        $self->start_listener( $listener );
    }
}

sub start_listener {
    my ($self, $listener) = @_;
    weaken($self);

    my $max_packet_size = $listener->{'max_packet_size'};

    my $addr = $listener->{'listen_addr'} || '127.0.0.1';  # Must be an IPv4 or IPv6 address
    my $port = $listener->{'listen_port'} ||  1883;

    ($addr) = ($addr =~ m/^([\w\.:]+)$/);  # untaint
    ($port) = ($port =~ m/^(\d+)$/);

    log_info "Listening on $addr:$port";

    $self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
        my ($FH, $host, $port) = @_;

        my $packet_type;
        my $packet_flags;

        my $rbuff_len;
        my $packet_len;

        my $mult;
        my $offs;
        my $byte;

        my $fh; $fh = AnyEvent::Handle->new(
            fh => $FH,
            keepalive => 1,
            no_delay => 1,
            on_read => sub {

                PARSE_PACKET: {

                    $rbuff_len = length $fh->{rbuf};

                    return unless $rbuff_len >= 2;

                    unless ($packet_type) {

                        $packet_len = 0;
                        $mult = 1;
                        $offs = 1;

                        PARSE_LEN: {
                            $byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
                            $packet_len += ($byte & 0x7f) * $mult;
                            last unless ($byte & 0x80);
                            return if ($offs >= $rbuff_len); # Not enough data
                            $mult *= 128;
                            redo if ($offs < 5);
                        }

                        if ($max_packet_size && $packet_len > $max_packet_size) {
                            $self->disconnect($fh, reason_code => 0x95);
                            return;
                        }

                        $byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
                        $packet_type  = $byte >> 4;
                        $packet_flags = $byte & 0x0F;
                    }

                    if ($rbuff_len < ($offs + $packet_len)) {
                        # Not enough data
                        return;
                    }

                    # Consume packet from buffer
                    my $packet = substr($fh->{rbuf}, 0, ($offs + $packet_len), '');

                    # Trim fixed header from packet
                    substr($packet, 0, $offs, '');

                    if ($packet_type == MQTT_PUBLISH) {

                        $self->_receive_publish($fh, \$packet, $packet_flags);
                    }
                    elsif ($packet_type == MQTT_PUBACK) {

                        $self->_receive_puback($fh, \$packet);
                    }
                    elsif ($packet_type == MQTT_PINGREQ) {

                        $self->pingresp($fh);
                    }
                    elsif ($packet_type == MQTT_PINGRESP) {

                        $self->_receive_pingresp($fh);

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN


                        $self->_receive_auth($fh, \$packet);
                    }
                    else {
                        # Protocol error
                        log_warn "Received packet with unknown type $packet_type";
                        $self->disconnect($fh, reason_code => 0x81);
                        return;
                    }

                    # Prepare for next frame
                    undef $packet_type;

                    # Handle could have been destroyed at this point
                    redo PARSE_PACKET if defined $fh->{rbuf};
                }
            },
            on_eof => sub {
                # Clean disconnection, client will not write anymore
                $self->remove_client($fh);
                delete $self->{connections}->{"$fh"};
            },
            on_error => sub {
                log_error "$_[2]\n";
                $self->remove_client($fh);
                delete $self->{connections}->{"$fh"};
            }
        );

        $self->{connections}->{"$fh"} = $fh;

        #TODO: Close connection on login timeout
        # my $login_tmr = AnyEvent->timer( after => 5, cb => sub {
        #     $self->_shutdown($fh) unless $self->get_client($fh);
        # });
    });
}

sub _receive_connect {
    my ($self, $fh, $packet) = @_;

    my %prop;
    my $offs = 0;

    # 3.1.2.1  Protocol Name  (utf8 string)
    $prop{'protocol_name'} = _decode_utf8_str($packet, \$offs);

    # 3.1.2.2  Protocol Version  (byte)
    $prop{'protocol_version'} = _decode_byte($packet, \$offs);

    # 3.1.2.3  Connect Flags  (byte)
    my $flags = _decode_byte($packet, \$offs);
    $prop{'clean_start'} = 1 if $flags & 0x02;   # 3.1.2.4  Clean Start
    $prop{'username'}    = 1 if $flags & 0x80;   # 3.1.2.8  User Name Flag
    $prop{'password'}    = 1 if $flags & 0x40;   # 3.1.2.9  Password Flag
    $prop{'will_flag'}   = 1 if $flags & 0x04;   # 3.1.2.5  Will Flag
    $prop{'will_qos'}    = ($flags & 0x18) >> 3; # 3.1.2.6  Will QoS
    $prop{'will_retain'} = 1 if $flags & 0x20;   # 3.1.2.7  Will Retain

    # 3.1.2.10  Keep Alive  (short int)
    $prop{'keep_alive'} = _decode_int_16($packet, \$offs);

    # 3.1.2.11.1  Properties Length  (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;

    while ($offs < $prop_end) {

        my $prop_id = _decode_byte($packet, \$offs);

        if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
            # 3.1.2.11.2  Session Expiry Interval  (long int)
            $prop{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
        }
        elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) {
            # 3.1.2.11.3  Receive Maximum  (short int)
            $prop{'receive_maximum'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) {
            # 3.1.2.11.4  Maximum Packet Size  (long int)
            $prop{'maximum_packet_size'} = _decode_int_32($packet, \$offs);
        }
        elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) {
            # 3.1.2.11.5  Topic Alias Maximum  (short int)
            $prop{'topic_alias_maximum'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_REQUEST_RESPONSE_INFORMATION) {
            # 3.1.2.11.6  Request Response Information  (byte)  
            $prop{'request_response_information'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_REQUEST_PROBLEM_INFORMATION) {
            # 3.1.2.11.7  Request Problem Information  (byte)
            $prop{'request_problem_information'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.1.2.11.8  User Property  (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$offs);
            my $val = _decode_utf8_str($packet, \$offs);
            $prop{$key} = $val;
        }
        elsif ($prop_id == MQTT_AUTHENTICATION_METHOD) {
            # 3.1.2.11.9  Authentication Method  (utf8 string)
            $prop{'authentication_method'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_AUTHENTICATION_DATA) {
            # 3.1.2.11.10  Authentication Data  (binary data)
            $prop{'authentication_data'} = _decode_binary_data($packet, \$offs);
        }
        else {
            # Protocol error
            log_warn "Received CONNECT with unknown property $prop_id";
            $self->_shutdown($fh);
            return; 
        }
    }

    # 3.1.3.1  Client Identifier  (utf8 string)
    $prop{'client_identifier'} = _decode_utf8_str($packet, \$offs);

    if ($prop{'will'}) {

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN


    my $raw_prop;

    if (exists $args{'session_expiry_interval'}) {
        # 3.2.2.3.2  Session Expiry Interval  (long int)
        $raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $args{'session_expiry_interval'});
    }

    if (exists $args{'receive_maximum'}) {
        # 3.2.2.3.3  Receive Maximum  (short int)
        $raw_prop .= pack("C n", MQTT_RECEIVE_MAXIMUM, delete $args{'receive_maximum'});
    }

    if (exists $args{'maximum_qos'}) {
        # 3.2.2.3.4  Maximum QoS  (byte)
        $raw_prop .= pack("C C", MQTT_MAXIMUM_QOS, delete $args{'maximum_qos'});
    }

    if (exists $args{'retain_available'}) {
        # 3.2.2.3.5  Retain Available  (byte)
        $raw_prop .= pack("C C", MQTT_RETAIN_AVAILABLE, delete $args{'retain_available'});
    }

    if (exists $args{'maximum_packet_size'}) {
        # 3.2.2.3.6  Maximum Packet Size  (long int)
        $raw_prop .= pack("C N", MQTT_MAXIMUM_PACKET_SIZE, delete $args{'maximum_packet_size'});
    }

    if (exists $args{'assigned_client_identifier'}) {
        # 3.2.2.3.7  Assigned Client Identifier  (utf8 string)
        utf8::encode( $args{'assigned_client_identifier'} );
        $raw_prop .= pack("C n/a*", MQTT_ASSIGNED_CLIENT_IDENTIFIER, delete $args{'assigned_client_identifier'});
    }

    if (exists $args{'topic_alias_maximum'}) {
        # 3.2.2.3.8  Topic Alias Maximum  (short int)
        $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS_MAXIMUM, delete $args{'topic_alias_maximum'});
    }

    if (exists $args{'reason_string'}) {
        # 3.2.2.3.9  Reason String  (utf8 string)
        utf8::encode( $args{'reason_string'} );
        $raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
    }

    if (exists $args{'wildcard_subscription_available'}) {
        # 3.2.2.3.11  Wildcard Subscription Available  (byte)
        $raw_prop .= pack("C C", MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE, delete $args{'wildcard_subscription_available'});
    }

    if (exists $args{'subscription_identifier_available'}) {
        # 3.2.2.3.12  Subscription Identifiers Available  (byte)
        $raw_prop .= pack("C C", MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE, delete $args{'subscription_identifier_available'});
    }

    if (exists $args{'shared_subscription_available'}) {
        # 3.2.2.3.13  Shared Subscription Available  (byte)
        $raw_prop .= pack("C C", MQTT_SHARED_SUBSCRIPTION_AVAILABLE, delete $args{'shared_subscription_available'});
    }

    if (exists $args{'server_keep_alive'}) {
        # 3.2.2.3.14  Server Keep Alive  (short int)
        $raw_prop .= pack("C n", MQTT_SERVER_KEEP_ALIVE, delete $args{'server_keep_alive'});
    }

    if (exists $args{'response_information'}) {
        # 3.2.2.3.15  Response Information  (utf8 string)
        utf8::encode( $args{'response_information'} );
        $raw_prop .= pack("C n/a*", MQTT_RESPONSE_INFORMATION, delete $args{'response_information'});
    }

    if (exists $args{'server_reference'}) {
        # 3.2.2.3.16  Server Reference  (utf8 string)
        utf8::encode( $args{'server_reference'} );
        $raw_prop .= pack("C n/a*", MQTT_SERVER_REFERENCE, delete $args{'server_reference'});
    }

    if (exists $args{'authentication_method'}) {
        # 3.2.2.3.17  Authentication Method  (utf8 string)
        utf8::encode( $args{'authentication_method'} );
        $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $args{'authentication_method'});
    }

    if (exists $args{'authentication_data'}) {
        # 3.2.2.3.18 Authentication Data  (binary data)
        $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_DATA, delete $args{'authentication_data'});
    }

    foreach my $key (keys %args) {
        # 3.2.2.3.10  User Property  (utf8 string pair)
        my $val = $args{$key};
        next unless defined $val;
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }

    # 3.2.2  Variable Header

    # 3.2.2.1  Acknowledge flags  (byte)
    my $raw_mqtt = pack("C", $reason_code || 0);

    # 3.2.2.2  Reason code  (byte)
    $raw_mqtt .= pack("C", $session_present ? 0x01 : 0);

    # 3.2.2.3  Properties
    $raw_mqtt .= _encode_var_int(length $raw_prop);
    $raw_mqtt .= $raw_prop;

    $fh->push_write( 
        pack("C", MQTT_CONNACK << 4)      .  # 3.2.1  Packet type 
        _encode_var_int(length $raw_mqtt) .  # 3.2.1  Packet length
        $raw_mqtt
    );
}

sub _receive_disconnect {
    my ($self, $fh, $packet) = @_;

    # Handle abbreviated packet
    $$packet = "\x00\x00" if (length $$packet == 0);

    # 3.14.2.1  Reason Code  (byte)



( run in 0.586 second using v1.01-cache-2.11-cpan-df04353d9ac )