BusyBird

 view release on metacpan or  search on metacpan

lib/BusyBird/Flow.pm  view on Meta::CPAN

package BusyBird::Flow;
use v5.8.0;
use strict;
use warnings;
use Async::Queue;
use BusyBird::Log qw(bblog);
use CPS qw(kforeach);
use Carp;
use Scalar::Util qw(weaken);
use Try::Tiny;

# Constructor. Takes no arguments besides the class name.
#
# The new object starts with an empty filter list and owns a processing
# queue built by _create_queue().
sub new {
    my $class = shift;
    my %fields = (filters => []);
    my $self = bless \%fields, $class;
    # The queue can only be built once the object exists, because
    # _create_queue() is an instance method.
    $self->{queue} = $self->_create_queue();
    return $self;
}

# Build the Async::Queue that runs the filter chain.
#
# The queue is created with concurrency 1. Its worker receives
# ($data, $done), hands $data to every filter in $self->{filters} via
# kforeach, and finally calls $done->($data) with the resulting data.
#
# Each filter is invoked as $filter->($data, $callback):
#   - if the filter passes an array-ref to $callback, that array-ref
#     replaces $data for the rest of the chain;
#   - any other result is logged as a warning and ignored;
#   - an exception raised while calling the filter is logged as an error
#     and the chain goes on with $data unchanged.
#
# The chain advances at most once per filter. Without that guard, a filter
# that calls its callback and then dies, a filter that calls its callback
# twice, or an exception propagating back from later (synchronous) stages
# into this filter's catch block would re-run the rest of the chain and
# call $done more than once for a single task.
#
# Returns the new Async::Queue object.
sub _create_queue {
    my ($self) = @_;
    # The worker closure captures $self and is held by the queue, which is
    # in turn stored in $self->{queue}. Weaken to avoid a reference cycle.
    weaken $self;
    return Async::Queue->new(concurrency => 1, worker => sub {
        my ($data, $done) = @_;
        kforeach $self->{filters}, sub {
            my ($filter, $knext) = @_;
            my $advanced = 0;
            # Move on to the next filter, but only the first time it is asked.
            my $advance = sub {
                return if $advanced;
                $advanced = 1;
                $knext->();
            };
            try {
                $filter->($data, sub {
                    my ($result) = @_;
                    if($advanced) {
                        # Too late to affect $data: the chain has moved on.
                        bblog('warn', 'The filter called its callback after the flow had already moved on. Ignored.');
                        return;
                    }
                    if(ref($result) eq 'ARRAY') {
                        $data = $result;
                    }else {
                        bblog('warn', 'The filter did not return an array-ref. Ignored.');
                    }
                    $advance->();
                });
            }catch {
                my ($e) = @_;
                bblog('error', "Filter dies: $e");
                # No-op when the exception arrived after the chain advanced.
                $advance->();
            };
        }, sub {
            $done->($data);
        };
    });
}

# Append $async_filter to the end of the filter list.
#
# Croaks if the queue reports that something is currently running in it;
# in that case the filter list is left untouched.
# Returns whatever the final push returns (the new number of filters).
sub add {
    my $self = shift;
    my ($async_filter) = @_;
    croak "You cannot add a filter while there is a status running in it."
        if $self->{queue}->running;
    push @{ $self->{filters} }, $async_filter;
}

# Submit $data to the flow by pushing it, together with $callback, onto
# the internal queue. Returns whatever the queue's push method returns.
sub execute {
    my $self = shift;
    my ($data, $callback) = @_;
    return $self->{queue}->push($data, $callback);
}


1;

__END__

=pod

=head1 NAME



( run in 2.397 seconds using v1.01-cache-2.11-cpan-d8267643d1d )