Async-Stream

 view release on metacpan or  search on metacpan

lib/Async/Stream.pm  view on Meta::CPAN


use 5.010;
use strict;
use warnings;
no warnings qw(ambiguous);

use Async::Stream::Item;
use Async::Stream::Iterator;

use Carp         qw(croak);
use Scalar::Util qw(weaken);

=head1 NAME

Async::Stream - it's convenient way to work with async data flow.

=head1 VERSION

Version 0.11

=cut

lib/Async/Stream.pm  view on Meta::CPAN

									@stream_items = sort $comporator @stream_items;
								}
								$is_sorted = 1;
								$return_cb->(shift @stream_items);
							} else {
								$return_cb->();
							}
						}
					});
			};$next_cb->();
			weaken $next_cb;
		}
	};

	$self->_set_head($generator, prefetch => 0);

	return $self;
}

=head2 cut_sort($predicate, $comparator)

lib/Async/Stream.pm  view on Meta::CPAN

						} else {
							if (@stream_items) {
								$is_received = 1;
								$return_cb->(pop @stream_items);
							} else {
								$return_cb->();
							}
						}
					});
			};$next_cb->();
			weaken $next_cb;
		}
	};

	$self->_set_head($generator, prefetch => 0);

	return $self;
}

=head2 merge_in($comparator, @list_streams);

lib/Async/Stream.pm  view on Meta::CPAN

		my $next_cb = $next_cb;
		$iterator->next(sub {
				if (@_) {
					push @result, $_[0];
					$next_cb->();
				} else {
					$return_cb->(\@result);
				}
			});
	};$next_cb->();
	weaken $next_cb;

	return $self;
}

=head2 each($action)

Method execute action on each item in stream.

  $stream->each(sub {
      print $_, "\n";

lib/Async/Stream.pm  view on Meta::CPAN


	my $each; $each = sub {
		my $each = $each;
		$iterator->next(sub {
			if (@_) {
				$action->() for ($_[0]);
				$each->()
			}
		});
	}; $each->();
	weaken $each;

	return $self;
}


=head2 shift_each($action)

Method acts like each,but after process item, it removes them from the stream

  $stream->shift_each(sub {

lib/Async/Stream.pm  view on Meta::CPAN


	my $each; $each = sub {
		my $each = $each;
		$self->shift(sub {
			if (@_) {
				$action->() for ($_[0]);
				$each->()
			}
		});
	}; $each->();
	weaken $each;

	return $self;
}

=head2 reduce($accumulator, $returing_cb)

Performs a reduction on the items of the stream.

  $stream->reduce(
    sub{ $a + $b }, 

lib/Async/Stream.pm  view on Meta::CPAN

									local *{ $pkg . '::a' } = \$prev;
									local *{ $pkg . '::b' } = \$_[0];
									$prev = $code->();
								}
								$reduce_cb->();
							} else {
								$return_cb->($prev);
							}
						});
				};$reduce_cb->();
				weaken $reduce_cb;
			} else {
				$return_cb->();
			}
		});

	return $self;
}

=head2 sum($returing_cb)

lib/Async/Stream.pm  view on Meta::CPAN

	my $next_cb ; $next_cb = sub {
		my $next_cb = $next_cb;
		$iterator->next(sub {
				if (@_) {
					$result++;
					return $next_cb->();
				}
				$return_cb->($result)
			});
	}; $next_cb->();
	weaken $next_cb;

	return $self;
}

=head2 any($predicat, $return_cb)

Method look for any equivalent item in steam. if there is any then return that.
if there isn't  then return nothing.

  $stream->any(sub {$_ % 2}, sub{

lib/Async/Stream.pm  view on Meta::CPAN

				if ($is_valid) {
					$return_cb->($_[0]);
				} else {
					$next_cb->();
				}
			} else {
				$return_cb->()
			}
		});
	}; $next_cb->();
	weaken $next_cb;

	return $self;
}


sub _set_head {
	my $self = shift;
	my $generator = shift;
	my %args = @_;



( run in 0.531 second using v1.01-cache-2.11-cpan-65fba6d93b7 )