Myriad
view release on metacpan or search on metacpan
lib/Myriad/Transport/Redis.pm view on Meta::CPAN
=over 4
=item * C<stream> - The name of the stream group belongs to.
=item * C<group> - The consumer group name.
=back
=cut
async method remove_group ($stream, $group) {
try {
my @args = ('DESTROY', $self->apply_prefix($stream), $group);
await $redis->xgroup(@args);
$log->tracef('Deleted consumergroup: %s from stream: %s', $group, $stream);
} catch ($e) {
if ($e =~ /requires the key to exist/) {
$log->warnf('Trying to remove a consumergroup(%s) from stream: %s that does not exist', $group, $stream);
Myriad::Exception::Transport::Redis::NoSuchStream->throw(
reason => "no such stream: $stream",
);
} else {
die $e;
}
}
}
=head2 pending_messages_info
Return information about the pending messages for a stream and a consumer group.
This currently just execute C<XPENDING> without any filtering.
=over 4
=item * C<stream> - The name of the stream we want to check.
=item * C<group> - The consumers group name that we want to check.
=back
=cut
async method pending_messages_info($stream, $group) {
await $redis->xpending($self->apply_prefix($stream), $group);
}
=head2 stream_length
Return the length of a given stream
=cut
async method stream_length ($stream) {
return await $redis->xlen($self->apply_prefix($stream));
}
=head2 borrow_instance_from_pool
Returns a Redis connection either from a pool of connection or a new one.
With the possibility of waiting to get one, if all connection were busy and we maxed out our limit.
=cut
async method borrow_instance_from_pool {
$log->tracef('Available Redis pool count: %d', 0 + $redis_pool->@*);
if (my $available_redis = shift $redis_pool->@*) {
++$pending_redis_count;
return $available_redis;
} elsif ($pending_redis_count < $max_pool_count) {
++$pending_redis_count;
return await $self->redis;
}
push @$waiting_redis_pool, my $f = $self->loop->new_future;
$log->debugf('All Redis instances are pending, added to waiting list. Current Redis count: %d/%d | Waiting count: %d', $pending_redis_count, $max_pool_count, 0 + $waiting_redis_pool->@*);
return await $f;
}
=head2 return_instance_to_pool
This puts back a redis connection into Redis pool, so it can be used by other called.
It should be called at the end of every usage, as on_ready.
It should also be possible with a try/finally combination..
but that's currently failing with the $redis_pool slot not being defined.
Takes the following parameters:
=over 4
=item * C<$instance> - Redis connection to be returned.
=back
=cut
method return_instance_to_pool ($instance) {
if( my $waiting_redis = shift $waiting_redis_pool->@*) {
$waiting_redis->done($instance)
} else {
push $redis_pool->@*, $instance;
$log->tracef('Returning instance to pool, Redis used/available now %d/%d', $pending_redis_count, 0 + $redis_pool->@*);
$pending_redis_count--;
}
return;
}
=head2 redis
Resolves to a new L<Net::Async::Redis> or L<Net::Async::Redis::Cluster>
instance, depending on the setting of C<$use_cluster>.
=cut
async method redis () {
my $instance;
if($use_cluster) {
$instance = Net::Async::Redis::Cluster->new(
client_side_cache_size => $clientside_cache_size,
);
$self->add_child(
( run in 0.740 second using v1.01-cache-2.11-cpan-5b529ec07f3 )