Broker-Async
view release on metacpan or search on metacpan
lib/Broker/Async/Worker.pm view on Meta::CPAN
package Broker::Async::Worker;
use strict;
use warnings;
use Carp;
use Scalar::Util qw( blessed weaken );
=head1 NAME
Broker::Async::Worker
=head1 DESCRIPTION
Used by L<Broker::Async> for tracking the state of asynchronous work.
=cut
our $VERSION = "0.0.6"; # __VERSION__
=head1 ATTRIBUTES
=head2 code
The code reference used to start the work.
This will be invoked with the arguments passed to C<do>.
Must return a L<Future> subclass.
=head2 concurrency
The number of concurrent tasks a worker can execute.
Do'ing more tasks than this limit is a fatal error.
Defaults to 1.
=cut
use Class::Tiny qw( code ), {
concurrency => sub { 1 },
futures => sub { +{} },
available => sub { shift->concurrency },
};
=head1 METHODS
=head2 new
my $worker = Broker::Async::Worker->new(
code => sub { ... },
concurrency => $max,
);
=head2 available
Indicates whether the worker is available to C<do> tasks.
It is a fatal error to invoke C<do> when this is false.
=head2 do
my $future = $worker->do($task);
Invokes the code attribute with the given arguments.
Returns a future that will be resolved when the work is done.
=cut
sub active {
my ($self) = @_;
return values %{ $self->futures };
}
sub BUILD {
my ($self) = @_;
for my $name (qw( code )) {
croak "$name attribute required" unless defined $self->$name;
}
}
sub do {
weaken(my $self = shift);
my (@args) = @_;
if (not( $self->available )) {
croak "worker $self is not available for work";
}
my $f = $self->code->(@args);
if (not( blessed($f) and $f->isa('Future') )) {
croak "code for worker $self did not return a Future: returned $f";
}
$self->available( $self->available - 1 );
return $self->futures->{$f} = $f->on_ready(sub{
delete $self->futures->{$f};
$self->available( $self->available + 1 );
});
}
=head1 AUTHOR
Mark Flickinger E<lt>maf@cpan.orgE<gt>
=head1 LICENSE
This software is licensed under the same terms as Perl itself.
=cut
1;
( run in 1.462 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )