Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Service/LogTail/Worker.pm view on Meta::CPAN
package Beekeeper::Service::LogTail::Worker;
use strict;
use warnings;
our $VERSION = '0.10';
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::Logger ':log_levels';
use Scalar::Util 'weaken';
use JSON::XS;
my @Log_buffer;
sub authorize_request {
my ($self, $req) = @_;
return unless $self->__has_authorization_token('BKPR_ADMIN');
return BKPR_REQUEST_AUTHORIZED;
}
sub on_startup {
my $self = shift;
weaken $self;
$self->{max_entries} = $self->{config}->{buffer_entries} || 20000;
$self->{log_level} = $self->{config}->{log_level} || LOG_DEBUG;
$self->_connect_to_all_brokers;
$self->accept_remote_calls(
'_bkpr.logtail.tail' => 'tail',
);
# Ping backend brokers to avoid disconnections due to inactivity
$self->{ping_timer} = AnyEvent->timer(
after => 60 * rand(),
interval => 60,
cb => sub { $self->_ping_backend_brokers },
);
log_info "Ready";
}
sub _connect_to_all_brokers {
my $self = shift;
weaken $self;
my $own_bus = $self->{_BUS};
my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $own_bus->bus_id );
$self->{_BUS_GROUP} = [];
foreach my $config (@$group_config) {
my $bus_id = $config->{'bus_id'};
if ($bus_id eq $own_bus->bus_id) {
# Already connected to our own bus
$self->_collect_log($own_bus);
next;
}
my $bus; $bus = Beekeeper::MQTT->new(
%$config,
bus_id => $bus_id,
timeout => 300,
on_error => sub {
# Reconnect
my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
log_error "Connection to $bus_id failed: $errmsg";
my $delay = $self->{connect_err}->{$bus_id}++;
$self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
after => ($delay < 10 ? $delay * 3 : 30),
cb => sub {
$bus->connect(
on_connack => sub {
# Setup subscriptions
log_warn "Reconnected to $bus_id";
$self->_collect_log($bus);
}
);
},
);
},
);
push @{$self->{_BUS_GROUP}}, $bus;
$bus->connect(
on_connack => sub {
# Setup subscriptions
$self->_collect_log($bus);
}
);
}
}
sub _collect_log {
my ($self, $bus) = @_;
# Default logger logs to topic log/$level/$service
my $max_entries = $self->{max_entries};
my $log_level = $self->{log_level};
my $worker = $self->{_WORKER};
foreach my $level (1..$log_level) {
my $topic = "log/$level/#";
my $req;
$bus->subscribe(
topic => $topic,
on_publish => sub {
# my ($payload_ref, $mqtt_properties) = @_;
$req = decode_json( ${$_[0]} );
push @Log_buffer, $req->{params};
shift @Log_buffer if (@Log_buffer > $max_entries);
# Track number of collected log entries
$worker->{notif_count}++;
},
on_suback => sub {
my ($success, $prop) = @_;
die "Could not subscribe to log topic '$topic'" unless $success;
},
);
}
}
( run in 1.563 second using v1.01-cache-2.11-cpan-39bf76dae61 )