Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN

package Beekeeper::Service::ToyBroker::Worker;

use strict;
use warnings;

our $VERSION = '0.10';

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

use Beekeeper::MQTT qw(:const :decode);
use Beekeeper::Config;

use AnyEvent::Handle;
use AnyEvent::Socket;
use Scalar::Util 'weaken';
use Carp;

use constant DEBUG => 0;


# Constructor. Builds the worker through the parent constructor, starts the
# broker listeners, and then runs the parent's initialization steps by hand.
#
# Arguments: the class name followed by named arguments, which are passed
#            through unchanged to the parent constructor.
# Returns:   the new worker object.
sub new {
    my ($class, %args) = @_;

    my $self = $class->SUPER::new(%args);

    # Start the listeners before the initialization steps below run
    # (presumably because those steps connect to this very broker -- TODO confirm)
    $self->start_broker;

    # Postponed initialization
    # These call the parent implementations directly, bypassing the empty
    # overrides of the same names defined in this package
    $self->SUPER::__init_client;
    # Give the logger the same bus object the worker holds
    $self->{_LOGGER}->{_BUS} = $self->{_BUS};
    $self->SUPER::__init_auth_tokens;
    $self->SUPER::__init_worker;

    return $self;
}

# Intentionally empty overrides. new() invokes the parent versions of the
# three __init_* methods explicitly (via SUPER::) once the broker has been
# started, so the versions resolved on this class do nothing.
# NOTE(review): presumably the parent constructor calls these hooks itself,
# which is what makes the overrides necessary -- confirm in Beekeeper::Worker.
sub __init_client      { }
sub __init_auth_tokens { }
sub __init_worker      { }
# No work is done in the on_startup hook
sub   on_startup       { }

# Shutdown hook: gives connected clients a bounded amount of time to go away
# on their own, then closes the worker's own bus connection.
#
# The wait is made of at most 60 pauses of half a second each, and ends early
# as soon as no more than one connection (the worker's own) is left.
sub on_shutdown {
    my ($self) = @_;

    log_info "Shutting down";

    # Wait for clients to gracefully disconnect
    foreach my $attempt (1 .. 60) {

        # A single remaining entry is our self connection
        my $open_connections = scalar( keys %{ $self->{connections} } );
        last unless $open_connections > 1;

        # Pause for half a second while letting the event loop run. The timer
        # object must be held in a variable, otherwise it is cancelled at once
        my $pause = AnyEvent->condvar;
        my $timer = AnyEvent->timer( after => 0.5, cb => $pause );
        $pause->recv;
    }

    # Get rid of our self connection
    my $bus = $self->{_BUS};
    $bus->disconnect;

    log_info "Stopped";
}

# Authorization hook: every request is accepted, whatever its content.
#
# Arguments: the worker object and the request being checked (not inspected).
# Returns:   BKPR_REQUEST_AUTHORIZED, always.
sub authorize_request {
    my $self = shift;
    my $req  = shift;

    return BKPR_REQUEST_AUTHORIZED;
}

# Resets the broker state and starts one listener per entry of the
# 'toybroker.config.json' config file.
#
# When the config reader returns undef, a single listener is started with an
# empty configuration. The 'users' sections of all listeners are merged
# into $self->{users}; on duplicate user names the later listener wins.
sub start_broker {
    my $self = shift;

    # Start from empty state tables
    $self->{$_} = {} for qw( connections clients topics users );

    my $listeners = Beekeeper::Config->read_config_file( 'toybroker.config.json' );

    # Start a default listener if no config found
    $listeners = [ {} ] if !defined $listeners;

    for my $listener_cfg (@$listeners) {

        if ($listener_cfg->{users}) {
            # Merge this listener's users into the ones collected so far
            my $extra_users = $listener_cfg->{users};
            @{ $self->{users} }{ keys %$extra_users } = values %$extra_users;
        }

        $self->start_listener( $listener_cfg );
    }
}

sub start_listener {
    my ($self, $listener) = @_;
    weaken($self);

    my $max_packet_size = $listener->{'max_packet_size'};

    my $addr = $listener->{'listen_addr'} || '127.0.0.1';  # Must be an IPv4 or IPv6 address
    my $port = $listener->{'listen_port'} ||  1883;

    ($addr) = ($addr =~ m/^([\w\.:]+)$/);  # untaint
    ($port) = ($port =~ m/^(\d+)$/);

    log_info "Listening on $addr:$port";

    $self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
        my ($FH, $host, $port) = @_;

        my $packet_type;
        my $packet_flags;

        my $rbuff_len;
        my $packet_len;

        my $mult;
        my $offs;
        my $byte;

        my $fh; $fh = AnyEvent::Handle->new(
            fh => $FH,
            keepalive => 1,
            no_delay => 1,
            on_read => sub {

                PARSE_PACKET: {

                    $rbuff_len = length $fh->{rbuf};

                    return unless $rbuff_len >= 2;

                    unless ($packet_type) {

                        $packet_len = 0;
                        $mult = 1;
                        $offs = 1;

                        PARSE_LEN: {
                            $byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
                            $packet_len += ($byte & 0x7f) * $mult;
                            last unless ($byte & 0x80);
                            return if ($offs >= $rbuff_len); # Not enough data
                            $mult *= 128;
                            redo if ($offs < 5);
                        }

                        if ($max_packet_size && $packet_len > $max_packet_size) {
                            $self->disconnect($fh, reason_code => 0x95);
                            return;
                        }

                        $byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
                        $packet_type  = $byte >> 4;
                        $packet_flags = $byte & 0x0F;
                    }

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN


    $self->{_WORKER}->{notif_count}++;  # track outgoing messages for stats

    $fh->push_write( $raw_mqtt );
}

# Sends a PUBACK packet on the given handle.
#
# Named arguments:
#   packet_id   - identifier of the packet being acknowledged (required)
#   reason_code - reason code to report, 0 when omitted
#
# Croaks with "Missing packet_id" when packet_id is absent or zero.
sub puback {
    my ($self, $fh, %args) = @_;

    my $packet_id = $args{'packet_id'};
    croak "Missing packet_id" if !$packet_id;

    my $reason_code = $args{'reason_code'} || 0;

    # 3.4.1  Fixed header: packet type in the high nibble, then the
    #        remaining length, which is always 3 for the payload below
    my $fixed_header = pack( "C C", MQTT_PUBACK << 4, 3 );

    # 3.4.2  Variable header: 16 bit packet identifier and the reason code
    my $variable_header = pack( "n C", $packet_id, $reason_code );

    $fh->push_write( $fixed_header . $variable_header );
}

# Handles a PUBACK packet received from a connection by notifying the client
# object registered for that handle.
#
# Arguments:
#   $fh     - handle the packet arrived on
#   $packet - reference to the packet data, starting at the 16 bit packet
#             identifier
#
# Returns whatever the client's on_puback returns, or nothing when no client
# is registered for the handle.
sub _receive_puback {
    my ($self, $fh, $packet) = @_;

    # Only the packet identifier is needed; a reason code byte that may
    # follow it is not used
    my ($packet_id) = unpack("n", $$packet);

    # get_client returns undef for a handle with no registered client (for
    # example a PUBACK sent before a successful CONNECT). Calling a method
    # on that would die inside the read callback, so ignore the packet
    my $client = $self->get_client($fh);
    return unless $client;

    $client->on_puback($packet_id);
}

# Handles a received PUBREC packet: the packet is never processed, the
# connection is closed with reason code 0x9B instead.
sub _receive_pubrec {
    my $self = shift;
    my $fh   = shift;

    # 0x9B is "QoS not supported" in MQTT 5
    return $self->disconnect( $fh, reason_code => 0x9B );
}

# Handles a received PUBREL packet: the packet is never processed, the
# connection is closed with reason code 0x9B instead.
sub _receive_pubrel {
    my $self = shift;
    my $fh   = shift;

    # 0x9B is "QoS not supported" in MQTT 5
    return $self->disconnect( $fh, reason_code => 0x9B );
}

# Handles a received PUBCOMP packet: the packet is never processed, the
# connection is closed with reason code 0x9B instead.
sub _receive_pubcomp {
    my $self = shift;
    my $fh   = shift;

    # 0x9B is "QoS not supported" in MQTT 5
    return $self->disconnect( $fh, reason_code => 0x9B );
}

# Handler that never processes the received packet: the connection is closed
# with reason code 0x9B instead.
# NOTE(review): the name suggests this serves the AUTH packet type -- confirm
# against the packet dispatcher.
sub _receive_pubauth {
    my $self = shift;
    my $fh   = shift;

    # Same reason code as the other unsupported packet handlers
    return $self->disconnect( $fh, reason_code => 0x9B );
}


#------------------------------------------------------------------------------

# Authenticates a connecting client and, on success, registers a client
# object for its handle and answers with a CONNACK.
#
# Arguments:
#   $fh   - handle of the connection
#   $prop - hash ref holding 'client_identifier', 'username' and 'password'
#
# A client is accepted only when all three values are non-empty and the
# password equals the one configured for that user in $self->{users}.
# Otherwise a warning is logged, the handle is shut down and nothing is
# registered.
sub add_client {
    my ($self, $fh, $prop) = @_;

    # The publish callback created below closes over $self; keep that
    # reference weak
    weaken($self);

    my $client_id = $prop->{'client_identifier'};
    my $username  = $prop->{'username'};
    my $password  = $prop->{'password'};

    my $users_cfg = $self->{'users'};

    # Every condition must hold; evaluation stops at the first failing one
    my $authorized =
           length($client_id)
        && length($username)
        && length($password)
        && $users_cfg
        && $users_cfg->{$username}
        && $users_cfg->{$username}->{'password'} eq $password;

    if (!$authorized) {
        log_warn('Client not authorized');
        $self->_shutdown($fh);
        return;
    }

    # Messages for this client are written to its own handle
    my $send_to_client = sub { $self->publish($fh, @_) };

    my $client = Beekeeper::Service::ToyBroker::Client->new(
        client_id => $client_id,
        publish   => $send_to_client,
    );

    # Clients are looked up by the stringified handle
    $self->{clients}->{"$fh"} = $client;

    $self->connack( $fh, maximum_qos => 1 );
}

# Looks up the client object registered for a connection handle.
#
# Returns the client, or undef when none is registered for the handle.
sub get_client {
    my $self = shift;
    my $fh   = shift;

    # Clients are keyed by the stringified handle
    return $self->{clients}{"$fh"};
}

# Forgets the client registered for a connection handle: drops all of its
# subscriptions, asks it to resend its unacknowledged messages, and removes
# it from the client table. Does nothing when no client is registered.
sub remove_client {
    my $self = shift;
    my $fh   = shift;

    my $client = $self->{clients}->{"$fh"};

    # Already removed: called on eof after DISCONNECT
    if (!$client) {
        return;
    }

    my @topic_filters = keys %{ $client->{subscriptions} };

    for my $filter (@topic_filters) {
        $self->unsubscribe_client( $fh, { topic_filter => $filter } );
    }

    $client->resend_unacked_messages;

    delete $self->{clients}->{"$fh"};
}



( run in 2.864 seconds using v1.01-cache-2.11-cpan-524268b4103 )