Async-Selector
view release on metacpan or search on metacpan
t/55-aggregation_usecase.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
use Test::Builder;
use Test::Memory::Cycle;
## dummies for SYNOPSIS
sub handle_a {}
sub handle_b {}
###################### SYNOPSIS ###############
use Async::Selector;
use Async::Selector::Aggregator;
## Setup resources with 3 selectors, each of which registers 'resource'
my %resources = (
a => { val => 0, selector => Async::Selector->new },
b => { val => 0, selector => Async::Selector->new },
c => { val => 0, selector => Async::Selector->new },
);
foreach my $res (values %resources) {
$res->{selector}->register(resource => sub {
my ($threshold) = @_;
return $res->{val} >= $threshold ? $res->{val} : undef;
});
}
## Aggregate 3 selectors into one. Resource names are now ('a', 'b', 'c')
sub aggregate_watch {
my $callback = pop;
my %watch_spec = @_;
my $aggregator = Async::Selector::Aggregator->new();
foreach my $key (keys %watch_spec) {
my $watcher = $resources{$key}{selector}->watch(
resource => $watch_spec{$key}, sub {
my ($w, %res) = @_;
$callback->($aggregator, $key => $res{resource});
}
);
$aggregator->add($watcher);
last if !$aggregator->active;
}
return $aggregator;
}
## Treat 3 selectors like a single selector almost transparently.
## $w and $watcher are actually an Async::Selector::Aggregator.
my $watcher = aggregate_watch(a => 3, b => 0, sub {
my ($w, %res) = @_;
handle_a($res{a}) if exists $res{a};
handle_b($res{b}) if exists $res{b};
$w->cancel;
});
## In this case, the callback is called immediately and $w->cancel is called.
$watcher->active; ## => false
#################################
sub test_active_nums {
my ($exp_active_nums, $label) = @_;
local $Test::Builder::Level = $Test::Builder::Level + 1;
$label ||= "";
foreach my $key (keys %$exp_active_nums) {
my $exp_num = $exp_active_nums->{$key};
is(scalar($resources{$key}{selector}->watchers), $exp_num, "$label: $exp_num active watchers in selector $key");
}
}
sub set {
my ($name, $val) = @_;
$resources{$name}{val} = $val;
$resources{$name}{selector}->trigger('resource');
}
ok(!$watcher->active, "watcher is inactive because resource b fired.");
test_active_nums({a => 0, b => 0, c => 0}, 'initial watch');
{
my @results = ();
my $agg = aggregate_watch(a => 3, b => 1, sub {
my ($aggregator, %res) = @_;
push(@results, \%res);
});
ok($agg->active, 'agg is active');
is_deeply(\@results, [], 'results empty');
test_active_nums({a => 1, b => 1, c => 0}, 'watch 1');
set(b => 3);
is_deeply(\@results, [{b => 3}], 'b fired.');
ok($agg->active, "agg is still active.");
memory_cycle_ok($agg, 'no cyclic refs in agg even if agg is active');
@results = ();
set(a => 3);
is_deeply(\@results, [{a => 3}], 'a fired.');
ok($agg->active, 'agg is still active.');
@results = ();
set(c => 0);
is_deeply(\@results, [], 'none fired.');
ok($agg->active, 'agg is still active.');
$agg->cancel;
ok(!$agg->active, 'agg is inactive');
test_active_nums({a => 0, b => 0, c => 0}, 'no watch');
( run in 0.519 second using v1.01-cache-2.11-cpan-39bf76dae61 )