Async-Selector

 view release on metacpan or  search on metacpan

t/55-aggregation_usecase.t  view on Meta::CPAN

use strict;
use warnings;
use Test::More;
use Test::Builder;
use Test::Memory::Cycle;

## dummies for SYNOPSIS
sub handle_a {}
sub handle_b {}


###################### SYNOPSIS ###############

use Async::Selector;
use Async::Selector::Aggregator;

## Setup resources with 3 selectors, each of which registers 'resource'
my %resources = (
    a => { val => 0, selector => Async::Selector->new },
    b => { val => 0, selector => Async::Selector->new },
    c => { val => 0, selector => Async::Selector->new },
);
foreach my $res (values %resources) {
    $res->{selector}->register(resource => sub {
        my ($threshold) = @_;
        return $res->{val} >= $threshold ? $res->{val} : undef;
    });
}

## Aggregate 3 selectors into one. Resource names are now ('a', 'b', 'c')
sub aggregate_watch {
    my $callback = pop;
    my %watch_spec = @_;
    my $aggregator = Async::Selector::Aggregator->new();
    foreach my $key (keys %watch_spec) {
        my $watcher = $resources{$key}{selector}->watch(
            resource => $watch_spec{$key}, sub {
                my ($w, %res) = @_;
                $callback->($aggregator, $key => $res{resource});
            }
        );
        $aggregator->add($watcher);
        last if !$aggregator->active;
    }
    return $aggregator;
}

## Treat 3 selectors like a single selector almost transparently.
## $w and $watcher are actually an Async::Selector::Aggregator.
my $watcher = aggregate_watch(a => 3, b => 0, sub {
    my ($w, %res) = @_;
    handle_a($res{a}) if exists $res{a};
    handle_b($res{b}) if exists $res{b};
    $w->cancel;
});

## In this case, the callback is called immediately and $w->cancel is called.

$watcher->active;  ## => false

#################################

sub test_active_nums {
    my ($exp_active_nums, $label) = @_;
    local $Test::Builder::Level = $Test::Builder::Level + 1;
    $label ||= "";
    foreach my $key (keys %$exp_active_nums) {
        my $exp_num = $exp_active_nums->{$key};
        is(scalar($resources{$key}{selector}->watchers), $exp_num, "$label: $exp_num active watchers in selector $key");
    }
}

sub set {
    my ($name, $val) = @_;
    $resources{$name}{val} = $val;
    $resources{$name}{selector}->trigger('resource');
}

ok(!$watcher->active, "watcher is inactive because resource b fired.");
test_active_nums({a => 0, b => 0, c => 0}, 'initial watch');

{
    my @results = ();
    my $agg = aggregate_watch(a => 3, b => 1, sub {
        my ($aggregator, %res) = @_;
        push(@results, \%res);
    });
    ok($agg->active, 'agg is active');
    is_deeply(\@results, [], 'results empty');
    test_active_nums({a => 1, b => 1, c => 0}, 'watch 1');
    set(b => 3);
    is_deeply(\@results, [{b => 3}], 'b fired.');
    ok($agg->active, "agg is still active.");
    memory_cycle_ok($agg, 'no cyclic refs in agg even if agg is active');

    @results = ();
    set(a => 3);
    is_deeply(\@results, [{a => 3}], 'a fired.');
    ok($agg->active, 'agg is still active.');

    @results = ();
    set(c => 0);
    is_deeply(\@results, [], 'none fired.');
    ok($agg->active, 'agg is still active.');

    $agg->cancel;
    ok(!$agg->active, 'agg is inactive');
    test_active_nums({a => 0, b => 0, c => 0}, 'no watch');



( run in 0.519 second using v1.01-cache-2.11-cpan-39bf76dae61 )