Async-Stream

 view release on metacpan or  search on metacpan

lib/Async/Stream.pm  view on Meta::CPAN


	my $prefetch = $args{prefetch} // $self->{_prefetch};

	my $new_generator = $generator;

	if ($prefetch) {
		$new_generator = _get_prefetch_generator($generator, $self->{_prefetch});
	}

	$self->{_head} = Async::Stream::Item->new(undef, $new_generator);

	return $self;
}

# Wrap $generator in a read-ahead buffer.
#
# Arguments:
#   $generator - code ref taking one callback; it invokes that callback with
#                a single item, or with no arguments to signal end of stream.
#   $prefetch  - how many items to keep fetched ahead of the consumer.
#                Undefined or negative values are treated as 0, i.e. only
#                items that a consumer is already waiting for are requested.
#
# Returns a code ref with the same calling convention as $generator: it takes
# a callback which receives the next item, or no arguments at end of stream.
sub _get_prefetch_generator {
	my ($generator, $prefetch) = @_;

	$prefetch = 0 if !defined $prefetch || $prefetch < 0;

	my @responses_cache;       # items fetched ahead, not yet handed out
	my @requests_queue;        # consumer callbacks waiting for an item
	my $is_exhausted = 0;      # upstream has signalled end of stream
	my $item_requested = 0;    # upstream requests still in flight

	# Once upstream is exhausted and nothing is in flight any more, every
	# waiting consumer gets the end-of-stream signal. A request is only queued
	# while the cache is empty, so no buffered item can be skipped here.
	my $finish_pending = sub {
		return if !$is_exhausted || $item_requested;
		while (@requests_queue) {
			shift(@requests_queue)->();
		}
		return;
	};

	# Callback handed to the upstream generator for every request.
	my $on_response = sub {
		$item_requested--;
		if (@_) {
			# An item (possibly undef): serve the oldest waiting consumer,
			# otherwise keep it for a later call.
			if (@requests_queue) {
				shift(@requests_queue)->($_[0]);
			} else {
				push @responses_cache, $_[0];
			}
		} else {
			$is_exhausted = 1;
		}
		$finish_pending->();
		return;
	};

	return sub {
		my $return_cb = shift;

		if (@responses_cache) {
			$return_cb->(shift @responses_cache);
		} else {
			push @requests_queue, $return_cb;
		}

		# Top up: in-flight plus buffered items should cover all waiting
		# consumers plus $prefetch items of read-ahead, and no more. The
		# counters are re-read on every pass because a synchronous upstream
		# generator changes them before the call returns.
		while (!$is_exhausted) {
			my $available = $item_requested + scalar @responses_cache;
			my $wanted    = $prefetch + scalar @requests_queue;
			last if $available >= $wanted;
			$item_requested++;
			$generator->($on_response);
		}

		$finish_pending->();
		return;
	};
}

=head1 AUTHOR

Kirill Sysoev, C<< <k.sysoev at me.com> >>

=head1 BUGS AND LIMITATIONS

Please report any bugs or feature requests to 
L<https://github.com/pestkam/p5-Async-Stream/issues>.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

  perldoc Async::Stream


=head1 LICENSE AND COPYRIGHT

Copyright 2017 Kirill Sysoev.

This program is free software; you can redistribute it and/or modify it
under the terms of the Artistic License (2.0). You may obtain a
copy of the full license at:

L<http://www.perlfoundation.org/artistic_license_2_0>

=cut

1; # End of Async::Stream



( run in 2.009 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )