Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis.pm  view on Meta::CPAN

            }
            elsif ($subscription->can('_fail_fatal')) {
                $subscription->_fail_fatal($typed_error);
            }
            else {
                # Pre-Phase-2 fallback: existing _close method.
                $subscription->_close if $subscription->can('_close');
            }
        }

        # 8. on_disconnect: only if we were publicly connected.
        if ($was_connected && $self->{on_disconnect}) {
            $self->{on_disconnect}->($self, "$typed_error");
        }

        1;
    };

    my $caught = $@;
    # Always clear the guard, even if a callback died.
    $self->{_fatal_in_progress} = 0;
    die $caught if !$ok && $caught;
}

# Decode a parsed Protocol::Redis frame into a plain Perl value.
#
# $msg is a hash ref carrying 'type' (the frame marker character) and
# 'data'. Returns the decoded value; array frames are decoded recursively
# into an array ref. Dies with the object built by
# Async::Redis::Error::Redis->from_message when a '-' (error) frame is
# met, including one nested inside an array.
sub _decode_response {
    my ($self, $msg) = @_;

    # Explicit undef rather than a bare return: the recursive call below
    # runs in list context, where a bare return would drop the element.
    return undef unless $msg;

    my ($type, $data) = @{$msg}{qw(type data)};

    # Simple string (+): passed through untouched.
    return $data if $type eq '+';

    # Error (-): raised as an exception.
    die Async::Redis::Error::Redis->from_message($data) if $type eq '-';

    # Integer (:): numified.
    return 0 + $data if $type eq ':';

    # Bulk string ($): undef stands for a null bulk.
    return $data if $type eq '$';

    # Array (*): undef stands for a null array.
    if ($type eq '*') {
        return undef unless defined $data;

        my @decoded;
        for my $child (@$data) {
            push @decoded, $self->_decode_response($child);
        }
        return \@decoded;
    }

    # Any other frame type: hand back the raw payload.
    return $data;
}

# Non-throwing decoder used by the unified reader. Classifies each frame
# as one of:
#   ('ok',             $decoded_value)      - normal response
#   ('redis_error',    $error_object)       - -ERR frame from Redis
#   ('protocol_error', $error_object)       - fatal desync (malformed)
#
# $msg is a parsed frame: a hash ref with 'type' and 'data' keys. The
# return value is always a two-element (kind, payload) list.
#
# Array frames are decoded recursively. A 'protocol_error' in any child
# aborts the whole frame. A child 'redis_error' is not fatal: its error
# object is stored in that child's slot of the resulting array.
#
# Frames whose shape cannot be walked (a frame that is not a hash ref, or
# an array frame whose payload is defined but not an array ref) are
# reported as 'protocol_error' instead of being dereferenced, so this sub
# keeps its non-throwing contract on malformed input.
#
# NOTE(review): the shape checks assume the parser emits plain (unblessed)
# hash and array refs -- confirm if a different parser is ever plugged in.
sub _decode_response_result {
    my ($self, $msg) = @_;

    if (!defined $msg) {
        return ('protocol_error', Async::Redis::Error::Protocol->new(
            message => 'undef message from parser',
        ));
    }

    # Anything other than a hash ref cannot be inspected without dying.
    if (ref($msg) ne 'HASH') {
        return ('protocol_error', Async::Redis::Error::Protocol->new(
            message => 'malformed message from parser',
        ));
    }

    my $type = $msg->{type} // '';
    my $data = $msg->{data};

    if ($type eq '+') {
        return ('ok', $data);
    }
    elsif ($type eq '-') {
        return ('redis_error', Async::Redis::Error::Redis->from_message($data));
    }
    elsif ($type eq ':') {
        # A missing payload is numified as 0 rather than warning.
        return ('ok', 0 + ($data // 0));
    }
    elsif ($type eq '$') {
        return ('ok', $data);   # undef for nil bulk
    }
    elsif ($type eq '*') {
        return ('ok', undef) if !defined $data;   # nil array

        # A defined payload must be an array ref before it is iterated.
        if (ref($data) ne 'ARRAY') {
            return ('protocol_error', Async::Redis::Error::Protocol->new(
                message => 'malformed array frame',
            ));
        }

        my @out;
        for my $child (@$data) {
            my ($k, $v) = $self->_decode_response_result($child);
            if ($k eq 'protocol_error') {
                return ($k, $v);   # propagate fatal
            }
            # 'ok' values and 'redis_error' objects both occupy a slot.
            push @out, $v;
        }
        return ('ok', \@out);
    }
    else {
        return ('protocol_error', Async::Redis::Error::Protocol->new(
            message => "unknown frame type: $type",
        ));
    }
}

# ============================================================================
# Convenience Commands
# ============================================================================

# Issue a PING through the generic command() path and return whatever the
# awaited command yields.
async sub ping {
    my $self = shift;
    return await $self->command('PING');
}

async sub set {
    my ($self, $key, $value, %opts) = @_;
    my @cmd = ('SET', $key, $value);
    push @cmd, 'EX', $opts{ex} if exists $opts{ex};
    push @cmd, 'PX', $opts{px} if exists $opts{px};
    push @cmd, 'NX' if $opts{nx};



( run in 0.454 second using v1.01-cache-2.11-cpan-bbb979687b5 )