BusyBird
view release on metacpan or search on metacpan
lib/BusyBird/Timeline.pm view on Meta::CPAN
package BusyBird::Timeline;
use v5.8.0;
use strict;
use warnings;
use BusyBird::Util qw(set_param);
use BusyBird::Log qw(bblog);
use BusyBird::Flow;
use BusyBird::Watcher::Aggregator;
use BusyBird::DateTime::Format 0.04;
use BusyBird::Config;
use Async::Selector 1.0;
use Data::UUID;
use Carp;
use Storable qw(dclone);
use Scalar::Util qw(weaken looks_like_number);
use DateTime;
our @CARP_NOT = qw(BusyBird::Config);
# Constructor.
#
# Arguments (key-value pairs):
#   name        - timeline name; an empty string is rejected with croak.
#   storage     - object the storage-related methods of this class call into.
#   watcher_max - handed to set_param() together with 512
#                 (presumably the default value -- confirm in BusyBird::Util).
#
# 'name' and 'storage' are handed to set_param() with a trailing 1, which
# presumably marks them as mandatory -- confirm in BusyBird::Util.
#
# Returns the blessed object after its selector resources are registered and
# an initial refresh of the unacked counts has been requested.
sub new {
    my ($class, %args) = @_;

    my %fields = (
        filter_flow    => BusyBird::Flow->new,
        selector       => Async::Selector->new,
        unacked_counts => { total => 0 },
        config         => BusyBird::Config->new(type => "timeline", with_default => 0),
        id_generator   => Data::UUID->new,
    );
    my $self = bless \%fields, $class;

    # Each entry is the argument list that follows \%args in set_param().
    my @param_specs = (
        [ 'name',        undef, 1 ],
        [ 'storage',     undef, 1 ],
        [ 'watcher_max', 512 ],
    );
    for my $spec (@param_specs) {
        $self->set_param(\%args, @$spec);
    }

    if ($self->{name} eq '') {
        croak 'name must not be empty';
    }

    $self->_init_selector();
    $self->_update_unacked_counts();

    return $self;
}
# Passes a message to bblog() at the given level, prefixed with this
# timeline's name and a colon.
sub _log {
    my ($self, $level, $msg) = @_;
    my $timeline_name = $self->name;
    return bblog($level, "$timeline_name: $msg");
}
# Requests the current unacked counts through get_unacked_counts().
#
# When the callback receives no error, the counts are stored in
# $self->{unacked_counts} and the 'unacked_counts' selector resource is
# triggered. When it receives an error, the error is logged at 'error' level
# and the stored counts are left untouched.
sub _update_unacked_counts {
    my ($self) = @_;

    my $on_counts = sub {
        my ($error, $unacked_counts) = @_;

        if (!defined $error) {
            $self->{unacked_counts} = $unacked_counts;
            return $self->{selector}->trigger('unacked_counts');
        }

        $self->_log('error', "error while updating unacked count: $error");
        return;
    };

    return $self->get_unacked_counts(callback => $on_counts);
}
# Registers two resources on $self->{selector}.
#
# 'unacked_counts': the condition input must be a hash-ref (otherwise croak).
#   An empty hash-ref always yields a shallow copy of the stored counts.
#   Otherwise each key of the input is compared numerically with the stored
#   count (missing or false values count as 0); the first mismatch yields a
#   shallow copy of the stored counts, and no mismatch yields undef.
#
# 'watcher_quota': yields undef while the number of watchers on this resource
#   is at most watcher_max. Beyond that, yields 1 when the input's 'age'
#   exceeds watcher_max, and undef otherwise.
#
# The closures refer to a weakened $self, so they do not hold a strong
# reference to the object.
sub _init_selector {
    my ($self) = @_;
    weaken $self;

    my $unacked_provider = sub {
        my ($expected) = @_;

        croak "unacked_counts watcher: condition input must be a hash-ref"
            unless defined($expected) && ref($expected) eq 'HASH';

        # An empty expectation matches unconditionally.
        my $fire = !%$expected;
        if (!$fire) {
            KEY:
            for my $key (keys %$expected) {
                my $want = $expected->{$key} || 0;
                my $have = $self->{unacked_counts}{$key} || 0;
                next KEY if $want == $have;
                $fire = 1;
                last KEY;
            }
        }

        return $fire ? { %{ $self->{unacked_counts} } } : undef;
    };

    my $quota_provider = sub {
        my ($in) = @_;

        my @watchers = $self->{selector}->watchers('watcher_quota');
        my $limit    = $self->{watcher_max};
        return undef if scalar(@watchers) <= $limit;

        my $age = $in->{age} || 0;
        return $age > $limit ? 1 : undef;
    };

    $self->{selector}->register(unacked_counts => $unacked_provider);
    $self->{selector}->register(watcher_quota => $quota_provider);
}
# Accessor: returns the value stored under the object's 'name' key.
sub name {
    my ($self) = @_;
    return $self->{name};
}
# Calls the storage method named $method with the contents of $args_ref.
#
# The hash behind $args_ref is modified in place: its 'timeline' entry is set
# to this timeline's name before the call. @CARP_NOT is localized to the
# storage object's class for the duration of the call.
#
# Returns whatever the storage method returns.
sub _get_from_storage {
    my ($self, $method, $args_ref) = @_;
    my $storage = $self->{storage};

    $args_ref->{timeline} = $self->name;

    local @CARP_NOT = (ref $storage);
    return $storage->$method(%$args_ref);
}
# Forwards the given key-value arguments to the storage's get_statuses
# method by way of _get_from_storage().
sub get_statuses {
    my $self = shift;
    my %args = @_;
    return $self->_get_from_storage('get_statuses', \%args);
}
# Forwards the given key-value arguments to the storage's get_unacked_counts
# method by way of _get_from_storage().
sub get_unacked_counts {
    my $self = shift;
    my %args = @_;
    return $self->_get_from_storage('get_unacked_counts', \%args);
}
# Calls the storage method named $method with the contents of $args_ref,
# substituting a wrapper for the caller's callback.
#
# The hash behind $args_ref is modified in place: its 'timeline' entry is set
# to this timeline's name. @CARP_NOT is localized to the storage object's
# class for the duration of the call.
#
# When the storage invokes the wrapper, it first asks for a refresh of the
# unacked counts and then, if the caller supplied a callback, transfers
# control to it with the same arguments.
sub _write_statuses {
    my ($self, $method, $args_ref) = @_;
    my $storage = $self->{storage};

    $args_ref->{timeline} = $self->name;
    local @CARP_NOT = (ref $storage);

    my $orig_callback = $args_ref->{callback};
    my $wrapped_callback = sub {
        $self->_update_unacked_counts();
        # Tail-call so the caller's callback receives this sub's @_ unchanged.
        defined($orig_callback) and goto &$orig_callback;
    };

    # The wrapper comes last so that it overrides any 'callback' in %$args_ref.
    $storage->$method(%$args_ref, callback => $wrapped_callback);
}
# Forwards the given key-value arguments to the storage's put_statuses
# method by way of _write_statuses().
sub put_statuses {
    my $self = shift;
    my %args = @_;
    return $self->_write_statuses('put_statuses', \%args);
}
( run in 1.630 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )