Async-Stream
view release on metacpan or search on metacpan
lib/Async/Stream.pm view on Meta::CPAN
my $prefetch = $args{prefetch} // $self->{_prefetch};
my $new_generator = $generator;
if ($prefetch) {
$new_generator = _get_prefetch_generator($generator, $self->{_prefetch});
}
$self->{_head} = Async::Stream::Item->new(undef, $new_generator);
return $self;
}
sub _get_prefetch_generator {
my ($generator,$prefetch) = @_;
my @responses_cache;
my @requests_queue;
my $is_exhausted = 0;
my $item_requested = 0;
return sub {
my $return_cb = shift;
if (@responses_cache) {
$return_cb->(shift @responses_cache);
} else {
push @requests_queue, $return_cb;
}
if (!$is_exhausted) {
for (0 .. ($prefetch - $item_requested)) {
$item_requested++;
$generator->(sub {
$item_requested--;
if (@_) {
if (@requests_queue) {
shift(@requests_queue)->($_[0]);
} else {
push @responses_cache, $_[0];
}
} else {
$is_exhausted = 1;
if (!$item_requested && @requests_queue) {
shift(@requests_queue)->();
}
}
});
}
} elsif (!$item_requested && @requests_queue) {
shift(@requests_queue)->();
}
};
}
=head1 AUTHOR
Kirill Sysoev, C<< <k.sysoev at me.com> >>
=head1 BUGS AND LIMITATIONS
Please report any bugs or feature requests to
L<https://github.com/pestkam/p5-Async-Stream/issues>.
=head1 SUPPORT
You can find documentation for this module with the perldoc command.
perldoc Async::Stream::Item
=head1 LICENSE AND COPYRIGHT
Copyright 2017 Kirill Sysoev.
This program is free software; you can redistribute it and/or modify it
under the terms of the the Artistic License (2.0). You may obtain a
copy of the full license at:
L<http://www.perlfoundation.org/artistic_license_2_0>
=cut
1; # End of Async::Stream
( run in 2.009 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )