Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
}
# Authorization hook for incoming requests.
#
# Neither the worker nor the request is inspected: every request is
# accepted and BKPR_REQUEST_AUTHORIZED is always returned.
sub authorize_request {
    my $self = shift;
    my $req  = shift;

    return BKPR_REQUEST_AUTHORIZED;
}
# Reset the broker state tables and start one listener per config entry.
#
# The list of listeners is read from 'toybroker.config.json'. When that
# yields nothing, a single listener with an empty configuration is started.
# Any 'users' hash found in a listener entry is merged into $self->{users};
# on duplicate keys the entry processed last wins.
sub start_broker {
    my ($self) = @_;

    # Fresh, empty state tables
    $self->{$_} = {} for qw( connections clients topics users );

    my $listeners = Beekeeper::Config->read_config_file( 'toybroker.config.json' );

    # Start a default listener if no config found
    $listeners = [ {} ] if !defined $listeners;

    for my $entry (@$listeners) {

        if (my $accounts = $entry->{users}) {
            # Merge this listener's users into the global table
            @{ $self->{users} }{ keys %$accounts } = values %$accounts;
        }

        $self->start_listener( $entry );
    }
}
# Start a TCP listener for MQTT clients, as described by one config entry.
#
# $listener is a hash reference; the keys read here are 'max_packet_size',
# 'listen_addr' (defaults to '127.0.0.1') and 'listen_port' (defaults to 1883).
# The value returned by tcp_server is stored in $self->{"listener-$addr-$port"}.
# For every accepted connection an AnyEvent::Handle is created and stored in
# $self->{connections}; its on_read callback splits the incoming byte stream
# into MQTT packets and dispatches each one according to its packet type.
sub start_listener {
my ($self, $listener) = @_;
# Hold $self weakly: the callbacks defined below capture it
weaken($self);
my $max_packet_size = $listener->{'max_packet_size'};
my $addr = $listener->{'listen_addr'} || '127.0.0.1'; # Must be an IPv4 or IPv6 address
my $port = $listener->{'listen_port'} || 1883;
# NOTE(review): when a pattern below does not match, the variable becomes
# undef - confirm how tcp_server treats an undefined address or port
($addr) = ($addr =~ m/^([\w\.:]+)$/); # untaint
($port) = ($port =~ m/^(\d+)$/);
log_info "Listening on $addr:$port";
$self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
# Called once per accepted connection (this $port shadows the outer one)
my ($FH, $host, $port) = @_;
# Parser state of this connection; it is kept between on_read calls so
# a packet whose body has not fully arrived yet can be resumed later
my $packet_type;
my $packet_flags;
my $rbuff_len;
my $packet_len;
my $mult;
my $offs;
my $byte;
my $fh; $fh = AnyEvent::Handle->new(
fh => $FH,
keepalive => 1,
no_delay => 1,
on_read => sub {
PARSE_PACKET: {
$rbuff_len = length $fh->{rbuf};
# At least the type byte and one length byte are needed
return unless $rbuff_len >= 2;
# Parse the fixed header only once per packet
unless ($packet_type) {
$packet_len = 0;
$mult = 1;
$offs = 1;
# The packet length follows the type byte as a variable length
# integer: 7 bits per byte, low order group first, bit 7 set
# when another length byte follows (at most 4 bytes are read)
PARSE_LEN: {
$byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
$packet_len += ($byte & 0x7f) * $mult;
last unless ($byte & 0x80);
return if ($offs >= $rbuff_len); # Not enough data
$mult *= 128;
redo if ($offs < 5);
}
# Refuse packets bigger than the configured limit, if any
if ($max_packet_size && $packet_len > $max_packet_size) {
$self->disconnect($fh, reason_code => 0x95);
return;
}
# First byte: packet type in the high nibble, flags in the low one
$byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
$packet_type = $byte >> 4;
$packet_flags = $byte & 0x0F;
}
# $offs is now the size of the fixed header
if ($rbuff_len < ($offs + $packet_len)) {
# Not enough data
return;
}
# Consume packet from buffer
my $packet = substr($fh->{rbuf}, 0, ($offs + $packet_len), '');
# Trim fixed header from packet
substr($packet, 0, $offs, '');
# Dispatch on packet type; handlers receive a reference to the body
if ($packet_type == MQTT_PUBLISH) {
$self->_receive_publish($fh, \$packet, $packet_flags);
}
elsif ($packet_type == MQTT_PUBACK) {
$self->_receive_puback($fh, \$packet);
}
elsif ($packet_type == MQTT_PINGREQ) {
$self->pingresp($fh);
}
elsif ($packet_type == MQTT_PINGRESP) {
$self->_receive_pingresp($fh);
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
$self->_receive_auth($fh, \$packet);
}
else {
# Protocol error
log_warn "Received packet with unknown type $packet_type";
$self->disconnect($fh, reason_code => 0x81);
return;
}
# Prepare for next frame
undef $packet_type;
# Handle could have been destroyed at this point
# (otherwise keep parsing, the buffer may hold more packets)
redo PARSE_PACKET if defined $fh->{rbuf};
}
},
on_eof => sub {
# Clean disconnection, client will not write anymore
$self->remove_client($fh);
delete $self->{connections}->{"$fh"};
},
on_error => sub {
# Log the error, then drop the client the same way as on EOF
log_error "$_[2]\n";
$self->remove_client($fh);
delete $self->{connections}->{"$fh"};
}
);
# Track the connection, keyed by the stringified handle
$self->{connections}->{"$fh"} = $fh;
#TODO: Close connection on login timeout
# my $login_tmr = AnyEvent->timer( after => 5, cb => sub {
# $self->_shutdown($fh) unless $self->get_client($fh);
# });
});
}
# Decode a received CONNECT packet.
#
# $fh is the connection handle and $packet the packet body (presumably a
# reference to it with the fixed header already removed, as passed to the
# other _receive_* handlers - TODO confirm). Decoded values are collected
# in %prop; $offs is the read position, passed by reference to the _decode_*
# helpers, which presumably advance it past the value they decode.
sub _receive_connect {
my ($self, $fh, $packet) = @_;
my %prop;
my $offs = 0;
# 3.1.2.1 Protocol Name (utf8 string)
$prop{'protocol_name'} = _decode_utf8_str($packet, \$offs);
# 3.1.2.2 Protocol Version (byte)
$prop{'protocol_version'} = _decode_byte($packet, \$offs);
# 3.1.2.3 Connect Flags (byte)
my $flags = _decode_byte($packet, \$offs);
# At this point 'username' and 'password' only record that the flag is set
$prop{'clean_start'} = 1 if $flags & 0x02; # 3.1.2.4 Clean Start
$prop{'username'} = 1 if $flags & 0x80; # 3.1.2.8 User Name Flag
$prop{'password'} = 1 if $flags & 0x40; # 3.1.2.9 Password Flag
$prop{'will_flag'} = 1 if $flags & 0x04; # 3.1.2.5 Will Flag
$prop{'will_qos'} = ($flags & 0x18) >> 3; # 3.1.2.6 Will QoS
$prop{'will_retain'} = 1 if $flags & 0x20; # 3.1.2.7 Will Retain
# 3.1.2.10 Keep Alive (short int)
$prop{'keep_alive'} = _decode_int_16($packet, \$offs);
# 3.1.2.11.1 Properties Length (variable length int)
my $prop_len = _decode_var_int($packet, \$offs);
my $prop_end = $offs + $prop_len;
# Each property is an identifier byte followed by a value whose
# encoding depends on that identifier
while ($offs < $prop_end) {
my $prop_id = _decode_byte($packet, \$offs);
if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
# 3.1.2.11.2 Session Expiry Interval (long int)
$prop{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
}
elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) {
# 3.1.2.11.3 Receive Maximum (short int)
$prop{'receive_maximum'} = _decode_int_16($packet, \$offs);
}
elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) {
# 3.1.2.11.4 Maximum Packet Size (long int)
$prop{'maximum_packet_size'} = _decode_int_32($packet, \$offs);
}
elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) {
# 3.1.2.11.5 Topic Alias Maximum (short int)
$prop{'topic_alias_maximum'} = _decode_int_16($packet, \$offs);
}
elsif ($prop_id == MQTT_REQUEST_RESPONSE_INFORMATION) {
# 3.1.2.11.6 Request Response Information (byte)
$prop{'request_response_information'} = _decode_byte($packet, \$offs);
}
elsif ($prop_id == MQTT_REQUEST_PROBLEM_INFORMATION) {
# 3.1.2.11.7 Request Problem Information (byte)
$prop{'request_problem_information'} = _decode_byte($packet, \$offs);
}
elsif ($prop_id == MQTT_USER_PROPERTY) {
# 3.1.2.11.8 User Property (utf8 string pair)
my $key = _decode_utf8_str($packet, \$offs);
my $val = _decode_utf8_str($packet, \$offs);
# NOTE(review): user properties share %prop with the decoded fields,
# so a client supplied key can overwrite an entry set before this
# point (e.g. 'keep_alive') - confirm this is acceptable
$prop{$key} = $val;
}
elsif ($prop_id == MQTT_AUTHENTICATION_METHOD) {
# 3.1.2.11.9 Authentication Method (utf8 string)
$prop{'authentication_method'} = _decode_utf8_str($packet, \$offs);
}
elsif ($prop_id == MQTT_AUTHENTICATION_DATA) {
# 3.1.2.11.10 Authentication Data (binary data)
$prop{'authentication_data'} = _decode_binary_data($packet, \$offs);
}
else {
# Protocol error
log_warn "Received CONNECT with unknown property $prop_id";
$self->_shutdown($fh);
return;
}
}
# 3.1.3.1 Client Identifier (utf8 string)
$prop{'client_identifier'} = _decode_utf8_str($packet, \$offs);
# NOTE(review): the check that follows tests $prop{'will'}, but only
# 'will_flag' is set from the connect flags above - confirm which key
# is intended
if ($prop{'will'}) {
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
my $raw_prop;
if (exists $args{'session_expiry_interval'}) {
# 3.2.2.3.2 Session Expiry Interval (long int)
$raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $args{'session_expiry_interval'});
}
if (exists $args{'receive_maximum'}) {
# 3.2.2.3.3 Receive Maximum (short int)
$raw_prop .= pack("C n", MQTT_RECEIVE_MAXIMUM, delete $args{'receive_maximum'});
}
if (exists $args{'maximum_qos'}) {
# 3.2.2.3.4 Maximum QoS (byte)
$raw_prop .= pack("C C", MQTT_MAXIMUM_QOS, delete $args{'maximum_qos'});
}
if (exists $args{'retain_available'}) {
# 3.2.2.3.5 Retain Available (byte)
$raw_prop .= pack("C C", MQTT_RETAIN_AVAILABLE, delete $args{'retain_available'});
}
if (exists $args{'maximum_packet_size'}) {
# 3.2.2.3.6 Maximum Packet Size (long int)
$raw_prop .= pack("C N", MQTT_MAXIMUM_PACKET_SIZE, delete $args{'maximum_packet_size'});
}
if (exists $args{'assigned_client_identifier'}) {
# 3.2.2.3.7 Assigned Client Identifier (utf8 string)
utf8::encode( $args{'assigned_client_identifier'} );
$raw_prop .= pack("C n/a*", MQTT_ASSIGNED_CLIENT_IDENTIFIER, delete $args{'assigned_client_identifier'});
}
if (exists $args{'topic_alias_maximum'}) {
# 3.2.2.3.8 Topic Alias Maximum (short int)
$raw_prop .= pack("C n", MQTT_TOPIC_ALIAS_MAXIMUM, delete $args{'topic_alias_maximum'});
}
if (exists $args{'reason_string'}) {
# 3.2.2.3.9 Reason String (utf8 string)
utf8::encode( $args{'reason_string'} );
$raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
}
if (exists $args{'wildcard_subscription_available'}) {
# 3.2.2.3.11 Wildcard Subscription Available (byte)
$raw_prop .= pack("C C", MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE, delete $args{'wildcard_subscription_available'});
}
if (exists $args{'subscription_identifier_available'}) {
# 3.2.2.3.12 Subscription Identifiers Available (byte)
$raw_prop .= pack("C C", MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE, delete $args{'subscription_identifier_available'});
}
if (exists $args{'shared_subscription_available'}) {
# 3.2.2.3.13 Shared Subscription Available (byte)
$raw_prop .= pack("C C", MQTT_SHARED_SUBSCRIPTION_AVAILABLE, delete $args{'shared_subscription_available'});
}
if (exists $args{'server_keep_alive'}) {
# 3.2.2.3.14 Server Keep Alive (short int)
$raw_prop .= pack("C n", MQTT_SERVER_KEEP_ALIVE, delete $args{'server_keep_alive'});
}
if (exists $args{'response_information'}) {
# 3.2.2.3.15 Response Information (utf8 string)
utf8::encode( $args{'response_information'} );
$raw_prop .= pack("C n/a*", MQTT_RESPONSE_INFORMATION, delete $args{'response_information'});
}
if (exists $args{'server_reference'}) {
# 3.2.2.3.16 Server Reference (utf8 string)
utf8::encode( $args{'server_reference'} );
$raw_prop .= pack("C n/a*", MQTT_SERVER_REFERENCE, delete $args{'server_reference'});
}
if (exists $args{'authentication_method'}) {
# 3.2.2.3.17 Authentication Method (utf8 string)
utf8::encode( $args{'authentication_method'} );
$raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $args{'authentication_method'});
}
if (exists $args{'authentication_data'}) {
# 3.2.2.3.18 Authentication Data (binary data)
$raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_DATA, delete $args{'authentication_data'});
}
foreach my $key (keys %args) {
# 3.2.2.3.10 User Property (utf8 string pair)
my $val = $args{$key};
next unless defined $val;
utf8::encode( $key );
utf8::encode( $val );
$raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
}
# 3.2.2 Variable Header
# The first byte carries the Connect Acknowledge Flags (bit 0 is Session
# Present) and the second byte the Connect Reason Code, in that order;
# emitting them the other way round makes clients read the reason code
# as flags and the session present bit as the reason code.
# 3.2.2.1 Acknowledge flags (byte)
my $raw_mqtt = pack("C", $session_present ? 0x01 : 0);
# 3.2.2.2 Reason code (byte)
$raw_mqtt .= pack("C", $reason_code || 0);
# 3.2.2.3 Properties
# $raw_prop is still undefined when no property was encoded above; use an
# empty string so a zero Property Length is written without warnings
$raw_prop = '' unless defined $raw_prop;
$raw_mqtt .= _encode_var_int(length $raw_prop);
$raw_mqtt .= $raw_prop;
$fh->push_write(
pack("C", MQTT_CONNACK << 4) . # 3.2.1 Packet type
_encode_var_int(length $raw_mqtt) . # 3.2.1 Packet length
$raw_mqtt
);
}
sub _receive_disconnect {
my ($self, $fh, $packet) = @_;
# Handle abbreviated packet
$$packet = "\x00\x00" if (length $$packet == 0);
# 3.14.2.1 Reason Code (byte)
( run in 0.586 second using v1.01-cache-2.11-cpan-df04353d9ac )