Async-Redis

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

        - Pipeline results contain error objects at failed slot positions
    - Bug Fix: Pool max connection race condition
        - Concurrent acquire() calls could exceed max connections
        - Added _creating counter to track in-flight connection creations
    - Dependency Change: Future::IO bumped to 0.23 (was 0.19)
        - Future::IO 0.22+ broke load_impl('IOAsync') by adding a poll
          method check that IO::Async's impl doesn't satisfy
        - Removed IO::Async as a test dependency entirely
        - Tests now use Future::IO's built-in poll-based default impl
        - Test suite is fully event-loop agnostic
    - Examples: pagi-chat stats timer uses Future::IO instead of IO::Async

0.001005  2026-03-15
    - Feature: Unix domain socket connections
        - Connect via path parameter or redis+unix:// URI scheme
        - Constructor stores path and skips host/port for unix sockets
        - Added docker-compose redis-unix service for testing
    - Feature: PubSub auto-resubscribe on reconnect
        - Subscriptions automatically re-established after connection drop
        - on_reconnect callback on Subscription object for notification
        - _read_pubsub_frame checks connected state to fail fast

examples/pagi-chat/lib/ChatApp/State.pm  view on Meta::CPAN


    # Separate connection for PubSub
    my $pubsub_redis = Async::Redis->new(host => $host, port => $port);
    await $pubsub_redis->connect;
    $pubsub = await $pubsub_redis->subscribe(BROADCAST_CHANNEL);

    # Initialize background task selector and start the runner
    $background_selector = Future::Selector->new;
    _start_selector_runner();

    # Start periodic stats timer (every 10 seconds)
    _start_stats_timer();

    # Initialize default rooms
    await add_room('general', 'system');
    await add_room('random', 'system');
    await add_room('help', 'system');

    print STDERR "Worker $$: Redis state initialized\n";
    return $redis;
}

sub get_redis { $redis }
sub get_pubsub { $pubsub }

# Background selector runner
my $selector_runner_future;

# Periodic stats timer
my $stats_timer;

# Start the selector with the PubSub listener as the main long-running task
sub _start_selector_runner {
    # Add the broadcast listener as a long-running task
    # Use gen => to provide a generator that creates the future
    # (f => expects a completed future, gen => expects a coderef)
    $background_selector->add(
        data => 'pubsub-listener',
        gen  => sub { _broadcast_listener() },
    );

    # Run the selector in the background
    $selector_runner_future = $background_selector->run->on_fail(sub {
        my ($err) = @_;
        warn "[selector] Runner failed: $err";
    })->retain;
}

# Start periodic stats timer using Future::IO (event-loop agnostic)
sub _start_stats_timer {
    $stats_timer = (async sub {
        while (1) {
            await Future::IO->sleep(10);
            next unless %local_sessions;  # Skip if no clients
            eval {
                my $stats = await get_stats();
                my $msg = $JSON->encode({
                    global  => 1,
                    payload => {
                        type         => 'stats',
                        users_online => $stats->{users_online},

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

            console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${state.reconnectAttempts})...`);
            setConnectionStatus('reconnecting');
            setTimeout(connectWebSocket, delay);
        };

        state.ws.onerror = (error) => {
            console.error('WebSocket error:', error);
        };

        state.ws.onmessage = (event) => {
            // Any message resets the heartbeat timer
            state.lastPongTime = Date.now();

            try {
                const data = JSON.parse(event.data);
                handleWebSocketMessage(data);
            } catch (e) {
                console.error('Failed to parse message:', e);
            }
        };
    }



( run in 0.835 second using v1.01-cache-2.11-cpan-adec679a428 )