Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Service/Router/Worker.pm  view on Meta::CPAN

package Beekeeper::Service::Router::Worker;

use strict;
use warnings;

our $VERSION = '0.10';

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

use Beekeeper::Worker::Extension::SharedCache;
use Scalar::Util 'weaken';

use constant FRONTEND_ROLE   =>'frontend';
use constant SESSION_TIMEOUT => 1800;
use constant SHUTDOWN_WAIT   => 2;
use constant QUEUE_LANES     => 2;
use constant DEBUG           => 0;

$Beekeeper::Worker::LogLevel = 9 if DEBUG;


sub authorize_request {
    my ($self, $req) = @_;

    return unless $self->__has_authorization_token('BKPR_ROUTER');

    return BKPR_REQUEST_AUTHORIZED;
}

sub on_startup {
    my $self = shift;
    weaken $self;

    my $worker_config = $self->{_WORKER}->{config};
    my $bus_config    = $self->{_WORKER}->{bus_config};

    $self->{sess_timeout}  = $worker_config->{'session_timeout'} || SESSION_TIMEOUT;
    $self->{shutdown_wait} = $worker_config->{'shutdown_wait'}   || SHUTDOWN_WAIT;
    $self->{frontend_role} = $worker_config->{'frontend_role'}   || FRONTEND_ROLE;

    $self->_init_routing_table;

    my $frontend_role = $self->{frontend_role};
    my $frontends_config = Beekeeper::Config->get_bus_group_config( bus_role => $frontend_role );

    unless (@$frontends_config) {
        die "No bus with role '$frontend_role' was found into config file bus.config.json\n";
    }

    $self->{wait_frontends_up} = AnyEvent->condvar;

    # Create a connection to every frontend
    foreach my $config (@$frontends_config) {

        $self->init_frontend_connection( $config );
    }

    # Ping frontend brokers to avoid disconnections due to inactivity
    $self->{ping_timer} = AnyEvent->timer(
        after    => 60 * rand(),
        interval => 60,
        cb       => sub { $self->ping_frontend_brokers },
    );
}

sub init_frontend_connection {
    my ($self, $config) = @_;
    weaken $self;

    my $bus_id  = $config->{'bus_id'};
    my $back_id = $self->{_BUS}->bus_id;

    $self->{wait_frontends_up}->begin;

    my $bus; $bus = Beekeeper::MQTT->new( 
        %$config,
        bus_id   => $bus_id,
        timeout  => 60,
        on_error => sub {
            # Reconnect
            my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
            log_alert "Connection to $bus_id failed: $errmsg";
            delete $self->{FRONTEND}->{$bus_id};
            $self->{wait_frontends_up}->end;
            my $delay = $self->{connect_err}->{$bus_id}++;
            $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                after => ($delay < 10 ? $delay * 3 : 30),
                cb => sub {
                    $bus->connect(
                        on_connack => sub {
                            # Setup routing
                            log_warn "Rerouting: $back_id <--> $bus_id";
                            $self->{FRONTEND}->{$bus_id} = $bus;
                            $self->pull_frontend_requests( frontend => $bus );
                        }
                    );
                },
            );
        },
    );

    $bus->connect(
        on_connack => sub {
            # Setup routing
            log_info "Routing: $back_id <--> $bus_id";
            $self->{FRONTEND}->{$bus_id} = $bus;
            $self->{wait_frontends_up}->end;
            $self->pull_frontend_requests( frontend => $bus );
            $self->pull_backend_responses( frontend => $bus );
            $self->pull_backend_notifications( frontend => $bus );
        },
    );
}

sub on_shutdown {
    my ($self, %args) = @_;

    log_info "Shutting down";

    my $frontend_role = $self->{frontend_role};

    my $backend_bus  = $self->{_BUS};
    my $backend_role = $self->{_BUS}->{bus_role};

    my $cv = AnyEvent->condvar;

    # 1. Do not pull frontend requests anymore
    foreach my $frontend_bus (values %{$self->{FRONTEND}}) {

        foreach my $lane (1..QUEUE_LANES) {

            my $topic = "\$share/BKPR/req/$backend_role-$lane";
            $cv->begin;
            $frontend_bus->unsubscribe(
                topic       => $topic,
                on_unsuback => sub {
                    my ($success, $prop) = @_;
                    log_error "Could not unsubscribe from $topic" unless $success;
                    $cv->end;
                }
            );
        }
    }

    # 2. Stop forwarding notifications to frontend
    foreach my $lane (1..QUEUE_LANES) {

        my $topic = "\$share/BKPR/msg/$frontend_role-$lane";
        $cv->begin;
        $backend_bus->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;
                log_error "Could not unsubscribe from $topic" unless $success;
                $cv->end;
            }
        );
    }

    # 3. Wait for unsubacks, assuring that no more requests or messages are buffered 
    my $tmr = AnyEvent->timer( after => 30, cb => sub { $cv->send });
    $cv->recv;

    # 4. Just in case of pool full stop, wait for workers to finish their current tasks
    my $wait = AnyEvent->condvar;
    $tmr = AnyEvent->timer( after => $self->{shutdown_wait}, cb => sub { $wait->send });
    $wait->recv;

    $cv = AnyEvent->condvar;

    # 5. Stop forwarding responses to frontend
    foreach my $frontend_bus (values %{$self->{FRONTEND}}) {

        my $frontend_id = $frontend_bus->bus_id;

        foreach my $lane (1..QUEUE_LANES) {

            my $topic = "\$share/BKPR/res/$frontend_id-$lane";
            $cv->begin;
            $backend_bus->unsubscribe(
                topic       => $topic,
                on_unsuback => sub {
                    my ($success, $prop) = @_;
                    log_error "Could not unsubscribe from $topic" unless $success;
                    $cv->end;
                }
            );
        }
    }

    # 6. Wait for unsubacks, assuring that no more responses are buffered 
    $tmr = AnyEvent->timer( after => 30, cb => sub { $cv->send });
    $cv->recv;

    # Disconnect from all frontends
    my @frontends = values %{$self->{FRONTEND}};
    foreach my $frontend_bus (@frontends) {

        next unless ($frontend_bus->{is_connected});
        $frontend_bus->disconnect;
    }

    # Disconnect shared cache
    undef $self->{MqttSessions};

    log_info "Stopped";
}

sub pull_frontend_requests {
    my ($self, %args) = @_;
    weaken $self;

    # Get requests from frontend bus and forward them to backend bus
    #
    # from:  req/backend-n                @frontend
    # to:    req/backend/{app}/{service}  @backend

    my $frontend_bus = $args{frontend};
    my $frontend_id  = $frontend_bus->bus_id;

    my $backend_bus  = $self->{_BUS};
    my $backend_id   = $backend_bus->bus_id;
    my $backend_role = $backend_bus->bus_role;

    foreach my $lane (1..QUEUE_LANES) {

        my $src_queue = "\$share/BKPR/req/$backend_role-$lane";

        my ($payload_ref, $mqtt_properties);
        my ($dest_queue, $reply_to, $caller_id, $mqtt_session);
        my %pub_args;

        $frontend_bus->subscribe(
            topic       => $src_queue,
            maximum_qos => 0,
            on_publish  => sub {
                ($payload_ref, $mqtt_properties) = @_;

                # (!) UNTRUSTED REQUEST

                # eg: req/backend/myapp/service
                $dest_queue = $mqtt_properties->{'fwd_to'} || '';
                return unless $dest_queue =~ m|^req(?:/(?!_)[\w-]+)+$|;

                # eg: priv/7nXDsxMDwgLUSedX
                $reply_to = $mqtt_properties->{'response_topic'} || '';
                return unless $reply_to =~ m|^priv/(\w{16,23})$|;
                $caller_id = $1;

                #TODO: Extra sanity checks could be done here before forwarding to backend

                %pub_args = (



( run in 1.093 second using v1.01-cache-2.11-cpan-39bf76dae61 )