Async-Stream

 view release on metacpan or  search on metacpan

lib/Async/Stream.pm  view on Meta::CPAN


	my $iterator = $self->iterator;
	my $generator = sub {
			my $return_cb = shift;
			$iterator->next(sub {
					if (@_) {
						$action->() for ($_[0]);
						$return_cb->($_[0]);
					} else {
						$return_cb->()
					}
				});
		};

	$self->_set_head($generator, prefetch => 0);

	return $self;
}

=head2 grep($predicate)

Lazily filters the current stream, keeping only the items for which
C<$predicate> returns a true value. The item under test is available in C<$_>.
Croaks unless C<$predicate> is a subroutine reference.
Returns the stream itself, so calls can be chained.

  $stream->grep(sub {$_ % 2})->to_arrayref(sub {print @{$_[0]}});

=cut
sub grep {
	my $self = shift;
	my $predicate = shift;

	if (ref $predicate ne 'CODE') {
		croak 'First argument can be only subroutine reference'
	}

	my $iterator = $self->iterator;

	# The generator delegates to a named helper instead of a closure that
	# calls itself: a self-referencing closure forms a reference cycle that
	# reference counting never frees, which would leak the closure together
	# with the iterator and the predicate on every grep() call.
	my $generator = sub {
		my $return_cb = shift;
		return _grep_next($iterator, $predicate, $return_cb);
	};

	$self->_set_head($generator, prefetch => 0);

	return $self;
}

# Pulls items from $iterator until one satisfies $predicate and hands it to
# $return_cb. When the iterator reports that it has nothing more to deliver
# (its callback is invoked without arguments), $return_cb is likewise called
# without arguments.
sub _grep_next {
	my ($iterator, $predicate, $return_cb) = @_;

	return $iterator->next(sub {
		# No arguments: nothing left upstream, pass that on unchanged.
		return $return_cb->() if !@_;

		# Alias $_ to the item so the predicate can test it the same way
		# a block passed to the built-in grep would.
		my $is_valid;
		$is_valid = $predicate->() for ($_[0]);

		return $return_cb->($_[0]) if $is_valid;

		# Item rejected: keep pulling until one passes or the stream ends.
		return _grep_next($iterator, $predicate, $return_cb);
	});
}

=head2 map($transformer)

Applies a synchronous transformation to every item of the stream, like the
built-in C<map> does for a list. The current item is available in C<$_> and
the value returned by C<$transformer> takes its place in the stream.
A transformer that returns an empty list (for example via a bare C<return>)
produces C<undef> for that item.
Croaks unless C<$transformer> is a subroutine reference.
Returns the stream itself, so calls can be chained.

  $stream->map(sub { $_ * 2 })->to_arrayref(sub {print @{$_[0]}});

=cut
sub map {
	my $self = shift;
	my $transformer = shift;

	if (ref $transformer ne 'CODE') {
		croak 'First argument can be only subroutine reference'
	}

	my $iterator = $self->iterator;

	my $generator = sub {
		my $return_cb = shift;
		$iterator->next(sub {
			# No arguments: nothing left upstream, pass that on unchanged.
			return $return_cb->() if !@_;

			# Alias $_ to the item so the transformer reads it the same way
			# a block passed to the built-in map would.
			my @mapped;
			@mapped = $transformer->() for ($_[0]);

			# Calling $return_cb without arguments means "no more items", so
			# an empty result must still be delivered as one (undef) item;
			# otherwise the rest of the stream would be silently dropped.
			return $return_cb->(@mapped ? @mapped : undef);
		});
	};

	$self->_set_head($generator, prefetch => 0);

	return $self;
}

=head2 transform($transformer)

Transforms the current stream asynchronously.
It works like a lazy map whose result is delivered through a callback.
Use it, for example, to perform an HTTP request or any other asynchronous
operation for each item.

  $stream->transform(sub {
      my $return_cb = shift;
      $return_cb->($_ * 2)
    })->to_arrayref(sub {print @{$_[0]}});

=cut
sub transform {
	my $self = shift;
	my $transformer = shift;

	if (ref $transformer ne 'CODE') {
		croak 'First argument can be only subroutine reference'
	}

	my $iterator = $self->iterator;

	my $next_cb; $next_cb = sub {
		my $return_cb = shift;
		$iterator->next(sub {
			if (@_) {
				$transformer->($return_cb) for ($_[0]);
			} else {
				$return_cb->()



( run in 0.610 second using v1.01-cache-2.11-cpan-d8267643d1d )