Broker-Async
view release on metacpan or search on metacpan
lib/Broker/Async.pm view on Meta::CPAN
package Broker::Async;
use strict;
use warnings;
use Broker::Async::Worker;
use Carp;
use Scalar::Util qw( blessed weaken );
=head1 NAME
Broker::Async - broker tasks for multiple workers
=for html <a href="https://travis-ci.org/mark-5/p5-broker-async"><img src="https://travis-ci.org/mark-5/p5-broker-async.svg?branch=master"></a>
=head1 SYNOPSIS
my @workers;
lib/Broker/Async.pm view on Meta::CPAN
# so start the task and return its future
$future = $self->do_worker($available_worker, @args);
}
# start any recently queued tasks, if there are available workers
$self->process_queue;
return $future;
}
# Start one task on the given worker and arrange for the queue to be serviced
# again once that task's future becomes ready.
#
# Arguments:
#   $worker - object whose do() method starts the task and returns a future
#   @args   - passed through unchanged to $worker->do
# Returns: the value returned by on_ready() on the worker's future.
sub do_worker {
    # Keep only a weak reference so the callback below does not keep the
    # broker alive.
    weaken(my $self = shift);
    my ($worker, @args) = @_;

    return $worker->do(@args)->on_ready(sub {
        # The broker may have been destroyed while the task was in flight; the
        # weak reference is then undef and there is no queue left to service.
        return unless defined $self;

        # queue next task
        $self->process_queue;
    });
}
# Drain the queue for as long as workers are free: each queued task is handed
# to the first available worker, and the task's own future is passed to
# on_ready() on the future returned by do_worker.
sub process_queue {
    weaken(my $self = shift);

    my $pending = $self->queue;
    while (scalar @$pending) {
        # available() is called in list context; stop once it returns nothing
        my @idle = $self->available;
        last unless @idle;

        my $next    = shift @$pending;
        my $running = $self->do_worker($idle[0], @{ $next->{args} });
        $running->on_ready($next->{future});
    }
}
lib/Broker/Async/Worker.pm view on Meta::CPAN
package Broker::Async::Worker;
use strict;
use warnings;
use Carp;
use Scalar::Util qw( blessed weaken );
=head1 NAME
Broker::Async::Worker - track the state of asynchronous work for Broker::Async
=head1 DESCRIPTION
Used by L<Broker::Async> for tracking the state of asynchronous work.
=cut
lib/Broker/Async/Worker.pm view on Meta::CPAN
}
# Check that every mandatory attribute has a defined value.
# Croaks with "<name> attribute required" for the first one that does not.
sub BUILD {
    my $self = shift;

    my @required = qw( code );
    foreach my $name (@required) {
        next if defined $self->$name;
        croak "$name attribute required";
    }
}
sub do {
weaken(my $self = shift);
my (@args) = @_;
if (not( $self->available )) {
croak "worker $self is not available for work";
}
my $f = $self->code->(@args);
if (not( blessed($f) and $f->isa('Future') )) {
croak "code for worker $self did not return a Future: returned $f";
}
$self->available( $self->available - 1 );
t/lib/Test/Broker/Async/Trace.pm view on Meta::CPAN
package Test::Broker::Async::Trace;
use strict;
use warnings;
use Future;
use Scalar::Util qw( weaken );
use Class::Tiny qw(), {
_live => sub { {} },
futures => sub { {} },
started => sub { [] },
};
# Return the ids that are currently live, ordered by their position in the
# list of started ids.
#
# Reads the start order from $self->started (array ref of ids) and the live
# set from the keys of $self->_live (hash ref).
# Returns: an array ref of ids.
sub live {
    my ($self) = @_;

    # Map each id to the position of its first appearance in the start order.
    # (The builtin index() searches strings, not lists, so it cannot do this.)
    my %position_of;
    my $position = 0;
    for my $id (@{ $self->started }) {
        $position_of{$id} = $position unless exists $position_of{$id};
        $position++;
    }

    # Ids that were never recorded as started sort first (-1); ties are broken
    # by id so the result never depends on hash ordering.
    my @live =
        map  { $_->[0] }
        sort { $a->[1] <=> $b->[1] or $a->[0] cmp $b->[0] }
        map  { [ $_, exists $position_of{$_} ? $position_of{$_} : -1 ] }
        keys %{ $self->_live };

    return \@live;
}
sub worker {
weaken(my $self = shift);
my $code = $_[0] ||= sub { Future->new };
return sub {
my ($id) = @_;
my $future = $code->(@_);
push @{ $self->started }, $id;
$self->_live->{$id} = $future;
$self->futures->{$id} = $future;
( run in 0.309 second using v1.01-cache-2.11-cpan-65fba6d93b7 )