BusyBird
view release on metacpan or search on metacpan
lib/BusyBird/Flow.pm view on Meta::CPAN
package BusyBird::Flow;
use v5.8.0;
use strict;
use warnings;
use Async::Queue;
use BusyBird::Log qw(bblog);
use CPS qw(kforeach);
use Carp;
use Scalar::Util qw(weaken);
use Try::Tiny;
# Constructor.
#
# Creates a flow object with an empty filter list, then builds its
# processing queue via _create_queue() and stores it under the "queue" key.
#
# Params:  $class - the class name to bless the new object into.
# Returns: the new blessed object.
sub new {
    my $class = shift;

    # The filter list is put in place before the queue is created, so that
    # _create_queue() always sees an initialized object.
    my $flow = bless({ filters => [] }, $class);
    $flow->{queue} = $flow->_create_queue();

    return $flow;
}
# Build the Async::Queue whose worker passes a piece of data through every
# filter in $self->{filters}, in array order.
#
# Returns: a new Async::Queue object created with concurrency => 1. Its
# worker receives ($data, $done) and finishes by calling $done->($data)
# with the data as left by the filters.
#
# Per-filter rules:
#   * A filter is called as $filter->($data, $callback) and reports its
#     result by calling $callback->($result).
#   * An array-ref result replaces $data for the following filters. Any
#     other result is logged as a warning and discarded.
#   * A filter that dies is logged as an error and skipped.
#   * The chain advances exactly once per filter. If a filter calls its
#     callback and then dies, calls the callback more than once, or an
#     exception thrown further down the chain propagates back through a
#     synchronous callback, the continuation is NOT invoked a second time.
sub _create_queue {
    my ($self) = @_;

    # The worker closure refers back to $self, and new() stores the returned
    # queue inside $self. Keep the back-reference weak so the two do not
    # keep each other alive.
    weaken $self;

    return Async::Queue->new(concurrency => 1, worker => sub {
        my ($data, $done) = @_;
        kforeach $self->{filters}, sub {
            my ($filter, $knext) = @_;

            # Once-only guard around the continuation: invoking $knext
            # twice for the same filter would advance the iteration twice.
            my $advanced = 0;
            my $advance = sub {
                return if $advanced;
                $advanced = 1;
                $knext->();
            };

            try {
                $filter->($data, sub {
                    my ($result) = @_;
                    if($advanced) {
                        # A late or repeated callback must not overwrite
                        # data that later filters may already have seen.
                        bblog('warn', 'The filter called back more than once. Ignored.');
                        return;
                    }
                    if(ref($result) eq 'ARRAY') {
                        $data = $result;
                    }else {
                        bblog('warn', 'The filter did not return an array-ref. Ignored.');
                    }
                    $advance->();
                });
            }catch {
                my ($e) = @_;
                bblog('error', "Filter dies: $e");
                # No-op if the filter had already called back before dying.
                $advance->();
            };
        }, sub {
            $done->($data);
        };
    });
}
# Append a filter to the end of this flow's filter list.
#
# Params:
#   $async_filter - the filter to append. It is stored as given, with no
#                   validation performed here.
# Dies (via croak) if the flow's queue reports that it is running.
# Evaluates to the result of the final push, i.e. the new number of filters.
sub add {
    my $self = shift;
    my ($async_filter) = @_;

    croak "You cannot add a filter while there is a status running in it."
        if $self->{queue}->running;

    push @{ $self->{filters} }, $async_filter;
}
# Submit data to the flow's queue for processing.
#
# Params:
#   $data     - the data to hand to the queue.
#   $callback - passed to the queue together with $data.
# Returns whatever the queue's push() method returns.
sub execute {
    my $self = shift;
    my ($data, $callback) = @_;

    my $queue = $self->{queue};
    return $queue->push($data, $callback);
}
1;
__END__
=pod
=head1 NAME
( run in 2.397 seconds using v1.01-cache-2.11-cpan-d8267643d1d )