BusyBird

 view release on metacpan or  search on metacpan

lib/BusyBird/Timeline.pm  view on Meta::CPAN

package BusyBird::Timeline;
use v5.8.0;
use strict;
use warnings;
use BusyBird::Util qw(set_param);
use BusyBird::Log qw(bblog);
use BusyBird::Flow;
use BusyBird::Watcher::Aggregator;
use BusyBird::DateTime::Format 0.04;
use BusyBird::Config;
use Async::Selector 1.0;
use Data::UUID;
use Carp;
use Storable qw(dclone);
use Scalar::Util qw(weaken looks_like_number);
use DateTime;

our @CARP_NOT = qw(BusyBird::Config);

sub new {
    my ($class, %args) = @_;
    my $self = bless {
        filter_flow => BusyBird::Flow->new,
        selector => Async::Selector->new,
        unacked_counts => {total => 0},
        config => BusyBird::Config->new(type => "timeline", with_default => 0),
        id_generator => Data::UUID->new,
    }, $class;
    $self->set_param(\%args, 'name', undef, 1);
    $self->set_param(\%args, 'storage', undef, 1);
    $self->set_param(\%args, 'watcher_max', 512);
    croak 'name must not be empty' if $self->{name} eq '';
    $self->_init_selector();
    $self->_update_unacked_counts();
    return $self;
}

sub _log {
    my ($self, $level, $msg) = @_;
    bblog($level, $self->name . ": $msg");
}

sub _update_unacked_counts {
    my ($self) = @_;
    $self->get_unacked_counts(callback => sub {
        my ($error, $unacked_counts) = @_;
        if(defined($error)) {
            $self->_log('error', "error while updating unacked count: $error");
            return;
        }
        $self->{unacked_counts} = $unacked_counts;
        $self->{selector}->trigger('unacked_counts');
    });
}

sub _init_selector {
    my ($self) = @_;
    weaken $self;
    $self->{selector}->register(unacked_counts => sub {
        my ($exp_unacked_counts) = @_;
        if(!defined($exp_unacked_counts) || ref($exp_unacked_counts) ne 'HASH') {
            croak "unacked_counts watcher: condition input must be a hash-ref";
        }
        return { %{$self->{unacked_counts}} } if !%$exp_unacked_counts;
        foreach my $key (keys %$exp_unacked_counts) {
            my $exp_val = $exp_unacked_counts->{$key} || 0;
            my $got_val = $self->{unacked_counts}{$key} || 0;
            return { %{$self->{unacked_counts}} } if $exp_val != $got_val;
        }
        return undef;
    });
    $self->{selector}->register(watcher_quota => sub {
        my ($in) = @_;
        my @watchers = $self->{selector}->watchers('watcher_quota');
        if(int(@watchers) <= $self->{watcher_max}) {
            return undef;
        }
        my $watcher_age = $in->{age} || 0;
        return $watcher_age > $self->{watcher_max} ? 1 : undef;
    });
}

sub name {
    return shift->{name};
}

sub _get_from_storage {
    my ($self, $method, $args_ref) = @_;
    $args_ref->{timeline} = $self->name;
    local @CARP_NOT = (ref($self->{storage}));
    $self->{storage}->$method(%$args_ref);
}

sub get_statuses {
    my ($self, %args) = @_;
    $self->_get_from_storage("get_statuses", \%args);
}

sub get_unacked_counts {
    my ($self, %args) = @_;
    $self->_get_from_storage("get_unacked_counts", \%args);
}

sub _write_statuses {
    my ($self, $method, $args_ref) = @_;
    $args_ref->{timeline} = $self->name;
    local @CARP_NOT = (ref($self->{storage}));
    my $orig_callback = $args_ref->{callback};
    $self->{storage}->$method(%$args_ref, callback => sub {
        $self->_update_unacked_counts();
        goto $orig_callback if defined($orig_callback);
    });
}

sub put_statuses {
    my ($self, %args) = @_;
    $self->_write_statuses('put_statuses', \%args);
}



( run in 1.630 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )