Async-Stream
view release on metacpan or search on metacpan
lib/Async/Stream/Pushable.pm view on Meta::CPAN
package Async::Stream::Pushable;
use 5.010;
use strict;
use warnings;
use base qw(Async::Stream);
use Carp qw(croak);
=head1 NAME
Use that class for creating streams which you can use to push item to them.
=head1 VERSION
Version 0.11
=cut
our $VERSION = '0.12';
=head1 SYNOPSIS
Creating pushable streams,
you can use this type of streams for example for observer pattern
use Async::Stream::Pushable;
my $stream = Async::Stream::Pushable->new;
$stream->push(1,2,3)->finalize;
# Or for example
my $stream = Async::Stream::Pushable->new;
$some_object->subscribe(sub {
my $new_item = shift;
$stream->push($new_item);
});
=head1 SUBROUTINES/METHODS
=head2 new(@array_of_items)
Constructor creates instance of class.
Class method create stream which you can use that to push items to that.
my $stream = Async::Stream::Pushable->new(@urls)
=cut
sub new {
my $class = shift;
my $self;
$self = $class->SUPER::new(
sub {
my ($return_cb) = @_;
if (@{$self->{_items}}) {
$return_cb->(shift @{ $self->{_items} });
} else {
if ($self->{_is_finalized}){
$return_cb->();
}
else {
push @{ $self->{_requests} }, $return_cb;
}
}
},
@_,
);
$self->{_items} = [];
$self->{_requests} = [];
$self->{_is_finalized} = 0;
return $self;
}
=head2 push(@new_items)
Push new items to stream
my $stream->push(@new_items);
=cut
sub push {
my ($self, @new_items) = @_;
croak q{The stream is finalized} if ($self->{_is_finalized});
while (@{ $self->{_requests} }) {
my $return_cb = shift @{ $self->{_requests} };
$return_cb->( shift @new_items );
}
if (@new_items) {
push @{ $self->{_items} }, @new_items;
}
return $self;
}
=head2 finalize()
Finalize stream
my $stream->finalize;
=cut
sub finalize {
my ($self) = @_;
croak q{The stream has already been finalized} if ($self->{_is_finalized});
( run in 1.243 second using v1.01-cache-2.11-cpan-ceb78f64989 )