Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
package Beekeeper::Service::ToyBroker::Worker;
use strict;
use warnings;
our $VERSION = '0.10';
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::MQTT qw(:const :decode);
use Beekeeper::Config;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Scalar::Util 'weaken';
use Carp;
use constant DEBUG => 0;
sub new {
my ($class, %args) = @_;
my $self = $class->SUPER::new(%args);
$self->start_broker;
# Postponed initialization
$self->SUPER::__init_client;
$self->{_LOGGER}->{_BUS} = $self->{_BUS};
$self->SUPER::__init_auth_tokens;
$self->SUPER::__init_worker;
return $self;
}
sub __init_client { }
sub __init_auth_tokens { }
sub __init_worker { }
sub on_startup { }
sub on_shutdown {
my $self = shift;
log_info "Shutting down";
# Wait for clients to gracefully disconnect
for (1..60) {
my $conn_count = scalar keys %{$self->{connections}};
last if $conn_count <= 1; # our self connection
my $wait = AnyEvent->condvar;
my $tmr = AnyEvent->timer( after => 0.5, cb => $wait );
$wait->recv;
}
# Get rid of our self connection
$self->{_BUS}->disconnect;
log_info "Stopped";
}
sub authorize_request {
my ($self, $req) = @_;
return BKPR_REQUEST_AUTHORIZED;
}
sub start_broker {
my ($self) = @_;
$self->{connections} = {};
$self->{clients} = {};
$self->{topics} = {};
$self->{users} = {};
my $config = Beekeeper::Config->read_config_file( 'toybroker.config.json' );
# Start a default listener if no config found
$config = [ {} ] unless defined $config;
foreach my $listener (@$config) {
if ($listener->{users}) {
%{$self->{users}} = ( %{$self->{users}}, %{$listener->{users}} );
}
$self->start_listener( $listener );
}
}
sub start_listener {
my ($self, $listener) = @_;
weaken($self);
my $max_packet_size = $listener->{'max_packet_size'};
my $addr = $listener->{'listen_addr'} || '127.0.0.1'; # Must be an IPv4 or IPv6 address
my $port = $listener->{'listen_port'} || 1883;
($addr) = ($addr =~ m/^([\w\.:]+)$/); # untaint
($port) = ($port =~ m/^(\d+)$/);
log_info "Listening on $addr:$port";
$self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
my ($FH, $host, $port) = @_;
my $packet_type;
my $packet_flags;
my $rbuff_len;
my $packet_len;
my $mult;
my $offs;
my $byte;
my $fh; $fh = AnyEvent::Handle->new(
fh => $FH,
keepalive => 1,
no_delay => 1,
on_read => sub {
PARSE_PACKET: {
$rbuff_len = length $fh->{rbuf};
return unless $rbuff_len >= 2;
unless ($packet_type) {
$packet_len = 0;
$mult = 1;
$offs = 1;
PARSE_LEN: {
$byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
$packet_len += ($byte & 0x7f) * $mult;
last unless ($byte & 0x80);
return if ($offs >= $rbuff_len); # Not enough data
$mult *= 128;
redo if ($offs < 5);
}
if ($max_packet_size && $packet_len > $max_packet_size) {
$self->disconnect($fh, reason_code => 0x95);
return;
}
$byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
$packet_type = $byte >> 4;
$packet_flags = $byte & 0x0F;
}
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
$self->{_WORKER}->{notif_count}++; # track outgoing messages for stats
$fh->push_write( $raw_mqtt );
}
sub puback {
my ($self, $fh, %args) = @_;
croak "Missing packet_id" unless $args{'packet_id'};
my $raw_mqtt = pack(
"C C n C",
MQTT_PUBACK << 4, # 3.4.1 Packet type
3, # 3.4.1 Remaining length
$args{'packet_id'}, # 3.4.2 Packet identifier
$args{'reason_code'} || 0, # 3.4.2.1 Reason code
);
$fh->push_write( $raw_mqtt );
}
sub _receive_puback {
my ($self, $fh, $packet) = @_;
my ($packet_id, $reason_code) = unpack("n C", $$packet);
$reason_code = 0 unless defined $reason_code;
$self->get_client($fh)->on_puback($packet_id);
}
sub _receive_pubrec {
my ($self, $fh, $packet) = @_;
$self->disconnect($fh, reason_code => 0x9B);
}
sub _receive_pubrel {
my ($self, $fh, $packet) = @_;
$self->disconnect($fh, reason_code => 0x9B);
}
sub _receive_pubcomp {
my ($self, $fh, $packet) = @_;
$self->disconnect($fh, reason_code => 0x9B);
}
sub _receive_pubauth {
my ($self, $fh, $packet) = @_;
$self->disconnect($fh, reason_code => 0x9B);
}
#------------------------------------------------------------------------------
sub add_client {
my ($self, $fh, $prop) = @_;
weaken($self);
my $client_id = $prop->{'client_identifier'};
my $username = $prop->{'username'};
my $password = $prop->{'password'};
my $users_cfg = $self->{'users'};
my $authorized;
AUTH: {
last unless (length $client_id);
last unless (length $username);
last unless (length $password);
last unless ($users_cfg);
last unless ($users_cfg->{$username});
last unless ($users_cfg->{$username}->{'password'} eq $password);
$authorized = 1;
}
unless ($authorized) {
log_warn('Client not authorized');
$self->_shutdown($fh);
return;
}
my $client = Beekeeper::Service::ToyBroker::Client->new(
client_id => $client_id,
publish => sub { $self->publish($fh, @_) },
);
$self->{clients}->{"$fh"} = $client;
$self->connack( $fh, maximum_qos => 1 );
}
sub get_client {
my ($self, $fh) = @_;
return $self->{clients}->{"$fh"};
}
sub remove_client {
my ($self, $fh) = @_;
my $client = $self->{clients}->{"$fh"};
return unless $client; # called on eof after DISCONNECT
foreach my $topic_filter (keys %{$client->{subscriptions}}) {
$self->unsubscribe_client($fh, { topic_filter => $topic_filter });
}
$client->resend_unacked_messages;
delete $self->{clients}->{"$fh"};
}
( run in 2.864 seconds using v1.01-cache-2.11-cpan-524268b4103 )