Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN

package Beekeeper::Service::ToyBroker::Worker;

use strict;
use warnings;

our $VERSION = '0.10';

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

use Beekeeper::MQTT qw(:const :decode);
use Beekeeper::Config;

use AnyEvent::Handle;
use AnyEvent::Socket;
use Scalar::Util 'weaken';
use Carp;

use constant DEBUG => 0;


# Constructor.
#
# Builds the worker through the parent constructor, starts the embedded
# broker, and only then runs the parent's initialization steps. Those steps
# are overridden as no-ops in this package, so they are invoked here
# explicitly through SUPER:: once the broker has been started.
#
# Takes the same named arguments as the parent constructor (passed through
# untouched) and returns the new worker object.
sub new {
    my $class = shift;
    my %args  = @_;

    my $self = $class->SUPER::new(%args);

    # The broker must be started before any of the postponed steps below
    $self->start_broker;

    # Postponed initialization, in the same order as listed here
    $self->SUPER::__init_client;

    # Share the bus object with the logger
    $self->{_LOGGER}->{_BUS} = $self->{_BUS};

    $self->SUPER::__init_auth_tokens;
    $self->SUPER::__init_worker;

    return $self;
}

# The initialization hooks below are deliberately empty overrides: new()
# calls the parent implementations explicitly (via SUPER::) after the
# broker has been started, so these versions must do nothing.

sub __init_client {
    return;
}

sub __init_auth_tokens {
    return;
}

sub __init_worker {
    return;
}

# This worker has no startup work to do.
sub on_startup {
    return;
}

# Shutdown hook.
#
# Gives connected clients a chance to disconnect on their own before the
# worker's own bus connection is closed. The connection table is polled at
# most 60 times, 0.5 seconds apart; polling stops as soon as one connection
# or fewer remains (the remaining one being the worker's own).
sub on_shutdown {
    my ($self) = @_;

    log_info "Shutting down";

    # Wait for clients to gracefully disconnect
    my $polls_left = 60;

    while ($polls_left-- > 0) {

        my $open_connections = scalar keys %{$self->{connections}};

        # Only our self connection (or nothing at all) is left
        if ($open_connections <= 1) {
            last;
        }

        # Sleep half a second while letting the event loop run.
        # The timer guard must stay in scope until recv returns.
        my $tick  = AnyEvent->condvar;
        my $timer = AnyEvent->timer( after => 0.5, cb => $tick );
        $tick->recv;
    }

    # Get rid of our self connection
    $self->{_BUS}->disconnect;

    log_info "Stopped";
}

# Authorization hook: every request is accepted unconditionally.
#
# Receives the request object as its only argument (not inspected) and
# always returns BKPR_REQUEST_AUTHORIZED.
sub authorize_request {
    my $self = shift;
    my $req  = shift;   # unused: no request is ever rejected

    return BKPR_REQUEST_AUTHORIZED;
}

# Resets the broker state and starts one listener per configuration entry.
#
# The configuration is read from 'toybroker.config.json' and is iterated
# as a list of listener definitions. When no configuration is found, a
# single listener with an empty definition is started. The 'users'
# entries of all listeners are merged into $self->{users}; on duplicate
# keys the listener that comes later in the configuration wins.
sub start_broker {
    my $self = shift;

    # Start with empty state tables
    $self->{$_} = {} for qw( connections clients topics users );

    my $listeners = Beekeeper::Config->read_config_file( 'toybroker.config.json' );

    # Start a default listener if no config found
    defined $listeners or $listeners = [ {} ];

    for my $definition (@$listeners) {

        if (my $users = $definition->{users}) {
            # Merge this listener's users into the shared table
            @{$self->{users}}{ keys %$users } = values %$users;
        }

        $self->start_listener( $definition );
    }
}

sub start_listener {
    my ($self, $listener) = @_;
    weaken($self);

    my $max_packet_size = $listener->{'max_packet_size'};

    my $addr = $listener->{'listen_addr'} || '127.0.0.1';  # Must be an IPv4 or IPv6 address
    my $port = $listener->{'listen_port'} ||  1883;

    ($addr) = ($addr =~ m/^([\w\.:]+)$/);  # untaint
    ($port) = ($port =~ m/^(\d+)$/);

    log_info "Listening on $addr:$port";

    $self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
        my ($FH, $host, $port) = @_;

        my $packet_type;
        my $packet_flags;

        my $rbuff_len;
        my $packet_len;

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN


                        $self->_receive_subscribe($fh, \$packet);
                    }
                    elsif ($packet_type == MQTT_UNSUBSCRIBE) {

                        $self->_receive_unsubscribe($fh, \$packet);
                    }
                    elsif ($packet_type == MQTT_CONNECT) {

                        $self->_receive_connect($fh, \$packet);
                    }
                    elsif ($packet_type == MQTT_DISCONNECT) {

                        $self->_receive_disconnect($fh, \$packet);
                    }
                    elsif ($packet_type == MQTT_PUBREC) {

                        $self->_receive_pubrec($fh, \$packet);
                    }
                    elsif ($packet_type == MQTT_PUBREL) {
                        
                        $self->_receive_pubrel($fh, \$packet);
                    }
                    elsif ($packet_type == MQTT_PUBCOMP) {

                        $self->_receive_pubcomp($fh, \$packet);
                    }
                    elsif ($packet_type == MQTT_AUTH) {

                        $self->_receive_auth($fh, \$packet);
                    }
                    else {
                        # Protocol error
                        log_warn "Received packet with unknown type $packet_type";
                        $self->disconnect($fh, reason_code => 0x81);
                        return;
                    }

                    # Prepare for next frame
                    undef $packet_type;

                    # Handle could have been destroyed at this point
                    redo PARSE_PACKET if defined $fh->{rbuf};
                }
            },
            on_eof => sub {
                # Clean disconnection, client will not write anymore
                $self->remove_client($fh);
                delete $self->{connections}->{"$fh"};
            },
            on_error => sub {
                log_error "$_[2]\n";
                $self->remove_client($fh);
                delete $self->{connections}->{"$fh"};
            }
        );

        $self->{connections}->{"$fh"} = $fh;

        #TODO: Close connection on login timeout
        # my $login_tmr = AnyEvent->timer( after => 5, cb => sub {
        #     $self->_shutdown($fh) unless $self->get_client($fh);
        # });
    });
}

sub _receive_connect {
    my ($self, $fh, $packet) = @_;

    my %prop;
    my $offs = 0;

    # 3.1.2.1  Protocol Name  (utf8 string)
    $prop{'protocol_name'} = _decode_utf8_str($packet, \$offs);

    # 3.1.2.2  Protocol Version  (byte)
    $prop{'protocol_version'} = _decode_byte($packet, \$offs);

    # 3.1.2.3  Connect Flags  (byte)
    my $flags = _decode_byte($packet, \$offs);
    $prop{'clean_start'} = 1 if $flags & 0x02;   # 3.1.2.4  Clean Start
    $prop{'username'}    = 1 if $flags & 0x80;   # 3.1.2.8  User Name Flag
    $prop{'password'}    = 1 if $flags & 0x40;   # 3.1.2.9  Password Flag
    $prop{'will_flag'}   = 1 if $flags & 0x04;   # 3.1.2.5  Will Flag
    $prop{'will_qos'}    = ($flags & 0x18) >> 3; # 3.1.2.6  Will QoS
    $prop{'will_retain'} = 1 if $flags & 0x20;   # 3.1.2.7  Will Retain

    # 3.1.2.10  Keep Alive  (short int)
    $prop{'keep_alive'} = _decode_int_16($packet, \$offs);

    # 3.1.2.11.1  Properties Length  (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;

    while ($offs < $prop_end) {

        my $prop_id = _decode_byte($packet, \$offs);

        if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
            # 3.1.2.11.2  Session Expiry Interval  (long int)
            $prop{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
        }
        elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) {
            # 3.1.2.11.3  Receive Maximum  (short int)
            $prop{'receive_maximum'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) {
            # 3.1.2.11.4  Maximum Packet Size  (long int)
            $prop{'maximum_packet_size'} = _decode_int_32($packet, \$offs);
        }
        elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) {
            # 3.1.2.11.5  Topic Alias Maximum  (short int)
            $prop{'topic_alias_maximum'} = _decode_int_16($packet, \$offs);
        }
        elsif ($prop_id == MQTT_REQUEST_RESPONSE_INFORMATION) {
            # 3.1.2.11.6  Request Response Information  (byte)  
            $prop{'request_response_information'} = _decode_byte($packet, \$offs);
        }
        elsif ($prop_id == MQTT_REQUEST_PROBLEM_INFORMATION) {
            # 3.1.2.11.7  Request Problem Information  (byte)
            $prop{'request_problem_information'} = _decode_byte($packet, \$offs);



( run in 2.796 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )