Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis.pm view on Meta::CPAN
}
elsif ($subscription->can('_fail_fatal')) {
$subscription->_fail_fatal($typed_error);
}
else {
# Pre-Phase-2 fallback: existing _close method.
$subscription->_close if $subscription->can('_close');
}
}
# 8. on_disconnect: only if we were publicly connected.
if ($was_connected && $self->{on_disconnect}) {
$self->{on_disconnect}->($self, "$typed_error");
}
1;
};
my $caught = $@;
# Always clear the guard, even if a callback died.
$self->{_fatal_in_progress} = 0;
die $caught if !$ok && $caught;
}
# Decode a Protocol::Redis message into a plain Perl value (throwing variant).
#
# $msg is the parser's frame: a hash ref with 'type' and 'data' keys.
# Result by frame type:
#   '+'  simple string -> the string
#   ':'  integer       -> numeric value (missing data counts as 0)
#   '$'  bulk string   -> the string, or undef for a null bulk
#   '*'  array         -> array ref of recursively decoded children,
#                         or undef for a null array
#   anything else      -> the raw data, unchanged
# A false $msg decodes to undef.
#
# Dies with the object built by Async::Redis::Error::Redis->from_message
# on an error ('-') frame, including one nested inside an array reply.
sub _decode_response {
    my ($self, $msg) = @_;

    # Deliberately "return undef" rather than a bare return: the map in the
    # array branch calls this sub in list context, and a nil element must
    # still occupy its slot in the decoded array.
    return undef unless $msg;

    # Default a missing type to '' so the comparisons below stay free of
    # "uninitialized value" warnings; such a frame falls through to the
    # final raw-data return, exactly as before.
    my $type = $msg->{type} // '';
    my $data = $msg->{data};

    # Simple string (+)
    if ($type eq '+') {
        return $data;
    }
    # Error (-)
    elsif ($type eq '-') {
        die Async::Redis::Error::Redis->from_message($data);
    }
    # Integer (:)
    elsif ($type eq ':') {
        # "// 0" keeps the numification warning-free when data is missing.
        return 0 + ($data // 0);
    }
    # Bulk string ($)
    elsif ($type eq '$') {
        return $data;    # undef for null bulk
    }
    # Array (*)
    elsif ($type eq '*') {
        return undef unless defined $data;    # null array
        return [ map { $self->_decode_response($_) } @$data ];
    }

    # Unknown frame type: hand the payload back untouched.
    return $data;
}
# Non-throwing decoder used by the unified reader. Classifies each frame
# as one of:
#   ('ok',             $decoded_value) - normal response
#   ('redis_error',    $error_object)  - -ERR frame from Redis
#   ('protocol_error', $error_object)  - fatal desync (malformed)
#
# $msg is the parser's frame: a plain hash ref with 'type' and 'data' keys.
# Always returns a two-element (kind, value) list.
#
# Array frames are decoded recursively. A 'protocol_error' in any child
# aborts the whole array and is returned as-is; a 'redis_error' child is
# not fatal - its error object is stored in the child's slot of the
# decoded array.
#
# Structurally malformed input (a frame that is not a hash ref, or an array
# frame whose payload is not an array ref) is reported as 'protocol_error'
# rather than being dereferenced, so this sub keeps its non-throwing
# contract even on garbage from the parser.
sub _decode_response_result {
    my ($self, $msg) = @_;

    if (!defined $msg) {
        return ('protocol_error', Async::Redis::Error::Protocol->new(
            message => 'undef message from parser',
        ));
    }

    # Dereferencing anything but a hash ref below would die.
    if (ref($msg) ne 'HASH') {
        return ('protocol_error', Async::Redis::Error::Protocol->new(
            message => 'malformed message from parser',
        ));
    }

    my $type = $msg->{type} // '';
    my $data = $msg->{data};

    if ($type eq '+') {
        return ('ok', $data);
    }
    elsif ($type eq '-') {
        return ('redis_error', Async::Redis::Error::Redis->from_message($data));
    }
    elsif ($type eq ':') {
        return ('ok', 0 + ($data // 0));
    }
    elsif ($type eq '$') {
        return ('ok', $data);    # undef for a null bulk
    }
    elsif ($type eq '*') {
        return ('ok', undef) if !defined $data;    # nil array

        # A defined payload must be an array ref; iterating anything else
        # would throw instead of classifying the frame.
        if (ref($data) ne 'ARRAY') {
            return ('protocol_error', Async::Redis::Error::Protocol->new(
                message => 'array frame payload is not an array reference',
            ));
        }

        my @out;
        for my $child (@$data) {
            my ($kind, $value) = $self->_decode_response_result($child);
            if ($kind eq 'protocol_error') {
                return ($kind, $value);    # propagate fatal
            }
            # 'ok' values and 'redis_error' objects both keep their position.
            push @out, $value;
        }
        return ('ok', \@out);
    }
    else {
        return ('protocol_error', Async::Redis::Error::Protocol->new(
            message => "unknown frame type: $type",
        ));
    }
}
# ============================================================================
# Convenience Commands
# ============================================================================
# Issue a PING through the generic command() path; the returned future
# resolves to whatever command('PING') yields.
async sub ping {
    my $self = shift;
    return await $self->command('PING');
}
async sub set {
my ($self, $key, $value, %opts) = @_;
my @cmd = ('SET', $key, $value);
push @cmd, 'EX', $opts{ex} if exists $opts{ex};
push @cmd, 'PX', $opts{px} if exists $opts{px};
push @cmd, 'NX' if $opts{nx};
( run in 0.454 second using v1.01-cache-2.11-cpan-bbb979687b5 )