Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN

sub   on_startup       { }

sub on_shutdown {
    my $self = shift;

    log_info "Shutting down";

    # Wait for clients to gracefully disconnect
    for (1..60) {
        my $conn_count = scalar keys %{$self->{connections}};
        last if $conn_count <= 1; # our self connection
        my $wait = AnyEvent->condvar;
        my $tmr = AnyEvent->timer( after => 0.5, cb => $wait );
        $wait->recv;
    }

    # Get rid of our self connection
    $self->{_BUS}->disconnect;

    log_info "Stopped";
}

sub authorize_request {
    my ($self, $req) = @_;

    return BKPR_REQUEST_AUTHORIZED;
}

sub start_broker {
    my ($self) = @_;

    $self->{connections} = {};
    $self->{clients}     = {};
    $self->{topics}      = {};
    $self->{users}       = {};

    my $config = Beekeeper::Config->read_config_file( 'toybroker.config.json' );

    # Start a default listener if no config found
    $config = [ {} ] unless defined $config;

    foreach my $listener (@$config) {

        if ($listener->{users}) {
            %{$self->{users}} = ( %{$self->{users}}, %{$listener->{users}} );
        }

        $self->start_listener( $listener );
    }
}

sub start_listener {
    my ($self, $listener) = @_;
    weaken($self);

    my $max_packet_size = $listener->{'max_packet_size'};

    my $addr = $listener->{'listen_addr'} || '127.0.0.1';  # Must be an IPv4 or IPv6 address
    my $port = $listener->{'listen_port'} ||  1883;

    ($addr) = ($addr =~ m/^([\w\.:]+)$/);  # untaint
    ($port) = ($port =~ m/^(\d+)$/);

    log_info "Listening on $addr:$port";

    $self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
        my ($FH, $host, $port) = @_;

        my $packet_type;
        my $packet_flags;

        my $rbuff_len;
        my $packet_len;

        my $mult;
        my $offs;
        my $byte;

        my $fh; $fh = AnyEvent::Handle->new(
            fh => $FH,
            keepalive => 1,
            no_delay => 1,
            on_read => sub {

                PARSE_PACKET: {

                    $rbuff_len = length $fh->{rbuf};

                    return unless $rbuff_len >= 2;

                    unless ($packet_type) {

                        $packet_len = 0;
                        $mult = 1;
                        $offs = 1;

                        PARSE_LEN: {
                            $byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
                            $packet_len += ($byte & 0x7f) * $mult;
                            last unless ($byte & 0x80);
                            return if ($offs >= $rbuff_len); # Not enough data
                            $mult *= 128;
                            redo if ($offs < 5);
                        }

                        if ($max_packet_size && $packet_len > $max_packet_size) {
                            $self->disconnect($fh, reason_code => 0x95);
                            return;
                        }

                        $byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
                        $packet_type  = $byte >> 4;
                        $packet_flags = $byte & 0x0F;
                    }

                    if ($rbuff_len < ($offs + $packet_len)) {
                        # Not enough data
                        return;
                    }

                    # Consume packet from buffer



( run in 0.790 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )