Async-Queue

 view release on metacpan or  search on metacpan

lib/Async/Queue.pm  view on Meta::CPAN

package Async::Queue;

use 5.006;
use strict;
use warnings;

use Carp;
use Scalar::Util qw(looks_like_number);

# Constructor.
#
# Creates the object with its default state (concurrency 1, no worker or
# hooks, an empty task queue, zero running tasks) and then feeds the
# caller's options through the accessor methods of the same name, so each
# option gets exactly the validation it would get when set later on.
#
# Arguments: a key/value list.  Only the keys concurrency, worker, drain,
#   empty and saturated are looked at.  A key that was not supplied is
#   handed to its accessor as undef; the accessor decides whether that is
#   acceptable.
# Returns: the new object.  Any exception raised by an accessor propagates.
sub new {
    my ($class, %options) = @_;

    my %initial_state = (
        concurrency => 1,
        worker      => undef,
        drain       => undef,
        empty       => undef,
        saturated   => undef,
        task_queue  => [],
        running     => 0,
    );
    my $self = bless { %initial_state }, $class;

    # The order matters only in that it is the order in which validation
    # errors would surface.
    for my $setting (qw(concurrency worker drain empty saturated)) {
        $self->$setting($options{$setting});
    }

    return $self;
}

# Installs a combined getter/setter method named $name into this package.
#
# Arguments:
#   $name    - name of the method; also the key in the object hash that
#              the generated method reads and writes.
#   %options - allow_undef => true lets the setter accept undef.
#
# The generated method, when called with a value:
#   * croaks if the value is undef and allow_undef was not given,
#   * croaks if the value is defined but is not a plain CODE reference,
#   * croaks if the object reports a running task ($self->running > 0),
#   * otherwise stores the value.
# In every non-croaking case it returns the currently stored value.
sub _define_hook_accessors {
    my ($name, %options) = @_;
    my $undef_ok = $options{allow_undef};

    my $accessor = sub {
        my ($self, $value) = @_;

        # Getter: no argument beyond the invocant.
        return $self->{$name} if @_ <= 1;

        if (defined $value) {
            croak "$name must be a coderef" if ref($value) ne 'CODE';
        }
        elsif (!$undef_ok) {
            croak "$name must not be undef.";
        }
        croak "You cannot set $name while there is a running task."
            if $self->running > 0;

        $self->{$name} = $value;
        return $self->{$name};
    };

    # Symbolic glob assignment needs strict refs relaxed.
    no strict 'refs';
    *{ __PACKAGE__ . "::$name" } = $accessor;
}

# Read-only accessor for the object's "running" counter, i.e. the number
# of tasks that have been started and have not yet been reported finished.
sub running {
    my $self = shift;
    return $self->{running};
}

# Getter/setter for the concurrency level.
#
# Without an argument, returns the current level.
# With an argument, validates and stores it, then returns the new level:
#   * croaks if a task is running ($self->running > 0);
#   * undef is replaced by the default of 1;
#   * croaks if the value does not look like a number;
#   * the value is truncated to an integer with int().
sub concurrency {
    my $self = shift;
    return $self->{concurrency} unless @_;

    croak "You cannot set concurrency while there is a running task"
        if $self->running > 0;

    my $level = defined $_[0] ? $_[0] : 1;
    croak "concurrency must be a number" unless looks_like_number($level);

    $self->{concurrency} = int($level);
    return $self->{concurrency};
}

# Returns the number of entries currently held in the task queue.
sub length {
    my $self    = shift;
    my $pending = $self->{task_queue};
    return scalar @{$pending};
}

# "waiting" is an alias for "length": both names refer to the same sub.
*waiting = \&length;

# Generate the hook accessors.  The worker can never be set to undef,
# whereas the drain, empty and saturated hooks can.
_define_hook_accessors('worker');
for my $hook (qw(drain empty saturated)) {
    _define_hook_accessors($hook, allow_undef => 1);
}

# Appends a task to the queue and asks the queue to start processing.
#
# Arguments:
#   $task - the task to enqueue; any value, including undef, as long as
#           the argument is actually passed.
#   $cb   - optional coderef stored together with the task.
#
# Croaks if no task argument was passed, or if $cb is defined but is not a
# plain CODE reference.
# Returns: the object itself.
sub push {
    my ($self, $task, $cb) = @_;

    croak("You must specify something to push.") if @_ < 2;
    croak("callback for a task must be a coderef")
        if defined($cb) && ref($cb) ne 'CODE';

    # CORE:: makes it explicit that this is the builtin, not this method.
    my $entry = [$task, $cb];
    CORE::push(@{ $self->{task_queue} }, $entry);

    # The true argument marks the call as coming from push.
    $self->_shift_run(1);
    return $self;
}

sub _shift_run {
    my ($self, $from_push) = @_;
    return if $self->concurrency > 0 && $self->running >= $self->concurrency;
    my $args_ref = shift(@{$self->{task_queue}});
    return if !defined($args_ref);
    my ($task, $cb) = @$args_ref;
    $self->{running} += 1;
    if($self->running == $self->concurrency && $from_push && defined($self->saturated)) {
        $self->saturated->($self);
    }
    if(@{$self->{task_queue}} == 0 && defined($self->empty)) {
        $self->empty->($self);
    }
    my $sync = 1;
    my $sync_completed = 0;
    $self->worker->($task, sub {
        my (@worker_results) = @_;
        $cb->(@worker_results) if defined($cb);
        $self->{running} -= 1;
        if(@{$self->{task_queue}} == 0 && $self->running == 0 && defined($self->drain)) {
            $self->drain->($self);
        }
        if($sync) {
            $sync_completed = 1;
        }else {
            @_ = ($self);
            goto &_shift_run;
        }
    }, $self);
    $sync = 0;
    if($sync_completed) {
        @_ = ($self);



( run in 1.418 second using v1.01-cache-2.11-cpan-39bf76dae61 )