Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
sub on_startup { }
sub on_shutdown {
my $self = shift;
log_info "Shutting down";
# Wait for clients to gracefully disconnect
for (1..60) {
my $conn_count = scalar keys %{$self->{connections}};
last if $conn_count <= 1; # our self connection
my $wait = AnyEvent->condvar;
my $tmr = AnyEvent->timer( after => 0.5, cb => $wait );
$wait->recv;
}
# Get rid of our self connection
$self->{_BUS}->disconnect;
log_info "Stopped";
}
sub authorize_request {
my ($self, $req) = @_;
return BKPR_REQUEST_AUTHORIZED;
}
sub start_broker {
my ($self) = @_;
$self->{connections} = {};
$self->{clients} = {};
$self->{topics} = {};
$self->{users} = {};
my $config = Beekeeper::Config->read_config_file( 'toybroker.config.json' );
# Start a default listener if no config found
$config = [ {} ] unless defined $config;
foreach my $listener (@$config) {
if ($listener->{users}) {
%{$self->{users}} = ( %{$self->{users}}, %{$listener->{users}} );
}
$self->start_listener( $listener );
}
}
sub start_listener {
my ($self, $listener) = @_;
weaken($self);
my $max_packet_size = $listener->{'max_packet_size'};
my $addr = $listener->{'listen_addr'} || '127.0.0.1'; # Must be an IPv4 or IPv6 address
my $port = $listener->{'listen_port'} || 1883;
($addr) = ($addr =~ m/^([\w\.:]+)$/); # untaint
($port) = ($port =~ m/^(\d+)$/);
log_info "Listening on $addr:$port";
$self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
my ($FH, $host, $port) = @_;
my $packet_type;
my $packet_flags;
my $rbuff_len;
my $packet_len;
my $mult;
my $offs;
my $byte;
my $fh; $fh = AnyEvent::Handle->new(
fh => $FH,
keepalive => 1,
no_delay => 1,
on_read => sub {
PARSE_PACKET: {
$rbuff_len = length $fh->{rbuf};
return unless $rbuff_len >= 2;
unless ($packet_type) {
$packet_len = 0;
$mult = 1;
$offs = 1;
PARSE_LEN: {
$byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
$packet_len += ($byte & 0x7f) * $mult;
last unless ($byte & 0x80);
return if ($offs >= $rbuff_len); # Not enough data
$mult *= 128;
redo if ($offs < 5);
}
if ($max_packet_size && $packet_len > $max_packet_size) {
$self->disconnect($fh, reason_code => 0x95);
return;
}
$byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
$packet_type = $byte >> 4;
$packet_flags = $byte & 0x0F;
}
if ($rbuff_len < ($offs + $packet_len)) {
# Not enough data
return;
}
# Consume packet from buffer
( run in 0.790 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )