Async-Selector

lib/Async/Selector.pm

use Async::Selector::Watcher;


=pod

=head1 NAME

Async::Selector - level-triggered resource observer like select(2)


=head1 VERSION

1.03

=cut

our $VERSION = "1.03";


=pod

=head1 SYNOPSIS


    use Async::Selector;
    
    my $selector = Async::Selector->new();
    
    ## Register resource
    my $resource = "some text.";  ## 10 bytes
    
    $selector->register(resource_A => sub {
        ## If $resource is at least $threshold bytes long, provide it.
        my $threshold = shift;
        return length($resource) >= $threshold ? $resource : undef;
    });
    
    
    ## Watch the resource with a callback.
    $selector->watch(
        resource_A => 20,  ## When the resource grows to 20 bytes or more...
        sub {              ## ... execute this callback.
            my ($watcher, %resource) = @_;
            print "$resource{resource_A}\n";
            $watcher->cancel();
        }
    );
    
    
    ## Append data to the resource
    $resource .= "data";  ## 14 bytes
    $selector->trigger('resource_A'); ## Nothing happens
    
    $resource .= "more data";  ## 23 bytes
    $selector->trigger('resource_A'); ## The callback prints 'some text.datamore data'


=head1 DESCRIPTION

L<Async::Selector> is an object that watches registered resources
and executes callbacks when some of the resources are available.
It is thus an implementation of the Observer pattern, like L<Event::Notify>.
The important difference is that L<Async::Selector> is B<level-triggered>, like the C<select(2)> system call.

Basic usage of L<Async::Selector> is as follows:

=over

=item 1.

Register as many resources as you like with the C<register()> method.

A resource has a name and a resource provider.
A resource provider is a subroutine reference that returns some data (or C<undef> if the data is not available).


=item 2.

Watch as many resources as you like with the C<watch()> method.

When any of the watched resources becomes available, the callback function is executed
with the available resource data.

Note that if any of the watched resources is already available when C<watch()> is called,
the callback function is executed immediately.
That's because L<Async::Selector> is level-triggered.


=item 3.

Notify the L<Async::Selector> object with the C<trigger()> method that some of the registered resources have changed.

The L<Async::Selector> object then checks whether any of the triggered resources is available.
If some are, the callback function given to the C<watch()> method is executed.


=back


=head1 CLASS METHODS


=head2 $selector = Async::Selector->new();

Creates an L<Async::Selector> object. It takes no parameters.


=cut


sub new {
   my ($class) = @_;
   my $self = bless {
       resources => {},
       watchers => {},
   }, $class;
   return $self;
}

sub _check {
   my ($self, $watcher_id_or_watcher, @triggers) = @_;

=cut

sub registered {
    my ($self, $resource_name) = @_;
    return 0 if not defined($resource_name);
    return exists $self->{resources}{$resource_name};
}


=pod


=head2 @watchers = $selector->watchers([@resource_names]);

Returns the list of active watchers (L<Async::Selector::Watcher> objects) from the L<Async::Selector> object.

If the C<watchers()> method is called without arguments, it returns all of the active watchers.

If the C<watchers()> method is called with arguments (C<@resource_names>),
it returns the active watchers that watch ANY resource out of C<@resource_names>.

If you want watchers that watch ALL of C<@resource_names>,
try filtering the result (C<@watchers>) with L<Async::Selector::Watcher>'s C<resources()> method.
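
A minimal sketch of such filtering is shown below. The resource names
(C<a>, C<b>), the variable names and the never-available providers are
assumptions made for illustration only; the sketch relies on nothing but
C<watchers()> and the watcher's C<resources()> method mentioned above.

    use strict;
    use warnings;
    use Async::Selector;

    my $selector = Async::Selector->new();

    ## Hypothetical providers that are never available,
    ## so no callback is executed in this sketch.
    $selector->register(
        a => sub { return undef },
        b => sub { return undef },
    );
    my $w_a  = $selector->watch(a => 1, sub { });
    my $w_ab = $selector->watch(a => 1, b => 1, sub { });

    ## Watchers that watch ANY of the given resources.
    my @wanted = ('a', 'b');
    my @any = $selector->watchers(@wanted);

    ## Keep only the watchers that watch ALL of the given resources.
    my @all = grep {
        my %watched = map { ($_ => 1) } $_->resources;
        !grep { !$watched{$_} } @wanted;
    } @any;

    print scalar(@any), " ", scalar(@all), "\n";  ## -> 2 1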

=cut

sub watchers {
    my ($self, @resources) = @_;
    my @all_watchers = map { $_->{object} } values %{$self->{watchers}};
    if(!@resources) {
        return @all_watchers;
    }
    my @affected_watchers = ();
  watcher_loop: foreach my $watcher (@all_watchers) {
        my %watch_conditions = $watcher->conditions;
        foreach my $res (@resources) {
            next if !defined($res);
            if(exists($watch_conditions{$res})) {
                push(@affected_watchers, $watcher);
                next watcher_loop;
            }
        }
    }
    return @affected_watchers;
}


sub selections {
    my ($self) = @_;
    return map { "$_" } $self->watchers;
}


=pod

=head1 EXAMPLES

=head2 Level-triggered vs. edge-triggered

Watchers created by the C<watch()> and C<watch_lt()> methods are level-triggered.
This means their callbacks can be executed immediately if some of the watched resources
are already available.

Watchers created by the C<watch_et()> method are edge-triggered.
This means their callbacks are never executed at the moment C<watch_et()> is called.

Both level-triggered and edge-triggered watcher callbacks are executed
when some of the watched resources are C<trigger()>-ed AND available.


    my $selector = Async::Selector->new();
    my $a = 10;
    $selector->register(a => sub { my $t = shift; return $a >= $t ? $a : undef });

    ## Level-triggered watch
    $selector->watch_lt(a => 5, sub { ## => LT: 10
        my ($watcher, %res) = @_;
        print "LT: $res{a}\n";
    });
    $selector->trigger('a');          ## => LT: 10
    $a = 12;
    $selector->trigger('a');          ## => LT: 12
    $a = 3;
    $selector->trigger('a');          ## Nothing happens because $a == 3 < 5.

    ## Edge-triggered watch
    $selector->watch_et(a => 2, sub { ## Nothing happens because it's edge-triggered
        my ($watcher, %res) = @_;
        print "ET: $res{a}\n";
    });
    $selector->trigger('a');          ## => ET: 3
    $a = 0;
    $selector->trigger('a');          ## Nothing happens.
    $a = 10;
    $selector->trigger('a');          ## => LT: 10
                                      ## => ET: 10



=head2 Multiple resources, multiple watches

You can register multiple resources with a single L<Async::Selector>
object.  You can watch multiple resources with a single call to the
C<watch()> method.  If you watch multiple resources, the callback is
executed when any of the watched resources is available.


    my $selector = Async::Selector->new();
    my $a = 5;
    my $b = 6;
    my $c = 7;
    $selector->register(
        a => sub { my $t = shift; return $a >= $t ? $a : undef },
        b => sub { my $t = shift; return $b >= $t ? $b : undef },
        c => sub { my $t = shift; return $c >= $t ? $c : undef },
    );
    $selector->watch(a => 10, sub {
        my ($watcher, %res) = @_;
        print "Select 1: a is $res{a}\n";
        $watcher->cancel();
    });
    $selector->watch(
        a => 12, b => 15, c => 15,
        sub {
            my ($watcher, %res) = @_;
            foreach my $key (sort keys %res) {
                print "Select 2: $key is $res{$key}\n";
            }
            $watcher->cancel();
        }
    );

    ($a, $b, $c) = (11, 14, 14);
    $selector->trigger(qw(a b c));  ## -> Select 1: a is 11
    print "---------\n";
    ($a, $b, $c) = (12, 14, 20);
    $selector->trigger(qw(a b c));  ## -> Select 2: a is 12
                                    ## -> Select 2: c is 20


=head2 One-shot and persistent watches

The watchers are persistent by default, that is, they remain in the
L<Async::Selector> object no matter how many times their callbacks
are executed.

If you want to execute your callback just one time, call C<< $watcher->cancel() >>
in the callback.


    my $selector = Async::Selector->new();
    my $A = "";
    my $B = "";
    $selector->register(
        A => sub { my $in = shift; return length($A) >= $in ? $A : undef },
        B => sub { my $in = shift; return length($B) >= $in ? $B : undef },
    );

    my $watcher_a = $selector->watch(A => 5, sub {
        my ($watcher, %res) = @_;
        print "A: $res{A}\n";
        $watcher->cancel(); ## one-shot callback
    });
    my $watcher_b = $selector->watch(B => 5, sub {
        my ($watcher, %res) = @_;
        print "B: $res{B}\n";
        ## persistent callback
    });

    ## Trigger the resources.
    ## Execution order of watcher callbacks is not guaranteed.
    ($A, $B) = ('aaaaa', 'bbbbb');
    $selector->trigger('A', 'B');   ## -> A: aaaaa
                                    ## -> B: bbbbb
    print "--------\n";
    ## $watcher_a is already canceled.
    ($A, $B) = ('AAAAA', 'BBBBB');
    $selector->trigger('A', 'B');   ## -> B: BBBBB
    print "--------\n";

    $B = "CCCCCCC";
    $selector->trigger('A', 'B');   ## -> B: CCCCCCC
    print "--------\n";

    $watcher_b->cancel();
    $selector->trigger('A', 'B');   ## Nothing happens.

=head2 Watcher aggregator

Sometimes you might want to use multiple L<Async::Selector> objects
and watch their resources simultaneously.
In this case, you can use L<Async::Selector::Aggregator> to aggregate
watchers produced by L<Async::Selector> objects.
See L<Async::Selector::Aggregator> for details.

    my $selector_a = Async::Selector->new();
    my $selector_b = Async::Selector->new();
    my $A = "";
    my $B = "";
    $selector_a->register(resource => sub { my $in = shift; return length($A) >= $in ? $A : undef });
    $selector_b->register(resource => sub { my $in = shift; return length($B) >= $in ? $B : undef });
    
    my $watcher_a = $selector_a->watch(resource => 5, sub {
        my ($watcher, %res) = @_;
        print "A: $res{resource}\n";
    });
    my $watcher_b = $selector_b->watch(resource => 5, sub {
        my ($watcher, %res) = @_;
        print "B: $res{resource}\n";
    });
    
    ## Aggregates the two watchers into $aggregator
    my $aggregator = Async::Selector::Aggregator->new();
    $aggregator->add($watcher_a);
    $aggregator->add($watcher_b);
    
    ## This cancels both $watcher_a and $watcher_b
    $aggregator->cancel();
    
    print("watcher_a: " . ($watcher_a->active ? "active" : "inactive") . "\n"); ## -> watcher_a: inactive
    print("watcher_b: " . ($watcher_b->active ? "active" : "inactive") . "\n"); ## -> watcher_b: inactive



=head2 Real-time Web: Comet (long-polling) and WebSocket

L<Async::Selector> can be used as a foundation for the so-called real-time
Web.  Resources registered with an L<Async::Selector> object can be
pushed to Web browsers via Comet (long-polling) and/or WebSocket.

See L<Async::Selector::Example::Mojo> for details.


