Async-Redis

 view release on metacpan or  search on metacpan

examples/pagi-chat/lib/ChatApp/State.pm  view on Meta::CPAN

#

use strict;
use warnings;
use Future;
use Future::AsyncAwait;
use Future::Selector;
use Exporter 'import';
use JSON::MaybeXS;
use Time::HiRes qw(time);
use Scalar::Util qw(weaken);
use Future::IO;

our @EXPORT_OK = qw(
    init_redis get_redis get_pubsub
    get_session create_session update_session remove_session
    get_session_by_name set_session_connected set_session_disconnected
    is_session_connected
    get_room add_room remove_room get_all_rooms
    add_user_to_room remove_user_from_room get_room_users
    add_message get_room_messages

lib/Async/Redis.pm  view on Meta::CPAN

use strict;
use warnings;
use 5.018;

our $VERSION = '0.002000';

use Future;
use Future::AsyncAwait;
use Future::IO 0.23;
use Future::Selector 0.05;
use Scalar::Util qw(blessed weaken);
use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
use IO::Handle ();
use IO::Socket::INET;
use Time::HiRes ();

# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;

lib/Async/Redis.pm  view on Meta::CPAN

# Fires _resume_after_reconnect on the subscription on success, or
# _fail_fatal on unrecoverable reconnect failure.
sub _reconnect_async {
    my ($self, $sub) = @_;

    # Dedup against any reconnect already in progress (from either this
    # path or _ensure_connected). The slot is the shared signal.
    return if $self->{_reconnect_future}
        && !$self->{_reconnect_future}->is_ready;

    weaken(my $weak_self = $self);
    weaken(my $weak_sub  = $sub);

    my $f = (async sub {
        # Reconnect the socket. _reconnect handles retry/backoff and
        # dies with Disconnected if reconnect_max_attempts is exhausted.
        await $weak_self->_reconnect;

        # Delegate the replay, on_reconnect, and driver-restart work to
        # the subscription's unified resume path. _resume_after_reconnect
        # handles clearing _paused, setting in_pubsub, replaying all
        # tracked channels/patterns, firing on_reconnect, and starting

lib/Async/Redis/Subscription.pm  view on Meta::CPAN

package Async::Redis::Subscription;

use strict;
use warnings;
use 5.018;

use Carp ();
use Future;
use Future::AsyncAwait;
use Future::IO;
use Scalar::Util qw(blessed refaddr weaken);


# Threshold for periodic event-loop yield inside the callback driver
# loop. Prevents stack growth when many messages are pre-queued and
# await on an already-ready Future returns synchronously.
use constant MAX_SYNC_DEPTH => 32;

sub new {
    my ($class, %args) = @_;

lib/Async/Redis/Subscription.pm  view on Meta::CPAN

    my $redis = $self->{redis};
    my $depth = ($redis && $redis->{message_queue_depth})
        ? $redis->{message_queue_depth}
        : 0;  # 0 = unbounded (default)

    if ($depth && scalar(@{$self->{_pending_messages}}) >= $depth) {
        # Queue full. Return a Future that queues the message once a slot
        # opens (signalled by next() calling _slot_waiter->done).
        $self->{_slot_waiter} //= Future->new;
        my $slot = $self->{_slot_waiter};
        weaken(my $weak = $self);
        return $slot->then(sub {
            return Future->done if !$weak || $weak->{_closed};
            push @{$weak->{_pending_messages}}, $msg;
            if (my $w = delete $weak->{_message_waiter}) {
                $w->done unless $w->is_ready;
            }
            Future->done;
        });
    }



( run in 2.687 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )