Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Service/Router/Worker.pm view on Meta::CPAN
package Beekeeper::Service::Router::Worker;
use strict;
use warnings;
our $VERSION = '0.10';
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::Worker::Extension::SharedCache;
use Scalar::Util 'weaken';
# Default bus role used to look up the frontend buses when the worker
# config does not set 'frontend_role'
use constant FRONTEND_ROLE =>'frontend';
# Default for the 'session_timeout' worker config entry
# (presumably expressed in seconds - TODO confirm against its consumer)
use constant SESSION_TIMEOUT => 1800;
# Default for the 'shutdown_wait' worker config entry: delay, in seconds,
# of the timer that on_shutdown waits on between stopping inbound traffic
# and stopping the forwarding of responses
use constant SHUTDOWN_WAIT => 2;
# Number of numbered lanes ("-1" .. "-N" topic suffixes) used for each
# shared queue
use constant QUEUE_LANES => 2;
# Set to a true value to raise the worker log level to 9
use constant DEBUG => 0;
$Beekeeper::Worker::LogLevel = 9 if DEBUG;
# Authorization hook for incoming requests.
#
# Returns BKPR_REQUEST_AUTHORIZED when __has_authorization_token reports
# that the 'BKPR_ROUTER' token is present, and nothing (empty list / undef)
# otherwise.
sub authorize_request {
    my ($self, $req) = @_;

    if ($self->__has_authorization_token('BKPR_ROUTER')) {
        return BKPR_REQUEST_AUTHORIZED;
    }

    return;
}
# Worker startup hook.
#
# Reads the tunables from the worker config (falling back to the module
# defaults), initializes the routing table, opens one connection per bus
# configured with the frontend role, and starts a periodic ping towards
# the frontend brokers.
#
# Dies if the bus configuration contains no bus with the frontend role.
sub on_startup {
    my $self = shift;
    weaken $self;

    my $worker_config = $self->{_WORKER}->{config};

    # A false config value (unset, 0, '') selects the built-in default
    $self->{sess_timeout}  = $worker_config->{'session_timeout'} || SESSION_TIMEOUT;
    $self->{shutdown_wait} = $worker_config->{'shutdown_wait'}   || SHUTDOWN_WAIT;
    $self->{frontend_role} = $worker_config->{'frontend_role'}   || FRONTEND_ROLE;

    $self->_init_routing_table;

    my $frontend_role    = $self->{frontend_role};
    my $frontends_config = Beekeeper::Config->get_bus_group_config( bus_role => $frontend_role );

    # Treat an undefined result like an empty list, so that a missing
    # frontend always produces the explanatory message below
    unless ($frontends_config && @$frontends_config) {
        die "No bus with role '$frontend_role' was found into config file bus.config.json\n";
    }

    # Each init_frontend_connection call registers itself on this condvar
    $self->{wait_frontends_up} = AnyEvent->condvar;

    # Create a connection to every frontend
    foreach my $config (@$frontends_config) {
        $self->init_frontend_connection( $config );
    }

    # Ping frontend brokers to avoid disconnections due to inactivity.
    # The first ping is scheduled at a random point within the first minute
    $self->{ping_timer} = AnyEvent->timer(
        after    => 60 * rand(),
        interval => 60,
        cb       => sub { $self->ping_frontend_brokers },
    );
}
# Open a connection to one frontend broker and set up routing through it.
#
# $config is the bus configuration hash of that frontend: its entries are
# passed through to Beekeeper::MQTT->new, and its 'bus_id' is the key under
# which the live connection is kept in $self->{FRONTEND}.
#
# On connection errors the frontend is removed from $self->{FRONTEND} and
# a reconnection is scheduled: immediately after the first error, then with
# a delay growing by 3 seconds per accumulated error, capped at 30 seconds.
#
# Returns whatever the initial $bus->connect call returns.
sub init_frontend_connection {
    my ($self, $config) = @_;
    weaken $self;

    my $bus_id  = $config->{'bus_id'};
    my $back_id = $self->{_BUS}->bus_id;

    # One begin per frontend. It must be matched by exactly one end, issued
    # as soon as the first connection attempt is resolved either way: extra
    # calls to end would unbalance the counter shared with other frontends
    $self->{wait_frontends_up}->begin;
    my $startup_pending = 1;

    my $resolve_startup = sub {
        return unless $startup_pending;
        $startup_pending = 0;
        $self->{wait_frontends_up}->end;
    };

    # Backend-side routes are set up once, on the first successful
    # connection, even if that happens to be a reconnection
    my $backend_routed = 0;

    my $bus;

    my $setup_routing = sub {
        $self->{FRONTEND}->{$bus_id} = $bus;
        $resolve_startup->();

        # Frontend subscriptions must be redone after every (re)connection
        $self->pull_frontend_requests( frontend => $bus );

        return if $backend_routed;
        $backend_routed = 1;
        $self->pull_backend_responses( frontend => $bus );
        $self->pull_backend_notifications( frontend => $bus );
    };

    $bus = Beekeeper::MQTT->new(
        %$config,
        bus_id   => $bus_id,
        timeout  => 60,
        on_error => sub {
            # Reconnect
            my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
            log_alert "Connection to $bus_id failed: $errmsg";

            delete $self->{FRONTEND}->{$bus_id};

            # Stop waiting for this frontend if it failed during startup
            $resolve_startup->();

            # Post-increment: the first error yields a delay of zero
            my $delay = $self->{connect_err}->{$bus_id}++;

            $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                after => ($delay < 10 ? $delay * 3 : 30),
                cb    => sub {
                    $bus->connect(
                        on_connack => sub {
                            # Setup routing
                            log_warn "Rerouting: $back_id <--> $bus_id";
                            $setup_routing->();
                        },
                    );
                },
            );
        },
    );

    $bus->connect(
        on_connack => sub {
            # Setup routing
            log_info "Routing: $back_id <--> $bus_id";
            $setup_routing->();
        },
    );
}
# Worker shutdown hook: stop routing in an orderly fashion.
#
#   1. Stop pulling requests from every connected frontend
#   2. Stop forwarding backend notifications to the frontend
#   3. Wait (up to 30 seconds) for the unsubacks of steps 1 and 2
#   4. Wait 'shutdown_wait' seconds for in-flight work to complete
#   5. Stop forwarding backend responses to the frontends
#   6. Wait (up to 30 seconds) for the unsubacks of step 5
#
# Finally every still connected frontend is disconnected and the
# MqttSessions entry is dropped.
#
# This sub blocks on condvars until all the above is done. %args is
# accepted but not used.
sub on_shutdown {
    my ($self, %args) = @_;

    log_info "Shutting down";

    my $frontend_role = $self->{frontend_role};
    my $backend_bus   = $self->{_BUS};

    # Read the role through the accessor, as the subscribing code does,
    # so that the topics unsubscribed here match the subscribed ones
    my $backend_role  = $backend_bus->bus_role;

    # Issue an unsubscribe on $bus, keeping $cv pending until it is acked
    my $unsubscribe = sub {
        my ($bus, $topic, $cv) = @_;
        $cv->begin;
        $bus->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;
                log_error "Could not unsubscribe from $topic" unless $success;
                $cv->end;
            },
        );
    };

    # Block until $cv is signaled, giving up after 30 seconds. The timer
    # is cancelled when it goes out of scope on return
    my $wait_unsubacks = sub {
        my ($cv) = @_;
        my $tmr = AnyEvent->timer( after => 30, cb => sub { $cv->send });
        $cv->recv;
    };

    # Each waiting phase uses its own condvar, so a late unsuback of a timed
    # out phase cannot disturb the counter of a later one. The outer begin/end
    # pair makes the condvar fire even when no unsubscribe is issued at all,
    # and prevents it from firing before all of them were issued
    my $inbound_cv = AnyEvent->condvar;
    $inbound_cv->begin;

    # 1. Do not pull frontend requests anymore
    foreach my $frontend_bus (values %{$self->{FRONTEND}}) {
        foreach my $lane (1..QUEUE_LANES) {
            $unsubscribe->( $frontend_bus, "\$share/BKPR/req/$backend_role-$lane", $inbound_cv );
        }
    }

    # 2. Stop forwarding notifications to frontend
    foreach my $lane (1..QUEUE_LANES) {
        $unsubscribe->( $backend_bus, "\$share/BKPR/msg/$frontend_role-$lane", $inbound_cv );
    }

    $inbound_cv->end;

    # 3. Wait for unsubacks, assuring that no more requests or messages are buffered
    $wait_unsubacks->( $inbound_cv );

    # 4. Just in case of pool full stop, wait for workers to finish their current tasks
    my $pause     = AnyEvent->condvar;
    my $pause_tmr = AnyEvent->timer( after => $self->{shutdown_wait}, cb => sub { $pause->send });
    $pause->recv;
    undef $pause_tmr;

    my $responses_cv = AnyEvent->condvar;
    $responses_cv->begin;

    # 5. Stop forwarding responses to frontend
    foreach my $frontend_bus (values %{$self->{FRONTEND}}) {
        my $frontend_id = $frontend_bus->bus_id;
        foreach my $lane (1..QUEUE_LANES) {
            $unsubscribe->( $backend_bus, "\$share/BKPR/res/$frontend_id-$lane", $responses_cv );
        }
    }

    $responses_cv->end;

    # 6. Wait for unsubacks, assuring that no more responses are buffered.
    #    This returns at once when there was no frontend left to unroute
    $wait_unsubacks->( $responses_cv );

    # Disconnect from all frontends. Iterate over a copy of the list, as
    # the FRONTEND hash may be modified by connection callbacks meanwhile
    my @frontends = values %{$self->{FRONTEND}};
    foreach my $frontend_bus (@frontends) {
        next unless ($frontend_bus->{is_connected});
        $frontend_bus->disconnect;
    }

    # Disconnect shared cache
    undef $self->{MqttSessions};

    log_info "Stopped";
}
sub pull_frontend_requests {
my ($self, %args) = @_;
weaken $self;
# Get requests from frontend bus and forward them to backend bus
#
# from: req/backend-n @frontend
# to: req/backend/{app}/{service} @backend
my $frontend_bus = $args{frontend};
my $frontend_id = $frontend_bus->bus_id;
my $backend_bus = $self->{_BUS};
my $backend_id = $backend_bus->bus_id;
my $backend_role = $backend_bus->bus_role;
foreach my $lane (1..QUEUE_LANES) {
my $src_queue = "\$share/BKPR/req/$backend_role-$lane";
my ($payload_ref, $mqtt_properties);
my ($dest_queue, $reply_to, $caller_id, $mqtt_session);
my %pub_args;
$frontend_bus->subscribe(
topic => $src_queue,
maximum_qos => 0,
on_publish => sub {
($payload_ref, $mqtt_properties) = @_;
# (!) UNTRUSTED REQUEST
# eg: req/backend/myapp/service
$dest_queue = $mqtt_properties->{'fwd_to'} || '';
return unless $dest_queue =~ m|^req(?:/(?!_)[\w-]+)+$|;
# eg: priv/7nXDsxMDwgLUSedX
$reply_to = $mqtt_properties->{'response_topic'} || '';
return unless $reply_to =~ m|^priv/(\w{16,23})$|;
$caller_id = $1;
#TODO: Extra sanity checks could be done here before forwarding to backend
%pub_args = (
( run in 1.093 second using v1.01-cache-2.11-cpan-39bf76dae61 )