Broker-Async
view release on metacpan or search on metacpan
lib/Broker/Async.pm view on Meta::CPAN
package Broker::Async;
use strict;
use warnings;
use Broker::Async::Worker;
use Carp;
use Scalar::Util qw( blessed weaken );
=head1 NAME
Broker::Async - broker tasks for multiple workers
=for html <a href="https://travis-ci.org/mark-5/p5-broker-async"><img src="https://travis-ci.org/mark-5/p5-broker-async.svg?branch=master"></a>
=head1 SYNOPSIS
my @workers;
for my $uri (@uris) {
my $client = SomeClient->new($uri);
push @workers, sub { $client->request(@_) };
}
my $broker = Broker::Async->new(workers => \@workers);
for my $future (map $broker->do($_), @requests) {
my $result = $future->get;
...
}
=head1 DESCRIPTION
This module brokers tasks for multiple asynchronous workers. A worker can be any code reference that returns a L<Future>, representing work awaiting completion.
Some common use cases include throttling asynchronous requests to a server, or delegating tasks to a limited number of processes.
=cut
our $VERSION = "0.0.6"; # __VERSION__
=head1 ATTRIBUTES
=head2 workers
An array ref of workers used for handling tasks.
Can be a code reference, a hash ref of L<Broker::Async::Worker> arguments, or a L<Broker::Async::Worker> object.
Every invocation of a worker must return a L<Future> object.
Under the hood, code and hash references are simply used to instantiate a L<Broker::Async::Worker> object.
See L<Broker::Async::Worker> for more documentation about how these parameters are used.
=cut
use Class::Tiny qw( workers ), {
queue => sub { [] },
};
=head1 METHODS
=head2 new
my $broker = Broker::Async->new(
workers => [ sub { ... }, ... ],
);
=cut
sub active {
my ($self) = @_;
lib/Broker/Async.pm view on Meta::CPAN
}
my $workers = $self->workers;
croak "workers attribute must be an array ref: received $workers"
unless ref($workers) eq 'ARRAY';
for (my $i = 0; $i < @$workers; $i++) {
my $worker = $workers->[$i];
my $type = ref($worker);
if ($type eq 'CODE') {
$workers->[$i] = Broker::Async::Worker->new({code => $worker});
} elsif ($type eq 'HASH') {
$workers->[$i] = Broker::Async::Worker->new($worker);
}
}
}
=head2 do
my $future = $broker->do(@args);
Queue the invocation of a worker with @args.
@args can be any data structure, and is passed as is to a worker code ref.
Returns a L<Future> object that resolves when the work is done.
There is no guarantee when a worker will be called, that depends on when a worker becomes available.
However, calls are guaranteed to be invoked in the order they are seen by $broker->do.
=cut
sub do {
my ($self, @args) = @_;
# enforces consistent order of task execution
# makes sure current task is only started if nothing else is queued
$self->process_queue;
my $future;
if (my @active_futures = map $_->active, $self->active) {
# generate future from an existing future
# see Future::_new_convergent
my $_future = $active_futures[0];
ref($_) eq "Future" or $_future = $_, last for @active_futures;
$future = $_future->new;
push @{ $self->queue }, {args => \@args, future => $future};
} elsif (my ($available_worker) = $self->available) {
# should only be here if there's nothing active and nothing queued
# so start the task and return it's future
$future = $self->do_worker($available_worker, @args);
}
# start any recently queued tasks, if there are available workers
$self->process_queue;
return $future;
}
sub do_worker {
weaken(my $self = shift);
my ($worker, @args) = @_;
return $worker->do(@args)->on_ready(sub{
# queue next task
$self->process_queue;
});
}
sub process_queue {
weaken(my $self = shift);
my $queue = $self->queue;
while (@$queue) {
my ($worker) = $self->available or last;
my $task = shift @$queue;
$self->do_worker($worker, @{$task->{args}})
->on_ready($task->{future});
}
}
=head1 AUTHOR
Mark Flickinger E<lt>maf@cpan.orgE<gt>
=head1 LICENSE
This software is licensed under the same terms as Perl itself.
=cut
1;
( run in 1.197 second using v1.01-cache-2.11-cpan-f56aa216473 )