EV-Kafka
view release on metacpan or search on metacpan
src/EV__Kafka.xs view on Meta::CPAN
#endif
else {
conn_emit_error(aTHX_ self, "unsupported SASL mechanism");
kf_buf_free(&body);
return;
}
self->state = CONN_SASL_AUTH;
{
int16_t ver = self->api_versions[API_SASL_AUTHENTICATE];
if (ver < 0) ver = 1;
if (ver > 2) ver = 2;
conn_send_request(aTHX_ self, API_SASL_AUTHENTICATE, ver, &body, NULL, 1, 0);
}
kf_buf_free(&body);
}
static void conn_parse_sasl_authenticate_response(pTHX_ ev_kafka_conn_t *self,
const char *data, size_t len)
{
const char *p = data;
const char *end = data + len;
if (end - p < 2) goto err;
int16_t error_code = kf_read_i16(p); p += 2;
const char *errmsg_str = NULL;
int16_t errmsg_len = 0;
{
int n = kf_read_string(p, end, &errmsg_str, &errmsg_len);
if (n < 0) goto err;
p += n;
}
/* auth_bytes */
const char *auth_data = NULL;
int32_t auth_data_len = 0;
if (end - p >= 4) {
auth_data_len = kf_read_i32(p); p += 4;
if (auth_data_len > 0 && end - p >= auth_data_len) {
auth_data = p;
p += auth_data_len;
}
}
if (error_code != 0) {
char errbuf[512];
if (errmsg_str && errmsg_len > 0)
snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %.*s", (int)errmsg_len, errmsg_str);
else
snprintf(errbuf, sizeof(errbuf), "SASL auth failed: error %d", error_code);
conn_emit_error(aTHX_ self, errbuf);
if (conn_check_destroyed(self)) return;
conn_handle_disconnect(aTHX_ self, "SASL auth failed");
return;
}
#ifdef HAVE_OPENSSL
/* SCRAM multi-step handling */
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
/* Server-first-message: r=<nonce>,s=<salt>,i=<iterations> */
/* Parse server response, compute proof, send client-final */
const char *server_nonce = NULL;
size_t server_nonce_len = 0;
const char *salt_b64 = NULL;
size_t salt_b64_len = 0;
int iterations = 0;
{
const char *sp = auth_data;
const char *se = auth_data + auth_data_len;
while (sp < se) {
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
sp += 2; server_nonce = sp;
while (sp < se && *sp != ',') sp++;
server_nonce_len = sp - server_nonce;
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
sp += 2; salt_b64 = sp;
while (sp < se && *sp != ',') sp++;
salt_b64_len = sp - salt_b64;
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
sp += 2;
iterations = atoi(sp);
while (sp < se && *sp != ',') sp++;
}
if (sp < se && *sp == ',') sp++;
else sp++;
}
}
if (!server_nonce || !salt_b64 || iterations <= 0) {
conn_emit_error(aTHX_ self, "SCRAM: malformed server-first-message");
if (conn_check_destroyed(self)) return;
conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
return;
}
/* RFC 5802: server nonce must start with client nonce */
if (server_nonce_len < 32 ||
memcmp(server_nonce, self->scram_nonce, 32) != 0) {
conn_emit_error(aTHX_ self, "SCRAM: server nonce mismatch");
if (conn_check_destroyed(self)) return;
conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
return;
}
int is_sha512 = (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0);
const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256();
int digest_len = is_sha512 ? 64 : 32;
/* Decode salt from base64 */
unsigned char salt[128];
int salt_len;
{
BIO *b64 = BIO_new(BIO_f_base64());
BIO *bmem = BIO_new_mem_buf(salt_b64, (int)salt_b64_len);
bmem = BIO_push(b64, bmem);
BIO_set_flags(bmem, BIO_FLAGS_BASE64_NO_NL);
salt_len = BIO_read(bmem, salt, sizeof(salt));
BIO_free_all(bmem);
if (salt_len <= 0) {
conn_emit_error(aTHX_ self, "SCRAM: bad salt");
if (conn_check_destroyed(self)) return;
conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
return;
}
}
/* SaltedPassword = Hi(password, salt, iterations) using PBKDF2 */
unsigned char salted_password[64];
PKCS5_PBKDF2_HMAC(self->sasl_password, strlen(self->sasl_password),
salt, salt_len, iterations, md, digest_len, salted_password);
/* ClientKey = HMAC(SaltedPassword, "Client Key") */
unsigned char client_key[64];
unsigned int ck_len = digest_len;
HMAC(md, salted_password, digest_len,
(unsigned char *)"Client Key", 10, client_key, &ck_len);
/* ServerKey = HMAC(SaltedPassword, "Server Key") â saved for
* server-final-message verification. */
unsigned int sk_hmac_len = digest_len;
HMAC(md, salted_password, digest_len,
(unsigned char *)"Server Key", 10,
self->scram_server_key, &sk_hmac_len);
self->scram_server_key_len = (int)sk_hmac_len;
/* StoredKey = H(ClientKey) */
unsigned char stored_key[64];
{
EVP_MD_CTX *ctx = EVP_MD_CTX_new();
unsigned int sk_len;
EVP_DigestInit_ex(ctx, md, NULL);
EVP_DigestUpdate(ctx, client_key, digest_len);
EVP_DigestFinal_ex(ctx, stored_key, &sk_len);
EVP_MD_CTX_free(ctx);
}
/* AuthMessage = client-first-bare + "," + server-first + "," + client-final-without-proof */
char channel_binding_b64[] = "biws"; /* base64("n,,") */
kf_buf_t auth_msg;
kf_buf_init(&auth_msg);
kf_buf_append(&auth_msg, self->scram_client_first, self->scram_client_first_len);
kf_buf_append(&auth_msg, ",", 1);
kf_buf_append(&auth_msg, auth_data, auth_data_len);
kf_buf_append(&auth_msg, ",c=", 3);
kf_buf_append(&auth_msg, channel_binding_b64, 4);
kf_buf_append(&auth_msg, ",r=", 3);
kf_buf_append(&auth_msg, server_nonce, server_nonce_len);
/* ClientSignature = HMAC(StoredKey, AuthMessage) */
unsigned char client_sig[64];
unsigned int cs_len = digest_len;
HMAC(md, stored_key, digest_len,
(unsigned char *)auth_msg.data, auth_msg.len, client_sig, &cs_len);
/* ClientProof = ClientKey XOR ClientSignature */
unsigned char proof[64];
int di;
for (di = 0; di < digest_len; di++)
proof[di] = client_key[di] ^ client_sig[di];
/* Save AuthMessage for server-signature verification at step 2. */
if (self->scram_auth_message) Safefree(self->scram_auth_message);
self->scram_auth_message = savepvn(auth_msg.data, auth_msg.len);
self->scram_auth_message_len = auth_msg.len;
kf_buf_free(&auth_msg);
/* Base64 encode proof */
char proof_b64[256];
{
( run in 1.471 second using v1.01-cache-2.11-cpan-71847e10f99 )