Async-Stream
view release on metacpan or search on metacpan
lib/Async/Stream.pm view on Meta::CPAN
package Async::Stream;
use 5.010;
use strict;
use warnings;
no warnings qw(ambiguous);
use Async::Stream::Item;
use Async::Stream::Iterator;
use Carp qw(croak);
use Scalar::Util qw(weaken);
=head1 NAME
Async::Stream - it's convenient way to work with async data flow.
=head1 VERSION
Version 0.11
=cut
our $VERSION = '0.12';
=head1 SYNOPSIS
Module helps to organize your async code to stream.
use Async::Stream;
my @urls = qw(
http://ucoz.com
http://ya.ru
http://google.com
);
my $stream = Async::Stream::FromArray->new(@urls);
$stream
->transform(sub {
$return_cb = shift;
http_get $_, sub {
$return_cb->({headers => $_[0], body => $_[0]})
};
})
->grep(sub { $_->{headers}->{Status} =~ /^2/ })
->each(sub {
print $_->{body};
});
=head1 SUBROUTINES/METHODS
=head2 new($generator)
Constructor creates instance of class.
Class method gets 1 arguments - generator subroutine references to generate items.
Generator will get a callback which it will use for returning result.
If generator is exhausted then returning callback is called without arguments.
my $i = 0;
my $stream = Async::Stream->new(sub {
$return_cb = shift;
if ($i < 10) {
$return_cb->($i++);
} else {
$return_cb->();
}
});
=cut
sub new {
my $class = shift;
lib/Async/Stream.pm view on Meta::CPAN
return;
});
}
return;
};
$self->_set_head($generator, prefetch => 0);
return $self;
}
=head2 sort($comparator)
The method sorts whole stream.
$stream->sort(sub{$a <=> $b})->to_arrayref(sub {print @{$_[0]}});
=cut
sub sort {
my $self = shift;
my $comporator = shift;
if (ref $comporator ne 'CODE') {
croak 'First argument can be only subroutine reference'
}
my $pkg = caller;
my $is_sorted = 0;
my @stream_items;
my $iterator = $self->iterator;
my $generator = sub {
my $return_cb = shift;
if ($is_sorted) {
$return_cb->( @stream_items ? shift @stream_items : () );
} else {
my $next_cb; $next_cb = sub {
my $next_cb = $next_cb;
$iterator->next(sub {
if (@_) {
push @stream_items, $_[0];
$next_cb->();
} else {
if (@stream_items) {
{
no strict 'refs';
local *{ $pkg . '::a' } = *{ __PACKAGE__ . '::a' };
local *{ $pkg . '::b' } = *{ __PACKAGE__ . '::b' };
@stream_items = sort $comporator @stream_items;
}
$is_sorted = 1;
$return_cb->(shift @stream_items);
} else {
$return_cb->();
}
}
});
};$next_cb->();
weaken $next_cb;
}
};
$self->_set_head($generator, prefetch => 0);
return $self;
}
=head2 cut_sort($predicate, $comparator)
Sometimes stream can be infinity and you can't you $stream->sort,
you need certain parts of streams for example cut part by length of items.
$stream
->cut_sort(sub {length $a != length $b},sub {$a <=> $b})
->to_arrayref(sub {print @{$_[0]}});
=cut
sub cut_sort {
my $self = shift;
my $cut = shift;
my $comporator = shift;
if (ref $cut ne 'CODE' or ref $comporator ne 'CODE') {
croak 'First and Second arguments can be only subrotine references'
}
my $pkg = caller;
my $iterator = $self->iterator;
my $prev;
my @cur_slice;
my @sorted_array;
my $generator; $generator = sub {
my $return_cb = shift;
if (@sorted_array) {
$return_cb->(shift @sorted_array);
} else {
if (!defined $prev) {
$iterator->next(sub {
if (@_){
$prev = $_[0];
@cur_slice = ($prev);
$generator->($return_cb);
} else {
$return_cb->();
}
});
} else {
$iterator->next(sub {
if (@_) {
my $is_cut;
{
no strict 'refs';
local *{ $pkg . '::a' } = \$prev;
local *{ $pkg . '::b' } = \$_[0];
$is_cut = $cut->();
}
$prev = $_[0];
if ($is_cut) {
lib/Async/Stream.pm view on Meta::CPAN
}
} else {
if (@cur_slice) {
{
no strict 'refs';
local *{ $pkg . '::a' } = *{ __PACKAGE__ . '::a' };
local *{ $pkg . '::b' } = *{ __PACKAGE__ . '::b' };
@sorted_array = sort $comporator @cur_slice;
}
@cur_slice = ();
$return_cb->(shift @sorted_array);
} else {
$return_cb->();
}
}
});
}
}
};
$self->_set_head($generator, prefetch => 0);
return $self;
}
=head2 reverse()
Revers order of stream's items. Can't be done on endless stream.
$stream->reverse;
=cut
sub reverse {
my $self = shift;
my $is_received = 0;
my @stream_items;
my $iterator = $self->iterator;
my $generator = sub {
my $return_cb = shift;
if ($is_received) {
$return_cb->( @stream_items ? pop @stream_items : () );
} else {
my $next_cb; $next_cb = sub {
my $next_cb = $next_cb;
$iterator->next(sub {
if (@_) {
push @stream_items, $_[0];
$next_cb->();
} else {
if (@stream_items) {
$is_received = 1;
$return_cb->(pop @stream_items);
} else {
$return_cb->();
}
}
});
};$next_cb->();
weaken $next_cb;
}
};
$self->_set_head($generator, prefetch => 0);
return $self;
}
=head2 merge_in($comparator, @list_streams);
Merge additional streams into current stream by comparing each item of stream.
$stream->merge_in(sub{$a <=> $b}, $stream1, $stream2);
=cut
sub merge_in {
my $self = shift;
my $comporator = shift;
if (ref $comporator ne 'CODE') {
croak 'First argument can be only reference to subroutine';
}
my $pkg = caller;
my @iterators;
for my $stream ($self, @_) {
if ($stream->isa('Async::Stream')) {
push @iterators, [$stream->iterator];
} else {
croak 'Arguments can be only Async::Stream or instances of derived class'
}
}
my $generator = sub {
my $return_cb = shift;
my $requested_item = grep { @{$_} == 1 } @iterators;
for (my $i = 0; $i < @iterators; $i++) {
if (@{$iterators[$i]} == 1) {
my $iterator_id = $i;
$iterators[$iterator_id][0]->next(sub {
$requested_item--;
if (@_) {
my $item = shift;
push @{$iterators[$iterator_id]}, $item;
} else {
$iterators[$iterator_id] = undef;
}
if ($requested_item == 0) {
### it's awful and need to optimize ###
{
no strict 'refs';
my $comp = sub {
local ${ $pkg . '::a' } = $a->[1];
local ${ $pkg . '::b' } = $b->[1];
return $comporator->();
};
@iterators = sort $comp grep { defined $_ } @iterators;
}
### ###
lib/Async/Stream.pm view on Meta::CPAN
my $generator; $generator = sub {
my $return_cb = shift;
$iterator->next(sub {
if (@_) {
my $key;
$key = $to_key->() for ($_[0]);
if (exists $index_of{$key}) {
$generator->($return_cb);
} else {
$index_of{$key} = undef;
$return_cb->($_[0]);
}
} else {
$return_cb->();
}
});
};
$self->_set_head($generator);
return $self;
}
=head1 TERMINAL METHODS
=head2 to_arrayref($returing_cb)
Method returns stream's iterator.
$stream->to_arrayref(sub {
$array_ref = shift;
#...
});
=cut
sub to_arrayref {
my $self = shift;
my $return_cb = shift;
if (ref $return_cb ne 'CODE') {
croak 'First argument can be only subroutine reference'
}
my @result;
my $iterator = $self->iterator;
my $next_cb; $next_cb = sub {
my $next_cb = $next_cb;
$iterator->next(sub {
if (@_) {
push @result, $_[0];
$next_cb->();
} else {
$return_cb->(\@result);
}
});
};$next_cb->();
weaken $next_cb;
return $self;
}
=head2 each($action)
Method execute action on each item in stream.
$stream->each(sub {
print $_, "\n";
});
=cut
sub each {
my $self = shift;
my $action = shift;
if (ref $action ne 'CODE') {
croak 'First argument can be only subroutine reference'
}
my $iterator = $self->iterator;
my $each; $each = sub {
my $each = $each;
$iterator->next(sub {
if (@_) {
$action->() for ($_[0]);
$each->()
}
});
}; $each->();
weaken $each;
return $self;
}
=head2 shift_each($action)
Method acts like each,but after process item, it removes them from the stream
$stream->shift_each(sub {
print $_, "\n";
});
=cut
sub shift_each {
my $self = shift;
my $action = shift;
if (ref $action ne 'CODE') {
croak 'First argument can be only subroutine reference'
}
my $each; $each = sub {
my $each = $each;
$self->shift(sub {
if (@_) {
$action->() for ($_[0]);
$each->()
}
});
}; $each->();
weaken $each;
return $self;
}
=head2 reduce($accumulator, $returing_cb)
Performs a reduction on the items of the stream.
$stream->reduce(
sub{ $a + $b },
sub {
my $sum_of_items = shift;
...
});
=cut
sub reduce {
my $self = shift;
my $code = shift;
my $return_cb = shift;
if (ref $return_cb ne 'CODE' or ref $code ne 'CODE') {
croak 'First and Second arguments can be only subroutine references'
}
my $pkg = caller;
my $iterator = $self->iterator;
$iterator->next(sub {
if (@_) {
my $prev = $_[0];
my $reduce_cb; $reduce_cb = sub {
my $reduce_cb = $reduce_cb;
$iterator->next(sub {
if (@_) {
{
no strict 'refs';
local *{ $pkg . '::a' } = \$prev;
local *{ $pkg . '::b' } = \$_[0];
$prev = $code->();
}
$reduce_cb->();
} else {
$return_cb->($prev);
}
});
};$reduce_cb->();
weaken $reduce_cb;
} else {
$return_cb->();
}
});
return $self;
}
=head2 sum($returing_cb)
The method computes sum of all items in stream.
$stream->sum(
sub {
my $sum_of_items = shift;
...
});
=cut
sub sum {
my $self = shift;
my $return_cb = shift;
if (ref $return_cb ne 'CODE') {
croak 'First argument can be only subroutine reference'
}
$self->reduce(sub{$a+$b}, $return_cb);
return $self;
}
=head2 min($returing_cb)
The method finds out minimum item among all items in stream.
$stream->min(
sub {
my $min_item = shift;
...
});
=cut
sub min {
my $self = shift;
my $return_cb = shift;
if (ref $return_cb ne 'CODE') {
croak 'First argument can be only subroutine reference'
}
$self->reduce(sub{$a < $b ? $a : $b}, $return_cb);
return $self;
}
=head2 max($returing_cb)
The method finds out maximum item among all items in stream.
$stream->max(
sub {
my $max_item = shift;
...
});
=cut
sub max {
my $self = shift;
my $return_cb = shift;
if (ref $return_cb ne 'CODE') {
croak 'First argument can be only subroutine reference'
}
$self->reduce(sub{$a > $b ? $a : $b}, $return_cb);
return $self;
}
=head2 count($returing_cb)
The method counts number items in streams.
$stream->count(sub {
my $count = shift;
});
=cut
sub count {
my $self = shift;
my $return_cb = shift;
if (ref $return_cb ne 'CODE') {
croak 'First argument can be only subroutine reference'
}
my $result = 0;
my $iterator = $self->iterator;
my $next_cb ; $next_cb = sub {
my $next_cb = $next_cb;
$iterator->next(sub {
if (@_) {
$result++;
return $next_cb->();
}
$return_cb->($result)
});
}; $next_cb->();
weaken $next_cb;
return $self;
}
=head2 any($predicat, $return_cb)
Method look for any equivalent item in steam. if there is any then return that.
if there isn't then return nothing.
$stream->any(sub {$_ % 2}, sub{
my $odd_item = shift
...
});
=cut
sub any {
my $self = shift;
my $predicat = shift;
my $return_cb = shift;
my $iterator = $self->iterator;
my $next_cb; $next_cb = sub {
my $next_cb = $next_cb;
$iterator->next(sub {
if (@_) {
my $is_valid;
$is_valid = $predicat->() for ($_[0]);
if ($is_valid) {
$return_cb->($_[0]);
} else {
$next_cb->();
}
} else {
$return_cb->()
}
});
}; $next_cb->();
weaken $next_cb;
return $self;
}
sub _set_head {
my $self = shift;
my $generator = shift;
my %args = @_;
my $prefetch = $args{prefetch} // $self->{_prefetch};
my $new_generator = $generator;
if ($prefetch) {
$new_generator = _get_prefetch_generator($generator, $self->{_prefetch});
}
$self->{_head} = Async::Stream::Item->new(undef, $new_generator);
return $self;
}
sub _get_prefetch_generator {
my ($generator,$prefetch) = @_;
my @responses_cache;
my @requests_queue;
my $is_exhausted = 0;
my $item_requested = 0;
return sub {
my $return_cb = shift;
if (@responses_cache) {
$return_cb->(shift @responses_cache);
} else {
push @requests_queue, $return_cb;
}
if (!$is_exhausted) {
for (0 .. ($prefetch - $item_requested)) {
$item_requested++;
$generator->(sub {
$item_requested--;
if (@_) {
if (@requests_queue) {
shift(@requests_queue)->($_[0]);
} else {
push @responses_cache, $_[0];
}
} else {
$is_exhausted = 1;
if (!$item_requested && @requests_queue) {
shift(@requests_queue)->();
}
}
});
}
} elsif (!$item_requested && @requests_queue) {
( run in 2.853 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )