Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Service/LogTail/Worker.pm  view on Meta::CPAN

package Beekeeper::Service::LogTail::Worker;

use strict;
use warnings;

our $VERSION = '0.10';

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

use Beekeeper::Logger ':log_levels';
use Scalar::Util 'weaken';
use JSON::XS;

my @Log_buffer;


sub authorize_request {
    my ($self, $req) = @_;

    return unless $self->__has_authorization_token('BKPR_ADMIN');

    return BKPR_REQUEST_AUTHORIZED;
}

sub on_startup {
    my $self = shift;
    weaken $self;

    $self->{max_entries} = $self->{config}->{buffer_entries} || 20000;
    $self->{log_level}   = $self->{config}->{log_level}      || LOG_DEBUG;

    $self->_connect_to_all_brokers;

    $self->accept_remote_calls(
        '_bkpr.logtail.tail' => 'tail',
    );

    # Ping backend brokers to avoid disconnections due to inactivity
    $self->{ping_timer} = AnyEvent->timer(
        after    => 60 * rand(),
        interval => 60,
        cb       => sub { $self->_ping_backend_brokers },
    );

    log_info "Ready";
}

sub _connect_to_all_brokers {
    my $self = shift;
    weaken $self;

    my $own_bus = $self->{_BUS};
    my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $own_bus->bus_id );

    $self->{_BUS_GROUP} = [];

    foreach my $config (@$group_config) {

        my $bus_id = $config->{'bus_id'};

        if ($bus_id eq $own_bus->bus_id) {
            # Already connected to our own bus
            $self->_collect_log($own_bus);
            next;
        }

        my $bus; $bus = Beekeeper::MQTT->new( 
            %$config,
            bus_id   => $bus_id,
            timeout  => 300,
            on_error => sub {
                # Reconnect
                my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
                log_error "Connection to $bus_id failed: $errmsg";
                my $delay = $self->{connect_err}->{$bus_id}++;
                $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                    after => ($delay < 10 ? $delay * 3 : 30),
                    cb => sub {
                        $bus->connect(
                            on_connack => sub {
                                # Setup subscriptions
                                log_warn "Reconnected to $bus_id";
                                $self->_collect_log($bus);
                            }
                        );
                    },
                );
            },
        );

        push @{$self->{_BUS_GROUP}}, $bus;

        $bus->connect(
            on_connack => sub {
                # Setup subscriptions
                $self->_collect_log($bus);
            }
        );
    }
}

sub _collect_log {
    my ($self, $bus) = @_;

    # Default logger logs to topic log/$level/$service

    my $max_entries = $self->{max_entries};
    my $log_level   = $self->{log_level};
    my $worker      = $self->{_WORKER};

    foreach my $level (1..$log_level) {

        my $topic = "log/$level/#";
        my $req;

        $bus->subscribe(
            topic      => $topic,
            on_publish => sub {
              # my ($payload_ref, $mqtt_properties) = @_;

                $req = decode_json( ${$_[0]} );

                push @Log_buffer, $req->{params};

                shift @Log_buffer if (@Log_buffer > $max_entries);

                # Track number of collected log entries
                $worker->{notif_count}++;
            },
            on_suback => sub {
                my ($success, $prop) = @_;
                die "Could not subscribe to log topic '$topic'" unless $success;
            },
        );
    }
}



( run in 1.563 second using v1.01-cache-2.11-cpan-39bf76dae61 )