Broker-Async

 view release on metacpan or  search on metacpan

lib/Broker/Async.pm  view on Meta::CPAN

package Broker::Async;
use strict;
use warnings;
use Broker::Async::Worker;
use Carp;
use Scalar::Util qw( blessed weaken );

=head1 NAME

Broker::Async - broker tasks for multiple workers

=for html <a href="https://travis-ci.org/mark-5/p5-broker-async"><img src="https://travis-ci.org/mark-5/p5-broker-async.svg?branch=master"></a>

=head1 SYNOPSIS

    my @workers;

lib/Broker/Async.pm  view on Meta::CPAN

        # so start the task and return its future
        $future = $self->do_worker($available_worker, @args);
    }

    # start any recently queued tasks, if there are available workers
    $self->process_queue;
    return $future;
}

# Start a task on the given worker and re-run the queue once it finishes.
#
# Arguments:
#   $worker - object whose do() method starts the task
#   @args   - passed through unchanged to $worker->do
#
# Returns the value of calling on_ready() on whatever $worker->do returned
# (presumably the task's Future -- confirm against the Future API).
sub do_worker {
    # Only a weak reference is captured by the callback below, so a pending
    # task does not keep the broker alive through its callback.
    weaken(my $self = shift);
    my ($worker, @args) = @_;

    return $worker->do(@args)->on_ready(sub{
        # The broker may have been destroyed while the task was still
        # pending; the weak reference is then undef and there is no queue
        # left to process, so bail out instead of dying on a method call.
        return unless defined $self;

        # queue next task
        $self->process_queue;
    });
}

# Hand queued tasks to workers for as long as both a task and a worker exist.
#
# Tasks are taken from the front of the queue. Each one is started through
# do_worker() with the task's args, and the task's own future is passed to
# on_ready() on the result.
sub process_queue {
    my $self = shift;
    weaken($self);

    my $pending = $self->queue;

    TASK: while (@$pending) {
        # stop as soon as no worker is reported as available
        my @idle = $self->available;
        last TASK unless @idle;

        my $next    = shift @$pending;
        my $running = $self->do_worker($idle[0], @{$next->{args}});
        $running->on_ready($next->{future});
    }
}

lib/Broker/Async/Worker.pm  view on Meta::CPAN

package Broker::Async::Worker;
use strict;
use warnings;
use Carp;
use Scalar::Util qw( blessed weaken );

=head1 NAME

Broker::Async::Worker - track the state of asynchronous work for a broker

=head1 DESCRIPTION

Used by L<Broker::Async> for tracking the state of asynchronous work.

=cut

lib/Broker/Async/Worker.pm  view on Meta::CPAN

}

# Constructor hook: croaks if a required attribute is undefined.
# The only required attribute is "code".
sub BUILD {
    my $self     = shift;
    my @required = ('code');

    foreach my $attr (@required) {
        next if defined $self->$attr;
        croak "$attr attribute required";
    }
}

sub do {
    weaken(my $self = shift);
    my (@args) = @_;
    if (not( $self->available )) {
        croak "worker $self is not available for work";
    }

    my $f = $self->code->(@args);
    if (not( blessed($f) and $f->isa('Future') )) {
        croak "code for worker $self did not return a Future: returned $f";
    }
    $self->available( $self->available - 1 );

t/lib/Test/Broker/Async/Trace.pm  view on Meta::CPAN

package Test::Broker::Async::Trace;
use strict;
use warnings;
use Future;
use Scalar::Util qw( weaken );
use Class::Tiny qw(), {
    _live    => sub { {} },
    futures  => sub { {} },
    started  => sub { [] },
};

# Return an array reference of the ids that are currently live, ordered by
# when they were started.
#
# The ids are the keys of the _live attribute. Their order is taken from the
# position of each id's first appearance in the started attribute. Ids that do
# not appear in started sort last; ties are broken by string comparison so the
# result does not depend on hash ordering.
sub live {
    my ($self)  = @_;
    my @started = @{ $self->started };

    # Perl's builtin index() is a substring search, not a list lookup, so the
    # start position of every id is recorded here instead.
    my %position_of;
    for my $i (0 .. $#started) {
        my $id = $started[$i];
        $position_of{$id} = $i unless exists $position_of{$id};
    }
    my $unknown = scalar @started;

    my @live =
        map  { $_->[0] }
        sort { $a->[1] <=> $b->[1] or $a->[0] cmp $b->[0] }
        map  { [$_, exists $position_of{$_} ? $position_of{$_} : $unknown] }
             keys %{ $self->_live };

    return \@live;
}

sub worker {
    weaken(my $self = shift);
    my $code = $_[0] ||= sub { Future->new };

    return sub {
        my ($id) = @_;
        my $future = $code->(@_);

        push @{ $self->started }, $id;
        $self->_live->{$id}   = $future;
        $self->futures->{$id} = $future;



( run in 0.309 second using v1.01-cache-2.11-cpan-65fba6d93b7 )