EV-Kafka

 view release on metacpan or  search on metacpan

src/EV__Kafka.xs  view on Meta::CPAN

#endif
    else {
        conn_emit_error(aTHX_ self, "unsupported SASL mechanism");
        kf_buf_free(&body);
        return;
    }

    self->state = CONN_SASL_AUTH;
    {
        int16_t ver = self->api_versions[API_SASL_AUTHENTICATE];
        if (ver < 0) ver = 1;
        if (ver > 2) ver = 2;
        conn_send_request(aTHX_ self, API_SASL_AUTHENTICATE, ver, &body, NULL, 1, 0);
    }
    kf_buf_free(&body);
}

static void conn_parse_sasl_authenticate_response(pTHX_ ev_kafka_conn_t *self,
    const char *data, size_t len)
{
    const char *p = data;
    const char *end = data + len;

    if (end - p < 2) goto err;
    int16_t error_code = kf_read_i16(p); p += 2;

    const char *errmsg_str = NULL;
    int16_t errmsg_len = 0;
    {
        int n = kf_read_string(p, end, &errmsg_str, &errmsg_len);
        if (n < 0) goto err;
        p += n;
    }

    /* auth_bytes */
    const char *auth_data = NULL;
    int32_t auth_data_len = 0;
    if (end - p >= 4) {
        auth_data_len = kf_read_i32(p); p += 4;
        if (auth_data_len > 0 && end - p >= auth_data_len) {
            auth_data = p;
            p += auth_data_len;
        }
    }

    if (error_code != 0) {
        char errbuf[512];
        if (errmsg_str && errmsg_len > 0)
            snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %.*s", (int)errmsg_len, errmsg_str);
        else
            snprintf(errbuf, sizeof(errbuf), "SASL auth failed: error %d", error_code);
        conn_emit_error(aTHX_ self, errbuf);
        if (conn_check_destroyed(self)) return;
        conn_handle_disconnect(aTHX_ self, "SASL auth failed");
        return;
    }

#ifdef HAVE_OPENSSL
    /* SCRAM multi-step handling */
    if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
        /* Server-first-message: r=<nonce>,s=<salt>,i=<iterations> */
        /* Parse server response, compute proof, send client-final */
        const char *server_nonce = NULL;
        size_t server_nonce_len = 0;
        const char *salt_b64 = NULL;
        size_t salt_b64_len = 0;
        int iterations = 0;
        {
            const char *sp = auth_data;
            const char *se = auth_data + auth_data_len;
            while (sp < se) {
                if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
                    sp += 2; server_nonce = sp;
                    while (sp < se && *sp != ',') sp++;
                    server_nonce_len = sp - server_nonce;
                } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
                    sp += 2; salt_b64 = sp;
                    while (sp < se && *sp != ',') sp++;
                    salt_b64_len = sp - salt_b64;
                } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
                    sp += 2;
                    iterations = atoi(sp);
                    while (sp < se && *sp != ',') sp++;
                }
                if (sp < se && *sp == ',') sp++;
                else sp++;
            }
        }

        if (!server_nonce || !salt_b64 || iterations <= 0) {
            conn_emit_error(aTHX_ self, "SCRAM: malformed server-first-message");
            if (conn_check_destroyed(self)) return;
            conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
            return;
        }

        /* RFC 5802: server nonce must start with client nonce */
        if (server_nonce_len < 32 ||
            memcmp(server_nonce, self->scram_nonce, 32) != 0) {
            conn_emit_error(aTHX_ self, "SCRAM: server nonce mismatch");
            if (conn_check_destroyed(self)) return;
            conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
            return;
        }

        int is_sha512 = (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0);
        const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256();
        int digest_len = is_sha512 ? 64 : 32;

        /* Decode salt from base64 */
        unsigned char salt[128];
        int salt_len;
        {
            BIO *b64 = BIO_new(BIO_f_base64());
            BIO *bmem = BIO_new_mem_buf(salt_b64, (int)salt_b64_len);
            bmem = BIO_push(b64, bmem);
            BIO_set_flags(bmem, BIO_FLAGS_BASE64_NO_NL);
            salt_len = BIO_read(bmem, salt, sizeof(salt));
            BIO_free_all(bmem);
            if (salt_len <= 0) {
                conn_emit_error(aTHX_ self, "SCRAM: bad salt");
                if (conn_check_destroyed(self)) return;
                conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
                return;
            }
        }

        /* SaltedPassword = Hi(password, salt, iterations) using PBKDF2 */
        unsigned char salted_password[64];
        PKCS5_PBKDF2_HMAC(self->sasl_password, strlen(self->sasl_password),
            salt, salt_len, iterations, md, digest_len, salted_password);

        /* ClientKey = HMAC(SaltedPassword, "Client Key") */
        unsigned char client_key[64];
        unsigned int ck_len = digest_len;
        HMAC(md, salted_password, digest_len,
            (unsigned char *)"Client Key", 10, client_key, &ck_len);

        /* ServerKey = HMAC(SaltedPassword, "Server Key") — saved for
         * server-final-message verification. */
        unsigned int sk_hmac_len = digest_len;
        HMAC(md, salted_password, digest_len,
            (unsigned char *)"Server Key", 10,
            self->scram_server_key, &sk_hmac_len);
        self->scram_server_key_len = (int)sk_hmac_len;

        /* StoredKey = H(ClientKey) */
        unsigned char stored_key[64];
        {
            EVP_MD_CTX *ctx = EVP_MD_CTX_new();
            unsigned int sk_len;
            EVP_DigestInit_ex(ctx, md, NULL);
            EVP_DigestUpdate(ctx, client_key, digest_len);
            EVP_DigestFinal_ex(ctx, stored_key, &sk_len);
            EVP_MD_CTX_free(ctx);
        }

        /* AuthMessage = client-first-bare + "," + server-first + "," + client-final-without-proof */
        char channel_binding_b64[] = "biws"; /* base64("n,,") */
        kf_buf_t auth_msg;
        kf_buf_init(&auth_msg);
        kf_buf_append(&auth_msg, self->scram_client_first, self->scram_client_first_len);
        kf_buf_append(&auth_msg, ",", 1);
        kf_buf_append(&auth_msg, auth_data, auth_data_len);
        kf_buf_append(&auth_msg, ",c=", 3);
        kf_buf_append(&auth_msg, channel_binding_b64, 4);
        kf_buf_append(&auth_msg, ",r=", 3);
        kf_buf_append(&auth_msg, server_nonce, server_nonce_len);

        /* ClientSignature = HMAC(StoredKey, AuthMessage) */
        unsigned char client_sig[64];
        unsigned int cs_len = digest_len;
        HMAC(md, stored_key, digest_len,
            (unsigned char *)auth_msg.data, auth_msg.len, client_sig, &cs_len);

        /* ClientProof = ClientKey XOR ClientSignature */
        unsigned char proof[64];
        int di;
        for (di = 0; di < digest_len; di++)
            proof[di] = client_key[di] ^ client_sig[di];

        /* Save AuthMessage for server-signature verification at step 2. */
        if (self->scram_auth_message) Safefree(self->scram_auth_message);
        self->scram_auth_message = savepvn(auth_msg.data, auth_msg.len);
        self->scram_auth_message_len = auth_msg.len;

        kf_buf_free(&auth_msg);

        /* Base64 encode proof */
        char proof_b64[256];
        {



( run in 1.471 second using v1.01-cache-2.11-cpan-71847e10f99 )