Coro-PrioChannel
view release on metacpan or search on metacpan
lib/Coro/PrioChannel/Multi.pm view on Meta::CPAN
package Coro::PrioChannel::Multi;
# Package version, set through the fully qualified variable inside a bare
# block so that no extra names are introduced into the package scope.
{
$Coro::PrioChannel::Multi::VERSION = '0.005';
}
use strict;
use warnings;
# ABSTRACT: Multiple-listener priority message queues for Coro
use Coro::PrioChannel;
use Scalar::Util qw(weaken);
# Slot indices into the blessed array that represents a Multi object
# (the array is built in new() as [ $limit, [] ]).
# LIM: the size limit handed to every Coro::PrioChannel created by listen().
sub LIM() { 0 }
# CHAN: array ref holding the (weakened) per-listener channels.
sub CHAN() { 1 }
# Constructor.
#   $class - class to bless the new object into
#   $limit - size limit forwarded to each listener channel (may be undef)
# Returns a blessed array ref: slot 0 holds the limit, slot 1 an empty
# list of listener channels.
sub new
{
    my ($class, $limit) = @_;
    return bless [ $limit, [] ], $class;
}
# Drop the slots of listeners that have gone away and make sure every
# remaining slot is a weak reference again.
sub clean
{
    my ($self) = @_;
    my $listeners = $self->[CHAN];

    # A weakened slot turns into undef once its channel is destroyed, so
    # keeping only the defined entries removes the dead listeners.
    @$listeners = grep { defined $_ } @$listeners;

    # Copying the references through grep hands back ordinary (strong)
    # references, so re-weaken everything.  (Easier than using splice to
    # pull the undef items out - if we get too many readers, we'll
    # re-evaluate whether this is slow.)
    weaken($_) foreach @$listeners;
}
# Returns how many listener channels are still alive.  Dead listeners are
# purged first so that they are not counted.
sub number_of_listeners
{
    my ($self) = @_;
    $self->clean();
    my $listeners = $self->[CHAN];
    return scalar @$listeners;
}
# Debugging aid.
# Returns a string of the form 'Channel=size :: 0=3:1=0:...', listing each
# live listener's position in the listener list together with the value
# reported by that channel's size() method.  With no listeners the result
# is just the 'Channel=size :: ' prefix.
sub _status
{
    my $self = shift;
    $self->clean();
    my $listeners = $self->[CHAN];

    # Iterate over the listener list itself.  The object's own array only
    # ever has two slots (LIM and CHAN), so its last index says nothing
    # about how many listeners there are.
    return 'Channel=size :: '
        . join(':', map { $_ . '=' . $listeners->[$_]->size() } 0 .. $#{$listeners});
}
# Register a new listener.
# Builds a fresh Coro::PrioChannel using this object's limit, records it in
# the listener list, and returns it.  clean() weakens the recorded copy, so
# the reference returned to the caller is the one that keeps the channel
# alive.
sub listen
{
    my ($self) = @_;
    my $limit   = $self->[LIM];
    my $channel = Coro::PrioChannel->new($limit);

    my $listeners = $self->[CHAN];
    push @$listeners, $channel;
    $self->clean();

    return $channel;
}
# Send an item to every live listener.
# All arguments are forwarded unchanged to each listener channel's put()
# (per the SYNOPSIS: the item, optionally followed by a priority).
sub put
{
    my $self = shift;
    $self->clean();

    # Work on a private snapshot of strong references instead of iterating
    # the weakened list in place.
    # NOTE(review): Coro::PrioChannel::put is not visible here; if it can
    # block (presumably on a size-limited channel that is full - confirm),
    # other coros may run in the middle of this loop.  They could drop a
    # listener (turning its weak slot into undef) or call listen()/clean()
    # (rewriting the list being iterated).  The snapshot keeps every target
    # channel alive and the iteration stable until delivery has finished.
    my @listeners = grep { defined } @{ $self->[CHAN] };
    $_->put(@_) for @listeners;
}
1;
__END__
=pod
=head1 NAME
Coro::PrioChannel::Multi - Multiple-listener priority message queues for Coro
=head1 VERSION
version 0.005
=head1 SYNOPSIS
use Coro::PrioChannel::Multi;
my $q = Coro::PrioChannel::Multi->new($maxsize);
$q->put("xxx"[, $prio]);
my $l = $q->listen();
print $l->get; # from Coro::PrioChannel
=head1 DESCRIPTION
A Coro::PrioChannel::Multi is exactly like L<Coro::PrioChannel>, but with
the ability to add (and lose) listeners.
Unlike Coro::Channel, you do have to load this module directly.
Each item that is put into the channel will get sent to all listener(s).
However, there is no deep copy here: if the item put in is a reference,
( run in 0.980 second using v1.01-cache-2.11-cpan-f56aa216473 )