Async-Selector
lib/Async/Selector.pm view on Meta::CPAN
use Async::Selector::Watcher;
=pod
=head1 NAME
Async::Selector - level-triggered resource observer like select(2)
=head1 VERSION
1.03
=cut
our $VERSION = "1.03";
=pod
=head1 SYNOPSIS
use Async::Selector;
my $selector = Async::Selector->new();
## Register resource
my $resource = "some text."; ## 10 bytes
$selector->register(resource_A => sub {
## Provide $resource if it is at least $threshold bytes long.
my $threshold = shift;
return length($resource) >= $threshold ? $resource : undef;
});
## Watch the resource with a callback.
$selector->watch(
resource_A => 20, ## When the resource reaches 20 bytes or more...
sub { ## ... execute this callback.
my ($watcher, %resource) = @_;
print "$resource{resource_A}\n";
$watcher->cancel();
}
);
## Append data to the resource
$resource .= "data"; ## 14 bytes
$selector->trigger('resource_A'); ## Nothing happens
$resource .= "more data"; ## 23 bytes
$selector->trigger('resource_A'); ## The callback prints 'some text.datamore data'
=head1 DESCRIPTION
L<Async::Selector> is an object that watches registered resources
and executes callbacks when some of the resources become available.
It is thus an implementation of the Observer pattern, like L<Event::Notify>.
The important difference is that L<Async::Selector> is B<level-triggered>, like the C<select(2)> system call.
Basic usage of L<Async::Selector> is as follows:
=over
=item 1.
Register as many resources as you like with the C<register()> method.
A resource consists of a name and a resource provider.
A resource provider is a subroutine reference that returns some data (or C<undef> if the data is not available).
=item 2.
Watch as many resources as you like with the C<watch()> method.
When any of the watched resources becomes available, a callback function is executed
with the available resource data.
Note that if any of the watched resources is already available when C<watch()> is called,
the callback function is executed immediately.
That's because L<Async::Selector> is level-triggered.
=item 3.
Notify the L<Async::Selector> object with the C<trigger()> method that some of the registered resources have changed.
The L<Async::Selector> object then checks whether any of the triggered resources is available.
If some resources are available, the callback functions of the watchers watching them are executed.
=back
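The three steps above can be sketched as follows. This is a minimal illustration, not part of the module: the C<@queue> resource and the C<@log> array are hypothetical names chosen for the example. It shows that a level-triggered watcher fires once at C<watch()> time (the resource is already available) and again on a later C<trigger()>.

```perl
use strict;
use warnings;
use Async::Selector;

my $selector = Async::Selector->new();
my @queue = ('job1');    ## hypothetical resource: a queue of pending jobs

## 1. Register: the provider returns a copy of the queue
##    if it holds at least $min items, otherwise undef.
$selector->register(queue => sub {
    my ($min) = @_;
    return @queue >= $min ? [@queue] : undef;
});

## 2. Watch: the queue already holds one item,
##    so the callback is executed during this call.
my @log;
my $watcher = $selector->watch(queue => 1, sub {
    my ($w, %res) = @_;
    push @log, scalar @{ $res{queue} };
});

## 3. Trigger: the watcher was not canceled, so it is executed again.
push @queue, 'job2';
$selector->trigger('queue');

print "callback saw queue sizes: @log\n";
```

Because the callback never calls C<< $w->cancel() >>, the watcher stays active and would keep firing on every subsequent C<trigger('queue')> while the queue is non-empty.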
=head1 CLASS METHODS
=head2 $selector = Async::Selector->new();
Creates an L<Async::Selector> object. It takes no parameters.
=cut
sub new {
my ($class) = @_;
my $self = bless {
resources => {},
watchers => {},
}, $class;
return $self;
}
sub _check {
my ($self, $watcher_id_or_watcher, @triggers) = @_;
lib/Async/Selector.pm view on Meta::CPAN
=cut
sub registered {
my ($self, $resource_name) = @_;
return 0 if not defined($resource_name);
return exists $self->{resources}{$resource_name};
}
=pod
=head2 @watchers = $selector->watchers([@resource_names]);
Returns the list of active watchers (L<Async::Selector::Watcher> objects) from the L<Async::Selector> object.
If the C<watchers()> method is called without arguments, it returns all of the active watchers.
If the C<watchers()> method is called with some arguments (C<@resource_names>),
it returns the active watchers that watch ANY resource out of C<@resource_names>.
If you want the watchers that watch ALL of C<@resource_names>,
try filtering the result (C<@watchers>) with L<Async::Selector::Watcher>'s C<resources()> method.
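A possible way to do that filtering is sketched below. It assumes that C<< $watcher->resources() >> returns the list of resource names the watcher watches; the variable names (C<@wanted>, C<@any>, C<@all>) are only illustrative.

```perl
use strict;
use warnings;
use Async::Selector;

my $selector = Async::Selector->new();
$selector->register(
    a => sub { return undef },    ## never available; only the watch lists matter here
    b => sub { return undef },
);
my $w_a  = $selector->watch(a => 1, sub { });
my $w_ab = $selector->watch(a => 1, b => 1, sub { });

my @wanted = qw(a b);

## Watchers that watch ANY of @wanted.
my @any = $selector->watchers(@wanted);

## Keep only the watchers that watch ALL of @wanted.
my @all = grep {
    my $watcher = $_;
    my %watched = map { ($_ => 1) } $watcher->resources;
    !grep { !$watched{$_} } @wanted;
} @any;

printf "any: %d, all: %d\n", scalar(@any), scalar(@all);
```

Here both watchers match the ANY query, while only the one created with both C<a> and C<b> survives the ALL filter.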
=cut
sub watchers {
my ($self, @resources) = @_;
my @all_watchers = map { $_->{object} } values %{$self->{watchers}};
if(!@resources) {
return @all_watchers;
}
my @affected_watchers = ();
watcher_loop: foreach my $watcher (@all_watchers) {
my %watch_conditions = $watcher->conditions;
foreach my $res (@resources) {
next if !defined($res);
if(exists($watch_conditions{$res})) {
push(@affected_watchers, $watcher);
next watcher_loop;
}
}
}
return @affected_watchers;
}
sub selections {
my ($self) = @_;
return map { "$_" } $self->watchers;
}
=pod
=head1 EXAMPLES
=head2 Level-triggered vs. edge-triggered
Watchers created by C<watch()> and C<watch_lt()> methods are level-triggered.
This means their callbacks can be immediately executed if some of the watched resources
are already available.
Watchers created by C<watch_et()> method are edge-triggered.
This means their callbacks are never executed at the moment C<watch_et()> is called.
Both level-triggered and edge-triggered watcher callbacks are executed
when some of the watched resources are C<trigger()>-ed AND available.
my $selector = Async::Selector->new();
my $a = 10;
$selector->register(a => sub { my $t = shift; return $a >= $t ? $a : undef });
## Level-triggered watch
$selector->watch_lt(a => 5, sub { ## => LT: 10
my ($watcher, %res) = @_;
print "LT: $res{a}\n";
});
$selector->trigger('a'); ## => LT: 10
$a = 12;
$selector->trigger('a'); ## => LT: 12
$a = 3;
$selector->trigger('a'); ## Nothing happens because $a == 3 < 5.
## Edge-triggered watch
$selector->watch_et(a => 2, sub { ## Nothing happens because it's edge-triggered
my ($watcher, %res) = @_;
print "ET: $res{a}\n";
});
$selector->trigger('a'); ## => ET: 3
$a = 0;
$selector->trigger('a'); ## Nothing happens.
$a = 10;
$selector->trigger('a'); ## => LT: 10
## => ET: 10
=head2 Multiple resources, multiple watches
You can register multiple resources with a single L<Async::Selector>
object. You can watch multiple resources with a single call of
C<watch()> method. If you watch multiple resources, the callback is
executed when any of the watched resources is available.
my $selector = Async::Selector->new();
my $a = 5;
my $b = 6;
my $c = 7;
$selector->register(
a => sub { my $t = shift; return $a >= $t ? $a : undef },
b => sub { my $t = shift; return $b >= $t ? $b : undef },
c => sub { my $t = shift; return $c >= $t ? $c : undef },
);
$selector->watch(a => 10, sub {
my ($watcher, %res) = @_;
print "Select 1: a is $res{a}\n";
$watcher->cancel();
});
$selector->watch(
a => 12, b => 15, c => 15,
sub {
my ($watcher, %res) = @_;
foreach my $key (sort keys %res) {
print "Select 2: $key is $res{$key}\n";
}
$watcher->cancel();
}
);
($a, $b, $c) = (11, 14, 14);
$selector->trigger(qw(a b c)); ## -> Select 1: a is 11
print "---------\n";
($a, $b, $c) = (12, 14, 20);
$selector->trigger(qw(a b c)); ## -> Select 2: a is 12
## -> Select 2: c is 20
=head2 One-shot and persistent watches
The watchers are persistent by default, that is, they remain in the
L<Async::Selector> object no matter how many times their callbacks
are executed.
If you want to execute your callback only once, call C<< $watcher->cancel() >>
in the callback.
my $selector = Async::Selector->new();
my $A = "";
my $B = "";
$selector->register(
A => sub { my $in = shift; return length($A) >= $in ? $A : undef },
B => sub { my $in = shift; return length($B) >= $in ? $B : undef },
);
my $watcher_a = $selector->watch(A => 5, sub {
my ($watcher, %res) = @_;
print "A: $res{A}\n";
$watcher->cancel(); ## one-shot callback
});
my $watcher_b = $selector->watch(B => 5, sub {
my ($watcher, %res) = @_;
print "B: $res{B}\n";
## persistent callback
});
## Trigger the resources.
## Execution order of watcher callbacks is not guaranteed.
($A, $B) = ('aaaaa', 'bbbbb');
$selector->trigger('A', 'B'); ## -> A: aaaaa
## -> B: bbbbb
print "--------\n";
## $watcher_a is already canceled.
($A, $B) = ('AAAAA', 'BBBBB');
$selector->trigger('A', 'B'); ## -> B: BBBBB
print "--------\n";
$B = "CCCCCCC";
$selector->trigger('A', 'B'); ## -> B: CCCCCCC
print "--------\n";
$watcher_b->cancel();
$selector->trigger('A', 'B'); ## Nothing happens.
=head2 Watcher aggregator
Sometimes you might want to use multiple L<Async::Selector> objects
and watch their resources simultaneously.
In this case, you can use L<Async::Selector::Aggregator> to aggregate
watchers produced by L<Async::Selector> objects.
See L<Async::Selector::Aggregator> for details.
use Async::Selector::Aggregator;
my $selector_a = Async::Selector->new();
my $selector_b = Async::Selector->new();
my $A = "";
my $B = "";
$selector_a->register(resource => sub { my $in = shift; return length($A) >= $in ? $A : undef });
$selector_b->register(resource => sub { my $in = shift; return length($B) >= $in ? $B : undef });
my $watcher_a = $selector_a->watch(resource => 5, sub {
my ($watcher, %res) = @_;
print "A: $res{resource}\n";
});
my $watcher_b = $selector_b->watch(resource => 5, sub {
my ($watcher, %res) = @_;
print "B: $res{resource}\n";
});
## Aggregates the two watchers into $aggregator
my $aggregator = Async::Selector::Aggregator->new();
$aggregator->add($watcher_a);
$aggregator->add($watcher_b);
## This cancels both $watcher_a and $watcher_b
$aggregator->cancel();
print("watcher_a: " . ($watcher_a->active ? "active" : "inactive") . "\n"); ## -> watcher_a: inactive
print("watcher_b: " . ($watcher_b->active ? "active" : "inactive") . "\n"); ## -> watcher_b: inactive
=head2 Real-time Web: Comet (long-polling) and WebSocket
L<Async::Selector> can be used as a foundation for the so-called real-time
Web. Resources registered with an L<Async::Selector> object can be
pushed to Web browsers via Comet (long-polling) and/or WebSocket.
See L<Async::Selector::Example::Mojo> for details.
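The long-polling idea can be sketched without any Web framework. The following is only an illustration under stated assumptions, not the code from L<Async::Selector::Example::Mojo>: C<long_poll()>, C<@messages>, C<@sent> and C<$respond> are hypothetical stand-ins for a request handler, a message store and an HTTP response. Each pending request becomes a one-shot watcher whose condition is the number of messages the client has already seen.

```perl
use strict;
use warnings;
use Async::Selector;

my $selector = Async::Selector->new();
my @messages = ();    ## hypothetical in-memory message log

## The provider returns the messages newer than what the client has seen.
$selector->register(messages => sub {
    my ($last_seen) = @_;
    return @messages > $last_seen
        ? [ @messages[$last_seen .. $#messages] ]
        : undef;
});

## Hypothetical stand-in for a long-polling request handler:
## the "response" is sent when new messages exist, then the watcher is canceled.
sub long_poll {
    my ($client, $last_seen, $respond) = @_;
    return $selector->watch(messages => $last_seen, sub {
        my ($watcher, %res) = @_;
        $watcher->cancel();
        $respond->($client, $res{messages});
    });
}

my @sent;
my $respond = sub {
    my ($client, $msgs) = @_;
    push @sent, "${client}: @{$msgs}";
};

long_poll(alice => 0, $respond);    ## No messages yet, so the request stays pending
push @messages, "hello";
$selector->trigger('messages');     ## alice's pending request is answered
long_poll(bob => 0, $respond);      ## Answered immediately (level-triggered)

print "$_\n" for @sent;
```

Level-triggering is what makes the late request from C<bob> safe: a message that arrived before the request was made is still delivered, with no extra bookkeeping in the handler.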