EV-ClickHouse
view release on metacpan or search on metacpan
}
/* in progress â wait for writability */
ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
self->rio.data = (void *)self;
ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
self->wio.data = (void *)self;
start_writing(self);
if (self->connect_timeout > 0) {
ev_timer_set(&self->timer, (ev_tstamp)self->connect_timeout, 0.0);
ev_timer_start(self->loop, &self->timer);
self->timing = 1;
}
}
/* Mark the connection as ready, fire on_connect, dispatch queued queries.
* Returns 1 if self was freed. */
static int finish_connect(ev_clickhouse_t *self) {
stop_timing(self);
self->connected = 1;
CLEAR_STR(self->last_tls_error); /* successful connect supersedes stale TLS error */
if (self->on_connect &&
fire_zero_arg_cb(self, self->on_connect, "connect")) return 1;
if (!ngx_queue_empty(&self->send_queue))
return pipeline_advance(self);
return 0;
}
static void on_connect_done(ev_clickhouse_t *self) {
self->connecting = 0;
self->reconnect_attempts = 0;
stop_writing(self);
/* Keep the connect_timeout timer armed across the TLS handshake and
* native ServerHello phases â finish_connect() stops it once the
* connection is fully ready to accept queries. */
#ifdef HAVE_OPENSSL
if (self->tls_enabled) {
int ret;
self->ssl_ctx = SSL_CTX_new(TLS_client_method());
if (!self->ssl_ctx) {
fail_connection(self, "SSL_CTX_new failed");
return;
}
SSL_CTX_set_default_verify_paths(self->ssl_ctx);
if (self->tls_skip_verify)
SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_NONE, NULL);
else
SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_PEER, NULL);
if (self->tls_ca_file) {
if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
capture_tls_error(self);
fail_connection(self, "SSL_CTX_load_verify_locations failed");
return;
}
}
/* Mutual TLS: load client certificate + private key when both
* are configured. SSL_CTX_check_private_key verifies that the
* private key matches the loaded certificate's public half. */
if (self->tls_cert_file && self->tls_key_file) {
if (SSL_CTX_use_certificate_chain_file(self->ssl_ctx, self->tls_cert_file) != 1) {
capture_tls_error(self);
fail_connection(self, "SSL_CTX_use_certificate_chain_file failed");
return;
}
if (SSL_CTX_use_PrivateKey_file(self->ssl_ctx, self->tls_key_file, SSL_FILETYPE_PEM) != 1) {
capture_tls_error(self);
fail_connection(self, "SSL_CTX_use_PrivateKey_file failed");
return;
}
if (SSL_CTX_check_private_key(self->ssl_ctx) != 1) {
capture_tls_error(self);
fail_connection(self, "TLS client cert / private key mismatch");
return;
}
} else if (self->tls_cert_file || self->tls_key_file) {
fail_connection(self, "tls_cert_file and tls_key_file must both be set");
return;
}
self->ssl = SSL_new(self->ssl_ctx);
if (!self->ssl) {
capture_tls_error(self);
fail_connection(self, "SSL_new failed");
return;
}
SSL_set_fd(self->ssl, self->fd);
int host_is_ip = is_ip_literal(self->host);
/* SNI must not be sent for IP address literals (RFC 6066 s3) */
if (!host_is_ip)
SSL_set_tlsext_host_name(self->ssl, self->host);
/* Verify server certificate matches hostname or IP */
if (!self->tls_skip_verify) {
X509_VERIFY_PARAM *param = SSL_get0_param(self->ssl);
X509_VERIFY_PARAM_set_hostflags(param, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS);
if (host_is_ip)
X509_VERIFY_PARAM_set1_ip_asc(param, self->host);
else
X509_VERIFY_PARAM_set1_host(param, self->host, 0);
}
ret = SSL_connect(self->ssl);
if (ret == 1) {
/* handshake done immediately */
goto handshake_done;
} else {
int err = SSL_get_error(self->ssl, ret);
if (err == SSL_ERROR_WANT_READ) {
start_reading(self);
} else if (err == SSL_ERROR_WANT_WRITE) {
start_writing(self);
} else {
capture_tls_error(self);
fail_connection(self, "SSL_connect failed");
return;
}
/* continue TLS handshake in io_cb */
return;
}
}
handshake_done:
#endif
if (self->protocol == PROTO_NATIVE) {
/* Send ClientHello and wait for ServerHello */
size_t hello_len;
char *hello = build_native_hello(self, &hello_len);
send_replace(self, hello, hello_len);
( run in 0.312 second using v1.01-cache-2.11-cpan-13bb782fe5a )