Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis/AutoPipeline.pm  view on Meta::CPAN


        $self->_send_batch(\@batch);
    }

    $self->{_flushing} = 0;
}

# Hand over every command that is queued but has not been flushed yet, and
# reset the queue state on this object. The returned arrayref is the old
# queue itself (or a fresh empty arrayref when no queue existed); failing
# the futures of those commands is the caller's job. Called by
# Async::Redis::_reader_fatal when the connection dies before a scheduled
# flush.
sub _detach_queued {
    my $self = shift;

    my $detached = defined $self->{_queue} ? $self->{_queue} : [];

    # Start over with an empty queue and no pending flush.
    @{$self}{qw(_queue _flush_pending)} = ([], 0);

    return $detached;
}

# Submit one batch of queued commands as a single pipelined write.
#
# $batch is an arrayref of queue entries. Each entry is a hashref holding
# the command as an arrayref under 'cmd' (command name first, followed by
# its arguments) and the future to resolve under 'future'.
#
# The submit task is registered with the client's task selector; the value
# returned is whatever that registration call returns.
sub _send_batch {
    my ($self, $batch) = @_;
    my $client = $self->{redis};

    # Snapshot commands and futures into parallel arrays up front.
    my (@cmds, @replies);
    for my $entry (@$batch) {
        push @cmds,    $entry->{cmd};
        push @replies, $entry->{future};
    }

    my $submit = (async sub {
        # Serialise every command and work out its deadline before
        # waiting on the write gate.
        my $wire = '';
        my @deadlines;
        for my $cmd (@cmds) {
            $wire .= $client->_build_command(@$cmd);
            push @deadlines, $client->_calculate_deadline(@$cmd);
        }

        await $client->_with_write_gate(sub {
            return (async sub {
                # No live socket: reconnect if that is enabled, otherwise
                # give up with a Disconnected error.
                if (!$client->{_socket_live}) {
                    $client->_reconnect_enabled
                        or die Async::Redis::Error::Disconnected->new(
                            message => "Not connected",
                        );
                    await $client->_ensure_connected;
                }

                # Register every command as in-flight before any bytes
                # are written.
                my $idx = 0;
                for my $cmd (@cmds) {
                    my ($name, @args) = @$cmd;
                    $client->_add_inflight(
                        $replies[$idx],
                        $name,
                        \@args,
                        $deadlines[$idx],
                        'fail',
                    );
                    $idx++;
                }

                await $client->_send($wire);
            })->();
        });

        $client->_ensure_reader;
    })->();

    # If the submit itself fails (transport error), pass that failure on
    # to each command future that _reader_fatal has not already failed.
    $submit->on_fail(sub {
        my ($failure) = @_;
        for my $reply (@replies) {
            next if $reply->is_ready;
            $reply->fail($failure);
        }
    });

    # Ownership: the submit task belongs to the client's Future::Selector
    # (_tasks). A caller currently awaiting inside run_until_ready gets a
    # submit failure through the selector, so no ->retain is required.
    return $client->{_tasks}->add(data => 'autopipe-submit', f => $submit);
}

1;

__END__

=head1 NAME

Async::Redis::AutoPipeline - Automatic command batching

=head1 DESCRIPTION

AutoPipeline transparently batches Redis commands issued in the same
event loop tick into a single pipeline, reducing network round-trips
without changing the caller's API.

=head2 How It Works

    # These three commands are batched automatically
    my $f1 = $redis->set('a', 1);
    my $f2 = $redis->set('b', 2);
    my $f3 = $redis->get('a');

    await Future->needs_all($f1, $f2, $f3);

When C<auto_pipeline =E<gt> 1> is set:

=over 4

=item 1.

Commands queue locally instead of sending immediately.

=item 2.

A next-tick flush is scheduled with C<< Future::IO->sleep(0) >>.

=item 3.

When the event loop yields, queued commands flush as a pipeline.

=item 4.

Responses are distributed to the original command futures.

=back

=head2 Invariants



( run in 2.431 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )