Async-Stream
view release on metacpan or search on metacpan
lib/Async/Stream.pm view on Meta::CPAN
use 5.010;
use strict;
use warnings;
no warnings qw(ambiguous);
use Async::Stream::Item;
use Async::Stream::Iterator;
use Carp qw(croak);
use Scalar::Util qw(weaken);
=head1 NAME
Async::Stream - it's convenient way to work with async data flow.
=head1 VERSION
Version 0.11
=cut
lib/Async/Stream.pm view on Meta::CPAN
@stream_items = sort $comporator @stream_items;
}
$is_sorted = 1;
$return_cb->(shift @stream_items);
} else {
$return_cb->();
}
}
});
};$next_cb->();
weaken $next_cb;
}
};
$self->_set_head($generator, prefetch => 0);
return $self;
}
=head2 cut_sort($predicate, $comparator)
lib/Async/Stream.pm view on Meta::CPAN
} else {
if (@stream_items) {
$is_received = 1;
$return_cb->(pop @stream_items);
} else {
$return_cb->();
}
}
});
};$next_cb->();
weaken $next_cb;
}
};
$self->_set_head($generator, prefetch => 0);
return $self;
}
=head2 merge_in($comparator, @list_streams);
lib/Async/Stream.pm view on Meta::CPAN
my $next_cb = $next_cb;
$iterator->next(sub {
if (@_) {
push @result, $_[0];
$next_cb->();
} else {
$return_cb->(\@result);
}
});
};$next_cb->();
weaken $next_cb;
return $self;
}
=head2 each($action)
Method execute action on each item in stream.
$stream->each(sub {
print $_, "\n";
lib/Async/Stream.pm view on Meta::CPAN
my $each; $each = sub {
my $each = $each;
$iterator->next(sub {
if (@_) {
$action->() for ($_[0]);
$each->()
}
});
}; $each->();
weaken $each;
return $self;
}
=head2 shift_each($action)
Method acts like each,but after process item, it removes them from the stream
$stream->shift_each(sub {
lib/Async/Stream.pm view on Meta::CPAN
my $each; $each = sub {
my $each = $each;
$self->shift(sub {
if (@_) {
$action->() for ($_[0]);
$each->()
}
});
}; $each->();
weaken $each;
return $self;
}
=head2 reduce($accumulator, $returing_cb)
Performs a reduction on the items of the stream.
$stream->reduce(
sub{ $a + $b },
lib/Async/Stream.pm view on Meta::CPAN
local *{ $pkg . '::a' } = \$prev;
local *{ $pkg . '::b' } = \$_[0];
$prev = $code->();
}
$reduce_cb->();
} else {
$return_cb->($prev);
}
});
};$reduce_cb->();
weaken $reduce_cb;
} else {
$return_cb->();
}
});
return $self;
}
=head2 sum($returing_cb)
lib/Async/Stream.pm view on Meta::CPAN
my $next_cb ; $next_cb = sub {
my $next_cb = $next_cb;
$iterator->next(sub {
if (@_) {
$result++;
return $next_cb->();
}
$return_cb->($result)
});
}; $next_cb->();
weaken $next_cb;
return $self;
}
=head2 any($predicat, $return_cb)
Method look for any equivalent item in steam. if there is any then return that.
if there isn't then return nothing.
$stream->any(sub {$_ % 2}, sub{
lib/Async/Stream.pm view on Meta::CPAN
if ($is_valid) {
$return_cb->($_[0]);
} else {
$next_cb->();
}
} else {
$return_cb->()
}
});
}; $next_cb->();
weaken $next_cb;
return $self;
}
sub _set_head {
my $self = shift;
my $generator = shift;
my %args = @_;
( run in 0.531 second using v1.01-cache-2.11-cpan-65fba6d93b7 )