Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis/AutoPipeline.pm view on Meta::CPAN
$self->_send_batch(\@batch);
}
$self->{_flushing} = 0;
}
# Detach and return all queued-but-not-yet-flushed commands. Caller is
# responsible for failing their futures. Called by Async::Redis::_reader_fatal
# when the connection dies before a scheduled flush.
sub _detach_queued {
    my $self = shift;

    # Grab whatever is queued right now; a missing queue counts as empty.
    my $orphaned = $self->{_queue} // [];

    # Install a fresh, empty queue and clear the pending-flush marker in one
    # hash-slice assignment.
    @{$self}{qw(_queue _flush_pending)} = ([], 0);

    return $orphaned;
}
sub _send_batch {
    my ($self, $batch) = @_;
    my $redis = $self->{redis};

    # Split the batch entries into two parallel lists: index N of @cmds
    # belongs to index N of @pending.
    my (@cmds, @pending);
    for my $entry (@$batch) {
        push @cmds,    $entry->{cmd};
        push @pending, $entry->{future};
    }

    my $submit = (async sub {
        # Serialise every command into one buffer and work out each
        # command's deadline before waiting on the write gate.
        my $wire = '';
        my @deadlines;
        for my $cmd (@cmds) {
            $wire .= $redis->_build_command(@$cmd);
            push @deadlines, $redis->_calculate_deadline(@$cmd);
        }

        await $redis->_with_write_gate(sub {
            return (async sub {
                # No live socket: reconnect if that is enabled, otherwise
                # give up with a Disconnected error.
                unless ($redis->{_socket_live}) {
                    if (!$redis->_reconnect_enabled) {
                        die Async::Redis::Error::Disconnected->new(
                            message => "Not connected",
                        );
                    }
                    await $redis->_ensure_connected;
                }

                # Register each command as in-flight before the bytes are
                # written: command name, remaining arguments, deadline.
                for my $idx (0 .. $#cmds) {
                    my ($name, @args) = @{ $cmds[$idx] };
                    $redis->_add_inflight(
                        $pending[$idx],
                        $name,
                        \@args,
                        $deadlines[$idx],
                        'fail',
                    );
                }

                await $redis->_send($wire);
            })->();
        });

        $redis->_ensure_reader;
    })->();

    # If the submit itself fails (transport error), propagate the error to
    # every command future that _reader_fatal has not already completed.
    $submit->on_fail(sub {
        my $failure = shift;
        for my $f (@pending) {
            next if $f->is_ready;
            $f->fail($failure);
        }
    });

    # Ownership: the submit task is handed to the client's Future::Selector
    # (_tasks). A caller awaiting inside run_until_ready sees a submit
    # failure through the selector, so no ->retain is needed here.
    $redis->{_tasks}->add(data => 'autopipe-submit', f => $submit);
}
1;
__END__
=head1 NAME

Async::Redis::AutoPipeline - Automatic command batching

=head1 DESCRIPTION

AutoPipeline transparently batches Redis commands issued in the same
event loop tick into a single pipeline, reducing network round-trips
without changing the caller's API.

=head2 How It Works

    # These three commands are batched automatically
    my $f1 = $redis->set('a', 1);
    my $f2 = $redis->set('b', 2);
    my $f3 = $redis->get('a');
    await Future->needs_all($f1, $f2, $f3);

When C<auto_pipeline =E<gt> 1>:

=over 4

=item 1.

Commands queue locally instead of sending immediately.

=item 2.

A next-tick flush is scheduled with C<< Future::IO->sleep(0) >>.

=item 3.

When the event loop yields, queued commands flush as a pipeline.

=item 4.

Responses are distributed to the original command futures.

=back

=head2 Invariants
( run in 2.431 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )