Broker-Async
view release on metacpan or search on metacpan
lib/Broker/Async.pm view on Meta::CPAN
package Broker::Async;
use strict;
use warnings;
use Broker::Async::Worker;
use Carp;
use Scalar::Util qw( blessed weaken );
=head1 NAME
Broker::Async - broker tasks for multiple workers
=for html <a href="https://travis-ci.org/mark-5/p5-broker-async"><img src="https://travis-ci.org/mark-5/p5-broker-async.svg?branch=master"></a>
=head1 SYNOPSIS
my @workers;
for my $uri (@uris) {
my $client = SomeClient->new($uri);
push @workers, sub { $client->request(@_) };
}
my $broker = Broker::Async->new(workers => \@workers);
for my $future (map $broker->do($_), @requests) {
my $result = $future->get;
...
}
=head1 DESCRIPTION
This module brokers tasks for multiple asynchronous workers. A worker can be any code reference that returns a L<Future>, representing work awaiting completion.
Some common use cases include throttling asynchronous requests to a server, or delegating tasks to a limited number of processes.
=cut
our $VERSION = "0.0.6"; # __VERSION__
=head1 ATTRIBUTES
=head2 workers
An array ref of workers used for handling tasks.
Can be a code reference, a hash ref of L<Broker::Async::Worker> arguments, or a L<Broker::Async::Worker> object.
Every invocation of a worker must return a L<Future> object.
Under the hood, code and hash references are simply used to instantiate a L<Broker::Async::Worker> object.
See L<Broker::Async::Worker> for more documentation about how these parameters are used.
=cut
use Class::Tiny qw( workers ), {
queue => sub { [] },
};
=head1 METHODS
=head2 new
my $broker = Broker::Async->new(
workers => [ sub { ... }, ... ],
);
=cut
sub active {
my ($self) = @_;
return grep { $_->active } @{ $self->workers };
}
sub available {
my ($self) = @_;
return grep { $_->available } @{ $self->workers };
}
sub BUILD {
my ($self) = @_;
for my $name (qw( workers )) {
croak "$name attribute required" unless defined $self->$name;
}
my $workers = $self->workers;
croak "workers attribute must be an array ref: received $workers"
unless ref($workers) eq 'ARRAY';
for (my $i = 0; $i < @$workers; $i++) {
my $worker = $workers->[$i];
my $type = ref($worker);
if ($type eq 'CODE') {
$workers->[$i] = Broker::Async::Worker->new({code => $worker});
} elsif ($type eq 'HASH') {
$workers->[$i] = Broker::Async::Worker->new($worker);
( run in 0.697 second using v1.01-cache-2.11-cpan-39bf76dae61 )