Async-Util

 view release on metacpan or  search on metacpan

lib/Async/Util.pm  view on Meta::CPAN

BEGIN {
    $Async::Util::VERSION = '0.01';
}
# ABSTRACT: utilities for doing common async operations
use strict;
use warnings;
use v5.10;
no warnings 'recursion';
use Carp;
use Exporter;
use Scalar::Util qw(weaken);

our @ISA               = qw(Exporter);
our @EXPORT_OK         = qw(amap azipmap achain);
my  $DEFAULT_AT_A_TIME = 100;

sub amap {
    my (%args) = @_;

    return _amap_ignore(%args) if exists $args{output} && !$args{output};
    return _amap(%args);

lib/Async/Util.pm  view on Meta::CPAN

            $input_index++;

            my $after_work_wrapper = sub {
                my ($res, $err) = @_;
                my $i = $index;
                $after_work->($res, $err, $i);
            };

            $action->($input, $after_work_wrapper);

            weaken $after_work_wrapper;
        }

    };

    $after_work = sub {
        my ($output, $err, $index) = @_;

        $cb_count++;
        $inflight--;

lib/Async/Util.pm  view on Meta::CPAN


        # store the output
        $outputs->[$index] = $output;

        return $cb->($outputs) if $cb_count == @{ $inputs };

        $run->();
    };

    $run->();
    weaken $run;

    return;
}

sub _amap_ignore {
    my (%args) = @_;

    my $action    = $args{action};
    my $inputs    = $args{inputs};
    my $cb        = $args{cb};

lib/Async/Util.pm  view on Meta::CPAN

            return $cb->(undef, $err);
        }

        return $cb->() if $cb_count == @{ $inputs };

        $run->();
    };

    $run->();

    weaken $after_work;

    return;
}

sub azipmap {
    my (%args) = @_;

    my $actions   = $args{actions};
    my $inputs    = $args{inputs};
    my $cb        = $args{cb};

lib/Async/Util.pm  view on Meta::CPAN

            my $input  = $inputs->[ $index ];
            $work_idx++;

            my $after_work_wrapper = sub {
                my $i = $index;
                $after_work->($_[0], $_[1], $i);
            };

            $action->($input, $after_work_wrapper);

            weaken $after_work_wrapper;
        }
    };

    $after_work = sub {
        my ($output, $err, $index) = @_;

        $cb_count++;
        $inflight--;

        return if $any_err;

lib/Async/Util.pm  view on Meta::CPAN

        }

        $outputs->[$index] = $output;

        return $cb->($outputs) if $cb_count == @{ $actions };

        $run->();
    };

    $run->();
    weaken $run;

    return;
}

sub achain {
    my (%args) = @_;

    my $input  = $args{input};
    my $cb     = $args{cb};
    my $steps  = $args{steps};

lib/Async/Util.pm  view on Meta::CPAN

        return $cb->(undef, $err) if $err;

        my $next_cb = shift @{ $steps };

        return $cb->($result) if !defined $next_cb;

        $next_cb->($result, $run);
    };

    $run->($input);
    weaken $run;

    return;
}

1;

__END__

=pod



( run in 0.651 second using v1.01-cache-2.11-cpan-65fba6d93b7 )