Broker-Async

 view release on metacpan or  search on metacpan

lib/Broker/Async.pm  view on Meta::CPAN

package Broker::Async;
use strict;
use warnings;
use Broker::Async::Worker;
use Carp;
use Scalar::Util qw( blessed weaken );

=head1 NAME

Broker::Async - broker tasks for multiple workers

=for html <a href="https://travis-ci.org/mark-5/p5-broker-async"><img src="https://travis-ci.org/mark-5/p5-broker-async.svg?branch=master"></a>

=head1 SYNOPSIS

    my @workers;
    for my $uri (@uris) {
        my $client = SomeClient->new($uri);
        push @workers, sub { $client->request(@_) };
    }

    my $broker = Broker::Async->new(workers => \@workers);
    for my $future (map $broker->do($_), @requests) {
        my $result = $future->get;
        ...
    }

=head1 DESCRIPTION

This module brokers tasks for multiple asynchronous workers. A worker can be any code reference that returns a L<Future>, representing work awaiting completion.

Some common use cases include throttling asynchronous requests to a server, or delegating tasks to a limited number of processes.

=cut

our $VERSION = "0.0.6"; # __VERSION__

=head1 ATTRIBUTES

=head2 workers

An array ref of workers used for handling tasks.
Can be a code reference, a hash ref of L<Broker::Async::Worker> arguments, or a L<Broker::Async::Worker> object.
Every invocation of a worker must return a L<Future> object.

Under the hood, code and hash references are simply used to instantiate a L<Broker::Async::Worker> object.
See L<Broker::Async::Worker> for more documentation about how these parameters are used.

=cut

use Class::Tiny qw( workers ), {
    queue => sub {  [] },
};

=head1 METHODS

=head2 new

    my $broker = Broker::Async->new(
        workers => [ sub { ... }, ... ],
    );

=cut

sub active {
    my ($self) = @_;
    return grep { $_->active } @{ $self->workers };
}

sub available {
    my ($self) = @_;
    return grep { $_->available } @{ $self->workers };
}

sub BUILD {
    my ($self) = @_;
    for my $name (qw( workers )) {
        croak "$name attribute required" unless defined $self->$name;
    }

    my $workers = $self->workers;
    croak "workers attribute must be an array ref: received $workers"
        unless ref($workers) eq 'ARRAY';

    for (my $i = 0; $i < @$workers; $i++) {
        my $worker = $workers->[$i];

        my $type = ref($worker);
        if ($type eq 'CODE') {
            $workers->[$i] = Broker::Async::Worker->new({code => $worker});
        } elsif ($type eq 'HASH') {
            $workers->[$i] = Broker::Async::Worker->new($worker);



( run in 0.697 second using v1.01-cache-2.11-cpan-39bf76dae61 )