Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis/Subscription.pm  view on Meta::CPAN

            channel => $frame->[2],
            data    => $frame->[3],
        };
    }
    elsif ($type eq 'smessage') {
        $msg = {
            type    => 'smessage',
            channel => $frame->[1],
            pattern => undef,
            data    => $frame->[2],
        };
    }
    else {
        return undef;   # non-message frame (subscribe confirmation, etc.)
    }

    # Queue the message for consumption by next() (iterator mode) or the
    # callback driver loop (callback mode). The driver invokes _on_message;
    # _dispatch_frame is intentionally agnostic about the consumption mode.
    # This keeps backpressure uniform: the depth limit applies to both modes.
    return if $self->{_closed};

    my $redis = $self->{redis};
    my $depth = ($redis && $redis->{message_queue_depth})
        ? $redis->{message_queue_depth}
        : 0;  # 0 = unbounded (default)

    if ($depth && scalar(@{$self->{_pending_messages}}) >= $depth) {
        # Queue full. Return a Future that queues the message once a slot
        # opens (signalled by next() calling _slot_waiter->done).
        $self->{_slot_waiter} //= Future->new;
        my $slot = $self->{_slot_waiter};
        weaken(my $weak = $self);
        return $slot->then(sub {
            return Future->done if !$weak || $weak->{_closed};
            push @{$weak->{_pending_messages}}, $msg;
            if (my $w = delete $weak->{_message_waiter}) {
                $w->done unless $w->is_ready;
            }
            Future->done;
        });
    }

    push @{$self->{_pending_messages}}, $msg;
    if (my $w = delete $self->{_message_waiter}) {
        $w->done unless $w->is_ready;
    }
    return undef;
}

# The callback-mode driver loop. Consumes from _pending_messages via
# _dequeue (populated by _run_reader's dispatch path), invokes the
# user's _on_message callback, and awaits its returned Future if any
# for consumer-opted backpressure.
#
# Exits cleanly when _dequeue returns undef (subscription closed or
# paused for reconnect). Dies with the typed error if _dequeue dies
# (fatal); _run_driver's Future failure is visible through the
# client's Future::Selector to any caller using run_until_ready.
#
# Periodic sleep(0) yield every MAX_SYNC_DEPTH iterations prevents
# stack growth when messages are pre-queued and await returns
# synchronously from an already-ready Future.
async sub _run_driver {
    my ($self) = @_;
    my $iter = 0;
    while (!$self->{_closed} && !$self->{_paused}) {
        my $msg;
        my $deq_ok = eval { $msg = await $self->_dequeue(1); 1 };
        unless ($deq_ok) {
            my $err = $@;
            # _fail_fatal already set _closed and fired on_error; don't
            # double-fire. Any other propagation path routes through
            # _handle_fatal_error.
            return if $self->{_closed} || $self->{_paused};
            $self->_handle_fatal_error($err);
            return;
        }
        last unless defined $msg;
        last if $self->{_closed} || $self->{_paused};

        my $cb = $self->{_on_message} or last;
        my $result = $self->_invoke_user_callback($cb, $msg);

        if (blessed($result) && $result->isa('Future')) {
            my $cb_ok = eval { await $result; 1 };
            unless ($cb_ok) {
                my $err = $@;
                return if $self->{_closed} || $self->{_paused};
                $self->_handle_fatal_error(
                    "on_message callback Future failed: $err"
                );
                return;
            }
        }

        # Periodic yield prevents stack blowup when pre-queued messages
        # resolve await synchronously.
        await Future::IO->sleep(0) if ++$iter % MAX_SYNC_DEPTH == 0;
    }
}

# Start the driver if not already running. Idempotent.
# Only starts when _on_message is set (callback mode). Iterator mode
# consumers call next() directly — no driver loop needed.
#
# Ownership: the driver Future is added to the client's Future::Selector
# ($redis->{_tasks}) and stored in $self->{_driver_step}. The selector
# owns the task; the slot is the dedup signal. on_ready clears the slot
# regardless of outcome. No ->retain.
sub _start_driver {
    my ($self, $force) = @_;
    return if $self->{_driver_step} && !$self->{_driver_step}->is_ready;
    return unless $self->{_on_message};   # only callback mode needs a driver
    return if $self->{_closed};
    return if $self->{_paused};
    return unless $self->channel_count > 0;

    my $redis = $self->{redis} or return;

    my $f = $self->_run_driver;



( run in 1.583 second using v1.01-cache-2.11-cpan-71847e10f99 )