Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
package Beekeeper::Service::ToyBroker::Worker;
use strict;
use warnings;
our $VERSION = '0.10';
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::MQTT qw(:const :decode);
use Beekeeper::Config;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Scalar::Util 'weaken';
use Carp;
use constant DEBUG => 0;
sub new {
my ($class, %args) = @_;
my $self = $class->SUPER::new(%args);
$self->start_broker;
# Postponed initialization
$self->SUPER::__init_client;
$self->{_LOGGER}->{_BUS} = $self->{_BUS};
$self->SUPER::__init_auth_tokens;
$self->SUPER::__init_worker;
return $self;
}
sub __init_client { }
sub __init_auth_tokens { }
sub __init_worker { }
sub on_startup { }
sub on_shutdown {
my $self = shift;
log_info "Shutting down";
# Wait for clients to gracefully disconnect
for (1..60) {
my $conn_count = scalar keys %{$self->{connections}};
last if $conn_count <= 1; # our self connection
my $wait = AnyEvent->condvar;
my $tmr = AnyEvent->timer( after => 0.5, cb => $wait );
$wait->recv;
}
# Get rid of our self connection
$self->{_BUS}->disconnect;
log_info "Stopped";
}
sub authorize_request {
my ($self, $req) = @_;
return BKPR_REQUEST_AUTHORIZED;
}
sub start_broker {
my ($self) = @_;
$self->{connections} = {};
$self->{clients} = {};
$self->{topics} = {};
$self->{users} = {};
my $config = Beekeeper::Config->read_config_file( 'toybroker.config.json' );
# Start a default listener if no config found
$config = [ {} ] unless defined $config;
foreach my $listener (@$config) {
if ($listener->{users}) {
%{$self->{users}} = ( %{$self->{users}}, %{$listener->{users}} );
}
$self->start_listener( $listener );
}
}
sub start_listener {
my ($self, $listener) = @_;
weaken($self);
my $max_packet_size = $listener->{'max_packet_size'};
my $addr = $listener->{'listen_addr'} || '127.0.0.1'; # Must be an IPv4 or IPv6 address
my $port = $listener->{'listen_port'} || 1883;
($addr) = ($addr =~ m/^([\w\.:]+)$/); # untaint
($port) = ($port =~ m/^(\d+)$/);
log_info "Listening on $addr:$port";
$self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
my ($FH, $host, $port) = @_;
my $packet_type;
my $packet_flags;
my $rbuff_len;
my $packet_len;
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
$self->_receive_subscribe($fh, \$packet);
}
elsif ($packet_type == MQTT_UNSUBSCRIBE) {
$self->_receive_unsubscribe($fh, \$packet);
}
elsif ($packet_type == MQTT_CONNECT) {
$self->_receive_connect($fh, \$packet);
}
elsif ($packet_type == MQTT_DISCONNECT) {
$self->_receive_disconnect($fh, \$packet);
}
elsif ($packet_type == MQTT_PUBREC) {
$self->_receive_pubrec($fh, \$packet);
}
elsif ($packet_type == MQTT_PUBREL) {
$self->_receive_pubrel($fh, \$packet);
}
elsif ($packet_type == MQTT_PUBCOMP) {
$self->_receive_pubcomp($fh, \$packet);
}
elsif ($packet_type == MQTT_AUTH) {
$self->_receive_auth($fh, \$packet);
}
else {
# Protocol error
log_warn "Received packet with unknown type $packet_type";
$self->disconnect($fh, reason_code => 0x81);
return;
}
# Prepare for next frame
undef $packet_type;
# Handle could have been destroyed at this point
redo PARSE_PACKET if defined $fh->{rbuf};
}
},
on_eof => sub {
# Clean disconnection, client will not write anymore
$self->remove_client($fh);
delete $self->{connections}->{"$fh"};
},
on_error => sub {
log_error "$_[2]\n";
$self->remove_client($fh);
delete $self->{connections}->{"$fh"};
}
);
$self->{connections}->{"$fh"} = $fh;
#TODO: Close connection on login timeout
# my $login_tmr = AnyEvent->timer( after => 5, cb => sub {
# $self->_shutdown($fh) unless $self->get_client($fh);
# });
});
}
sub _receive_connect {
my ($self, $fh, $packet) = @_;
my %prop;
my $offs = 0;
# 3.1.2.1 Protocol Name (utf8 string)
$prop{'protocol_name'} = _decode_utf8_str($packet, \$offs);
# 3.1.2.2 Protocol Version (byte)
$prop{'protocol_version'} = _decode_byte($packet, \$offs);
# 3.1.2.3 Connect Flags (byte)
my $flags = _decode_byte($packet, \$offs);
$prop{'clean_start'} = 1 if $flags & 0x02; # 3.1.2.4 Clean Start
$prop{'username'} = 1 if $flags & 0x80; # 3.1.2.8 User Name Flag
$prop{'password'} = 1 if $flags & 0x40; # 3.1.2.9 Password Flag
$prop{'will_flag'} = 1 if $flags & 0x04; # 3.1.2.5 Will Flag
$prop{'will_qos'} = ($flags & 0x18) >> 3; # 3.1.2.6 Will QoS
$prop{'will_retain'} = 1 if $flags & 0x20; # 3.1.2.7 Will Retain
# 3.1.2.10 Keep Alive (short int)
$prop{'keep_alive'} = _decode_int_16($packet, \$offs);
# 3.1.2.11.1 Properties Length (variable length int)
my $prop_len = _decode_var_int($packet, \$offs);
my $prop_end = $offs + $prop_len;
while ($offs < $prop_end) {
my $prop_id = _decode_byte($packet, \$offs);
if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
# 3.1.2.11.2 Session Expiry Interval (long int)
$prop{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
}
elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) {
# 3.1.2.11.3 Receive Maximum (short int)
$prop{'receive_maximum'} = _decode_int_16($packet, \$offs);
}
elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) {
# 3.1.2.11.4 Maximum Packet Size (long int)
$prop{'maximum_packet_size'} = _decode_int_32($packet, \$offs);
}
elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) {
# 3.1.2.11.5 Topic Alias Maximum (short int)
$prop{'topic_alias_maximum'} = _decode_int_16($packet, \$offs);
}
elsif ($prop_id == MQTT_REQUEST_RESPONSE_INFORMATION) {
# 3.1.2.11.6 Request Response Information (byte)
$prop{'request_response_information'} = _decode_byte($packet, \$offs);
}
elsif ($prop_id == MQTT_REQUEST_PROBLEM_INFORMATION) {
# 3.1.2.11.7 Request Problem Information (byte)
$prop{'request_problem_information'} = _decode_byte($packet, \$offs);
( run in 2.796 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )