Myriad

 view release on metacpan or  search on metacpan

lib/Myriad/Transport/Redis.pm  view on Meta::CPAN

=over 4

=item * C<stream> - The name of the stream group belongs to.

=item * C<group> - The consumer group name.

=back

=cut

async method remove_group ($stream, $group) {
    try {
        my @args = ('DESTROY', $self->apply_prefix($stream), $group);
        await $redis->xgroup(@args);
        $log->tracef('Deleted consumergroup: %s from stream: %s', $group, $stream);
    } catch ($e) {
        if ($e =~ /requires the key to exist/) {
            $log->warnf('Trying to remove a consumergroup(%s) from stream: %s that does not exist', $group, $stream);
            Myriad::Exception::Transport::Redis::NoSuchStream->throw(
                reason => "no such stream: $stream",
            );
        } else {
            die $e;
        }
    }
}

=head2 pending_messages_info

Return information about the pending messages for a stream and a consumer group.

This currently just execute C<XPENDING> without any filtering.

=over 4

=item * C<stream> - The name of the stream we want to check.

=item * C<group> - The consumers group name that we want to check.

=back

=cut

async method pending_messages_info($stream, $group) {
    await $redis->xpending($self->apply_prefix($stream), $group);
}

=head2 stream_length

Return the length of a given stream

=cut

async method stream_length ($stream) {
    return await $redis->xlen($self->apply_prefix($stream));
}

=head2 borrow_instance_from_pool

Returns a Redis connection either from a pool of connection or a new one.
With the possibility of waiting to get one, if all connection were busy and we maxed out our limit.

=cut

async method borrow_instance_from_pool {
    $log->tracef('Available Redis pool count: %d', 0 + $redis_pool->@*);
    if (my $available_redis = shift $redis_pool->@*) {
        ++$pending_redis_count;
        return $available_redis;
    } elsif ($pending_redis_count < $max_pool_count) {
        ++$pending_redis_count;
        return await $self->redis;
    }
    push @$waiting_redis_pool, my $f = $self->loop->new_future;
    $log->debugf('All Redis instances are pending, added to waiting list. Current Redis count: %d/%d | Waiting count: %d', $pending_redis_count, $max_pool_count, 0 + $waiting_redis_pool->@*);
    return await $f;
}

=head2 return_instance_to_pool

This puts back a redis connection into Redis pool, so it can be used by other called.
It should be called at the end of every usage, as on_ready.

It should also be possible with a try/finally combination..
but that's currently failing with the $redis_pool slot not being defined.

Takes the following parameters:

=over 4

=item * C<$instance> - Redis connection to be returned.

=back

=cut

method return_instance_to_pool ($instance) {
    if( my $waiting_redis = shift $waiting_redis_pool->@*) {
        $waiting_redis->done($instance)
    } else {
        push $redis_pool->@*, $instance;
        $log->tracef('Returning instance to pool, Redis used/available now %d/%d', $pending_redis_count, 0 + $redis_pool->@*);
        $pending_redis_count--;
    }
    return;
}

=head2 redis

Resolves to a new L<Net::Async::Redis> or L<Net::Async::Redis::Cluster>
instance, depending on the setting of C<$use_cluster>.

=cut

async method redis () {
    my $instance;
    if($use_cluster) {
        $instance = Net::Async::Redis::Cluster->new(
            client_side_cache_size => $clientside_cache_size,
        );
        $self->add_child(



( run in 0.740 second using v1.01-cache-2.11-cpan-5b529ec07f3 )