view release on metacpan or search on metacpan
examples/dashboard/lib/Beekeeper/Service/Dashboard/Worker.pm view on Meta::CPAN
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::Worker::Extension::RemoteSession;
use Beekeeper::Service::Supervisor;
use Beekeeper::Service::LogTail;
use Beekeeper::Config;
use AnyEvent;
use JSON::XS;
use Scalar::Util 'weaken';
use Digest::SHA 'sha256_hex';
use Fcntl qw(:DEFAULT :flock);
use Time::HiRes;
use constant AVERAGE => 0;
use constant MAXIMUM => 1;
use constant TOTAL => 2;
use constant COUNT => 3;
use constant TSTAMP => 0;
use constant STATS => 1;
examples/dashboard/lib/Beekeeper/Service/Dashboard/Worker.pm view on Meta::CPAN
on_error => sub {
my ($resp) = @_;
$req->send_response( $resp );
},
);
}
sub _init_collector {
my ($self) = @_;
weaken($self);
$self->_load_state;
$self->{services_1s} ||= [];
$self->{services_5s} ||= [];
$self->{services_2m} ||= [];
$self->{services_15m} ||= [];
$self->{services_1h} ||= [];
my $now = Time::HiRes::time;
lib/Beekeeper/MQTT.pm view on Meta::CPAN
use strict;
use warnings;
our $VERSION = '0.10';
use AnyEvent;
use AnyEvent::Handle;
use Time::HiRes;
use List::Util 'shuffle';
use Scalar::Util 'weaken';
use Exporter 'import';
use Carp;
our @EXPORT_OK;
our %EXPORT_TAGS;
our $DEBUG = 0;
EXPORT: {
my (@const, @encode);
lib/Beekeeper/MQTT.pm view on Meta::CPAN
$self->_connect;
$self->{connect_cv}->recv if $args{'blocking'};
$self->{connect_cv} = undef;
return $args{'blocking'} ? $self->{is_connected} : 1;
}
sub _connect {
my ($self) = @_;
weaken($self);
my $config = $self->{config};
my $timeout = $config->{'timeout'};
$timeout = 30 unless defined $timeout;
# Ensure that timeout is set properly when the event loop was blocked
AnyEvent->now_update;
# Connection timeout handler
lib/Beekeeper/MQTT.pm view on Meta::CPAN
pack("C", MQTT_UNSUBSCRIBE << 4 | 0x02) . # 3.10.1 Packet type
_encode_var_int(length $raw_mqtt) . # 3.10.1 Packet length
$raw_mqtt
);
1;
}
sub _receive_unsuback {
my ($self, $packet) = @_;
weaken($self);
# 3.11.2 Packet id (short int)
my $offs = 0;
my $packet_id = _decode_int_16($packet, \$offs);
# 3.11.2.1.1 Property Length (variable length int)
my $prop_len = _decode_var_int($packet, \$offs);
my $prop_end = $offs + $prop_len;
my %prop;
lib/Beekeeper/Service/LogTail/Worker.pm view on Meta::CPAN
use strict;
use warnings;
our $VERSION = '0.10';
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::Logger ':log_levels';
use Scalar::Util 'weaken';
use JSON::XS;
my @Log_buffer;
sub authorize_request {
my ($self, $req) = @_;
return unless $self->__has_authorization_token('BKPR_ADMIN');
return BKPR_REQUEST_AUTHORIZED;
}
sub on_startup {
my $self = shift;
weaken $self;
$self->{max_entries} = $self->{config}->{buffer_entries} || 20000;
$self->{log_level} = $self->{config}->{log_level} || LOG_DEBUG;
$self->_connect_to_all_brokers;
$self->accept_remote_calls(
'_bkpr.logtail.tail' => 'tail',
);
lib/Beekeeper/Service/LogTail/Worker.pm view on Meta::CPAN
after => 60 * rand(),
interval => 60,
cb => sub { $self->_ping_backend_brokers },
);
log_info "Ready";
}
sub _connect_to_all_brokers {
my $self = shift;
weaken $self;
my $own_bus = $self->{_BUS};
my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $own_bus->bus_id );
$self->{_BUS_GROUP} = [];
foreach my $config (@$group_config) {
my $bus_id = $config->{'bus_id'};
lib/Beekeeper/Service/Router/Worker.pm view on Meta::CPAN
use strict;
use warnings;
our $VERSION = '0.10';
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::Worker::Extension::SharedCache;
use Scalar::Util 'weaken';
use constant FRONTEND_ROLE =>'frontend';
use constant SESSION_TIMEOUT => 1800;
use constant SHUTDOWN_WAIT => 2;
use constant QUEUE_LANES => 2;
use constant DEBUG => 0;
$Beekeeper::Worker::LogLevel = 9 if DEBUG;
sub authorize_request {
my ($self, $req) = @_;
return unless $self->__has_authorization_token('BKPR_ROUTER');
return BKPR_REQUEST_AUTHORIZED;
}
sub on_startup {
my $self = shift;
weaken $self;
my $worker_config = $self->{_WORKER}->{config};
my $bus_config = $self->{_WORKER}->{bus_config};
$self->{sess_timeout} = $worker_config->{'session_timeout'} || SESSION_TIMEOUT;
$self->{shutdown_wait} = $worker_config->{'shutdown_wait'} || SHUTDOWN_WAIT;
$self->{frontend_role} = $worker_config->{'frontend_role'} || FRONTEND_ROLE;
$self->_init_routing_table;
lib/Beekeeper/Service/Router/Worker.pm view on Meta::CPAN
# Ping frontend brokers to avoid disconnections due to inactivity
$self->{ping_timer} = AnyEvent->timer(
after => 60 * rand(),
interval => 60,
cb => sub { $self->ping_frontend_brokers },
);
}
sub init_frontend_connection {
my ($self, $config) = @_;
weaken $self;
my $bus_id = $config->{'bus_id'};
my $back_id = $self->{_BUS}->bus_id;
$self->{wait_frontends_up}->begin;
my $bus; $bus = Beekeeper::MQTT->new(
%$config,
bus_id => $bus_id,
timeout => 60,
lib/Beekeeper/Service/Router/Worker.pm view on Meta::CPAN
}
# Disconnect shared cache
undef $self->{MqttSessions};
log_info "Stopped";
}
sub pull_frontend_requests {
my ($self, %args) = @_;
weaken $self;
# Get requests from frontend bus and forward them to backend bus
#
# from: req/backend-n @frontend
# to: req/backend/{app}/{service} @backend
my $frontend_bus = $args{frontend};
my $frontend_id = $frontend_bus->bus_id;
my $backend_bus = $self->{_BUS};
lib/Beekeeper/Service/Router/Worker.pm view on Meta::CPAN
},
on_suback => sub {
log_debug "Forwarding $src_queue \@$backend_id --> priv/{session_id} \@$frontend_id";
}
);
}
}
sub pull_backend_notifications {
my ($self, %args) = @_;
weaken($self);
# Get notifications from backend bus and broadcast them to all frontend buses
#
# from: msg/frontend-n @backend
# to: msg/frontend/{app}/{service}/{method} @frontend
unless (keys %{$self->{FRONTEND}} && $self->{wait_frontends_up}->ready) {
# Wait until connected to all (working) frontends before pulling
# notifications otherwise messages cannot be broadcasted properly
#TODO: MQTT: broker will discard messages unless someone subscribes
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
our $VERSION = '0.10';
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::MQTT qw(:const :decode);
use Beekeeper::Config;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Scalar::Util 'weaken';
use Carp;
use constant DEBUG => 0;
sub new {
my ($class, %args) = @_;
my $self = $class->SUPER::new(%args);
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
if ($listener->{users}) {
%{$self->{users}} = ( %{$self->{users}}, %{$listener->{users}} );
}
$self->start_listener( $listener );
}
}
sub start_listener {
my ($self, $listener) = @_;
weaken($self);
my $max_packet_size = $listener->{'max_packet_size'};
my $addr = $listener->{'listen_addr'} || '127.0.0.1'; # Must be an IPv4 or IPv6 address
my $port = $listener->{'listen_port'} || 1883;
($addr) = ($addr =~ m/^([\w\.:]+)$/); # untaint
($port) = ($port =~ m/^(\d+)$/);
log_info "Listening on $addr:$port";
lib/Beekeeper/Service/ToyBroker/Worker.pm view on Meta::CPAN
my ($self, $fh, $packet) = @_;
$self->disconnect($fh, reason_code => 0x9B);
}
#------------------------------------------------------------------------------
sub add_client {
my ($self, $fh, $prop) = @_;
weaken($self);
my $client_id = $prop->{'client_identifier'};
my $username = $prop->{'username'};
my $password = $prop->{'password'};
my $users_cfg = $self->{'users'};
my $authorized;
AUTH: {
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
);
}
package
Beekeeper::Worker::Extension::SharedCache::Cache; # hide from PAUSE
use Beekeeper::Worker ':log';
use AnyEvent;
use JSON::XS;
use Fcntl qw(:DEFAULT :flock);
use Scalar::Util 'weaken';
use Carp;
use constant SYNC_REQUEST_TIMEOUT => 30;
# Show errors from perspective of caller
$Carp::Internal{(__PACKAGE__)}++;
sub new {
my ($class, %args) = @_;
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
_BUS_GROUP=> undef,
};
bless $self, $class;
$self->_load_state if $self->{persist};
$self->_connect_to_all_brokers($worker);
my $Self = $self;
weaken $Self;
AnyEvent->now_update;
if ($self->{max_age}) {
$self->{gc_timer} = AnyEvent->timer(
after => $self->{max_age} * rand() + 60,
interval => $self->{max_age},
cb => sub { $Self->_gc },
);
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
after => 60 * rand(),
interval => 60,
cb => sub { $Self->_ping_backend_brokers },
);
return $self;
}
sub _connect_to_all_brokers {
my ($self, $worker) = @_;
weaken $self;
#TODO: using multiple shared_cache from the same worker will cause multiple bus connections
my $worker_bus = $worker->{_BUS};
my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $worker_bus->bus_id );
my $bus_group = $self->{_BUS_GROUP} = [];
foreach my $config (@$group_config) {
my $bus_id = $config->{'bus_id'};
if ($bus_id eq $worker_bus->bus_id) {
# Already connected to our own bus
$self->_setup_sync_listeners($worker_bus);
$self->_send_sync_request($worker_bus);
$self->{_BUS} = $worker_bus;
weaken $self->{_BUS};
next;
}
my $bus; $bus = Beekeeper::MQTT->new(
%$config,
bus_id => $bus_id,
timeout => 300,
on_error => sub {
# Reconnect
my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
log_debug "Connected to $bus_id";
$self->_setup_sync_listeners($bus);
$self->_accept_sync_requests($bus) if $self->{synced};
},
);
}
}
sub _setup_sync_listeners {
my ($self, $bus) = @_;
weaken $self;
my $cache_id = $self->{id};
my $uid = $self->{uid};
my $local_bus = $bus->{bus_role};
my $client_id = $bus->{client_id};
my $topic = "msg/$local_bus/_sync/$cache_id/set";
$bus->subscribe(
topic => $topic,
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
},
on_suback => sub {
my ($success) = @_;
log_error "Could not subscribe to reply topic '$reply_topic'" unless $success;
}
);
}
sub _send_sync_request {
my ($self, $bus) = @_;
weaken $self;
# Do not send more than one sync request at the time
return if $self->{_sync_timeout};
my $cache_id = $self->{id};
my $uid = $self->{uid};
my $local_bus = $bus->{bus_role};
my $client_id = $bus->{client_id};
$bus->publish(
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
# Connections to other buses could have failed or be in progress
next unless $bus->{is_connected};
$self->_accept_sync_requests($bus);
}
}
sub _accept_sync_requests {
my ($self, $bus) = @_;
weaken $self;
weaken $bus;
my $cache_id = $self->{id};
my $uid = $self->{uid};
my $bus_id = $bus->{bus_id};
my $local_bus = $bus->{bus_role};
log_debug "Shared cache '$self->{id}': Accepting sync requests from $local_bus";
my $topic = "\$share/BKPR/req/$local_bus/_sync/$cache_id/dump";
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
next unless $bus->{is_connected};
$bus->pingreq;
}
}
my $_now = 0;
sub set {
my ($self, $key, $value) = @_;
weaken $self;
croak "Key value is undefined" unless (defined $key);
my $old = $self->{data}->{$key};
$self->{data}->{$key} = $value;
$self->{vers}->{$key}++;
$self->{time}->{$key} = Time::HiRes::time();
my $json = encode_json([