Async-Util

 view release on metacpan or  search on metacpan

lib/Async/Util.pm  view on Meta::CPAN

package Async::Util;
BEGIN {
    $Async::Util::VERSION = '0.01';
}
# ABSTRACT: utilities for doing common async operations
use strict;
use warnings;
use v5.10;
no warnings 'recursion';
use Carp;
use Exporter;
use Scalar::Util qw(weaken);

# Exporter-based interface: the three public functions are available on
# request via @EXPORT_OK (no @EXPORT list is declared here).
our @ISA               = qw(Exporter);
our @EXPORT_OK         = qw(amap azipmap achain);
# Concurrency limit used when a caller passes no (or a false) 'at_a_time'.
my  $DEFAULT_AT_A_TIME = 100;

# amap(inputs => \@in, action => \&code, cb => \&done, ...)
#
# Public entry point for the asynchronous map.  When the caller supplies an
# 'output' key holding a false value, the call is handed to _amap_ignore;
# in every other case (key absent, or true) it is handed to _amap.  All
# named arguments are forwarded untouched and the chosen implementation's
# return value is passed straight back.
sub amap {
    my %opt = @_;

    # Only an explicitly supplied false 'output' switches implementation.
    my $discard_results = exists $opt{output} && !$opt{output};
    my $impl            = $discard_results ? \&_amap_ignore : \&_amap;

    return $impl->(%opt);
}

# _amap(inputs => \@in, action => \&code, cb => \&done, at_a_time => $n)
#
# Runs 'action' once for every element of 'inputs', keeping at most
# 'at_a_time' (default $DEFAULT_AT_A_TIME) invocations in flight.  Each
# action is invoked as $action->($input, $done) and reports back by calling
# $done->($result) on success or $done->(undef, $error) on failure.
#
# When every action has reported success, 'cb' is called with an array ref
# holding the results in input order.  On the first error 'cb' is called
# with (undef, $error); completions arriving afterwards are ignored and no
# further actions are started.  An empty 'inputs' list calls 'cb' at once
# with an empty array ref.
#
# Croaks if 'inputs', 'action' or 'cb' is missing or of the wrong type.
# Returns nothing.
sub _amap {
    my (%args) = @_;

    my $action    = $args{action};
    my $inputs    = $args{inputs};
    my $cb        = $args{cb};
    my $at_a_time = $args{at_a_time} || $DEFAULT_AT_A_TIME;

    croak q/Argument 'inputs' is required/ if !defined $inputs;
    croak q/Argument 'action' is required/ if !defined $action;
    croak q/Argument 'cb' is required/     if !defined $cb;

    croak q/Argument 'inputs' must be an ArrayRef/ if ref $inputs ne 'ARRAY';
    croak q/Argument 'action' must be a CodeRef/   if ref $action ne 'CODE';
    croak q/Argument 'cb'     must be a CodeRef/   if ref $cb ne 'CODE';

    # With no inputs no action would ever call back, so report the empty
    # result right away instead of leaving the caller waiting forever.
    if (!@{ $inputs }) {
        $cb->([]);
        return;
    }

    my $inflight    = 0;
    my $cb_count    = 0;
    my $input_index = 0;
    my $outputs     = [];
    my $any_err     = 0;
    my ($run, $after_work);

    # Dispatcher: start actions until the concurrency limit is reached, the
    # inputs are exhausted, or an error has already been reported.
    $run = sub {

        while (   !$any_err
               && $inflight < $at_a_time
               && $input_index <= $#{ $inputs }) {

            $inflight++;

            # Fresh lexicals per iteration so that each completion callback
            # remembers the slot its result belongs to.
            my $index = $input_index++;
            my $input = $inputs->[ $index ];

            $action->($input, sub {
                my ($res, $err) = @_;
                $after_work->($res, $err, $index);
            });
        }
    };

    # $run and $after_work capture each other.  weaken() cannot be used to
    # break that cycle: the variable is the only strong reference to the
    # dispatcher, so weakening it would free the dispatcher while actions
    # are still pending.  Instead $run is dropped explicitly as soon as no
    # more work can be started.
    $after_work = sub {
        my ($output, $err, $index) = @_;

        $cb_count++;
        $inflight--;

        return if $any_err;

        if ($err) {
            $any_err = 1;
            undef $run;
            return $cb->(undef, $err);
        }

        # store the output
        $outputs->[$index] = $output;

        if ($cb_count == @{ $inputs }) {
            undef $run;
            return $cb->($outputs);
        }

        # $run is already gone if an action reports more than once after
        # the map has finished; there is nothing left to start then.
        $run->() if $run;
    };

    $run->();

    return;
}

# _amap_ignore(inputs => \@in, action => \&code, cb => \&done, at_a_time => $n)
#
# Variant of the asynchronous map that discards the actions' results.
# 'action' is run once per element of 'inputs' with at most 'at_a_time'
# (default $DEFAULT_AT_A_TIME) invocations in flight; each is invoked as
# $action->($input, $done) and reports back via $done->($anything) or
# $done->(undef, $error).
#
# When every action has reported success, 'cb' is called with no arguments.
# On the first error 'cb' is called with (undef, $error); completions
# arriving afterwards are ignored and no further actions are started.  An
# empty 'inputs' list calls 'cb' at once with no arguments.
#
# Croaks if 'inputs', 'action' or 'cb' is missing or of the wrong type.
# Returns nothing.
sub _amap_ignore {
    my (%args) = @_;

    my $action    = $args{action};
    my $inputs    = $args{inputs};
    my $cb        = $args{cb};
    my $at_a_time = $args{at_a_time} || $DEFAULT_AT_A_TIME;

    croak q/Argument 'inputs' is required/ if !defined $inputs;
    croak q/Argument 'action' is required/ if !defined $action;
    croak q/Argument 'cb' is required/     if !defined $cb;

    croak q/Argument 'inputs' must be an ArrayRef/ if ref $inputs ne 'ARRAY';
    croak q/Argument 'action' must be a CodeRef/   if ref $action ne 'CODE';
    croak q/Argument 'cb'     must be a CodeRef/   if ref $cb ne 'CODE';

    # With no inputs no action would ever call back, so report completion
    # right away instead of leaving the caller waiting forever.
    if (!@{ $inputs }) {
        $cb->();
        return;
    }

    my $inflight    = 0;
    my $cb_count    = 0;
    my $input_index = 0;
    my $any_err     = 0;
    my $after_work;

    # Dispatcher: start actions until the concurrency limit is reached, the
    # inputs are exhausted, or an error has already been reported.
    my $run = sub {

        while (   !$any_err
               && $inflight < $at_a_time
               && $input_index <= $#{ $inputs }) {

            $inflight++;

            my $input = $inputs->[ $input_index++ ];

            # Every action gets the same completion callback; results are
            # not stored, so no per-input index is needed.
            $action->($input, $after_work);
        }
    };

    $after_work = sub {
        my (undef, $err) = @_;

        $cb_count++;
        $inflight--;

        return if $any_err;

        if ($err) {
            $any_err = 1;
            return $cb->(undef, $err);
        }

        return $cb->() if $cb_count == @{ $inputs };

        $run->();
    };

    $run->();

    # $run and $after_work capture each other.  Pending actions hold their
    # own strong copies of the callback, so weakening this reference keeps
    # the pair alive exactly as long as work is outstanding.
    weaken $after_work;

    return;
}

# azipmap(inputs => \@in, actions => \@subs, cb => \&done, at_a_time => $n)
#
# Pairs each sub in 'actions' with the element of 'inputs' at the same
# position and runs it as $action->($input, $done), keeping at most
# 'at_a_time' (default $DEFAULT_AT_A_TIME) invocations in flight.  Each
# action reports back via $done->($result) or $done->(undef, $error).
#
# When every action has reported success, 'cb' is called with an array ref
# holding the results in action order.  On the first error 'cb' is called
# exactly once with (undef, $error); completions arriving afterwards are
# ignored and no further actions are started.  An empty 'actions' list
# calls 'cb' at once with an empty array ref.
#
# Croaks if 'inputs', 'actions' or 'cb' is missing, if 'actions' is not an
# array ref, or if 'cb' is not a code ref.  Returns nothing.
sub azipmap {
    my (%args) = @_;

    my $actions   = $args{actions};
    my $inputs    = $args{inputs};
    my $cb        = $args{cb};
    my $at_a_time = $args{at_a_time} || $DEFAULT_AT_A_TIME;

    # 'inputs' is mandatory, so no default input list is ever synthesized.
    croak q/Argument 'inputs' is required/  if !defined $inputs;
    croak q/Argument 'actions' is required/ if !defined $actions;
    croak q/Argument 'cb' is required/      if !defined $cb;

    croak q/Argument 'actions' must be an ArrayRef/ if ref $actions ne 'ARRAY';
    croak q/Argument 'cb' must be a CodeRef/        if ref $cb ne 'CODE';

    # With no actions nothing would ever call back, so report the empty
    # result right away instead of leaving the caller waiting forever.
    if (!@{ $actions }) {
        $cb->([]);
        return;
    }

    my $inflight = 0;
    my $cb_count = 0;
    my $work_idx = 0;
    my $outputs  = [];
    my $any_err  = 0;
    my ($run, $after_work);

    # Dispatcher: start actions until the concurrency limit is reached, the
    # actions are exhausted, or an error has already been reported.
    $run = sub {

        while (   !$any_err
               && $inflight < $at_a_time
               && $work_idx <= $#{ $actions }) {

            $inflight++;

            # Fresh lexicals per iteration so that each completion callback
            # remembers the slot its result belongs to.
            my $index  = $work_idx++;
            my $action = $actions->[ $index ];
            my $input  = $inputs->[ $index ];

            $action->($input, sub {
                my ($res, $err) = @_;
                $after_work->($res, $err, $index);
            });
        }
    };

    # $run and $after_work capture each other.  weaken() cannot be used to
    # break that cycle: the variable is the only strong reference to the
    # dispatcher, so weakening it would free the dispatcher while actions
    # are still pending.  Instead $run is dropped explicitly as soon as no
    # more work can be started.
    $after_work = sub {
        my ($output, $err, $index) = @_;

        $cb_count++;
        $inflight--;

        return if $any_err;

        if ($err) {
            $any_err = 1;
            undef $run;
            $cb->(undef, $err);

            # Stop here: falling through would store the failed result and
            # could invoke 'cb' a second time or start more work.
            return;
        }

        $outputs->[$index] = $output;

        if ($cb_count == @{ $actions }) {
            undef $run;
            return $cb->($outputs);
        }

        # $run is already gone if an action reports more than once after
        # the map has finished; there is nothing left to start then.
        $run->() if $run;
    };

    $run->();

    return;
}

# achain(input => $value, steps => \@subs, cb => \&done)
#
# Runs the subs in 'steps' one after another.  Each step is invoked as
# $step->($value, $next) and hands its result to the following step by
# calling $next->($result), or aborts the chain with $next->(undef, $error).
# The first step receives 'input'.
#
# When the steps are exhausted 'cb' is called with the last result; when a
# step reports an error 'cb' is called with (undef, $error) and the
# remaining steps are skipped.  An undefined entry in 'steps' ends the
# chain at that point, as if the list stopped there.
#
# The caller's 'steps' array is not modified.
#
# Croaks if 'cb' or 'steps' is missing or of the wrong type.  Returns
# nothing.
sub achain {
    my (%args) = @_;

    my $input  = $args{input};
    my $cb     = $args{cb};
    my $steps  = $args{steps};

    croak q/Argument 'cb' is required/    if !defined $cb;
    croak q/Argument 'steps' is required/ if !defined $steps;

    croak q/Argument 'cb' must be a CodeRef/      if ref $cb ne 'CODE';
    croak q/Argument 'steps' must be an ArrayRef/ if ref $steps ne 'ARRAY';

    # Consume a private copy: shifting the caller's array would leave it
    # empty and make the same step list unusable for a second chain.
    my @remaining = @{ $steps };

    my $run; $run = sub {
        my ($result, $err) = @_;

        return $cb->(undef, $err) if $err;

        my $next_cb = shift @remaining;

        return $cb->($result) if !defined $next_cb;

        $next_cb->($result, $run);
    };

    $run->($input);

    # $run refers to itself.  A pending step holds its own strong copy of
    # the continuation, so weakening this reference keeps the chain alive
    # only while a step is still outstanding.
    weaken $run;

    return;
}

1;

__END__

=pod

=head1 NAME

Async::Util - Utilities for common asynchronous programming tasks

=head1 SYNOPSIS

    use Async::Util qw(amap azipmap achain);

    # async map
    amap(
        inputs => [ 'foo', 'bar' ],
        action => \&something_asynchronous,
        cb     => \&do_this_at_the_end,
    );

    # invoke action on the corresponding input
    azipmap(
        inputs  => [ 1, 1, 1 ],
        actions => [
            ... # asynchronous subs
        ],
        cb     => \&do_this_at_the_end,
    );

    # execute steps in order
    achain(
        input => 2,
        steps => [
            ... # asynchronous subs
        ],
        cb    => \&do_this_at_the_end,
    );

Examples using AnyEvent:

    use AnyEvent;
    use Async::Util qw(amap);

    my @timers;
    my $delayed_double = sub {
        my ($input, $cb) = @_;

        push @timers, AnyEvent->timer(after => 2, cb => sub {
            $cb->($input*2);
        });
    };

    my $cv = AE::cv;

    amap(



( run in 3.109 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )