EV-ClickHouse

 view release on metacpan or  search on metacpan

xs/io.c  view on Meta::CPAN

    }

    /* in progress — wait for writability */
    ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
    self->rio.data = (void *)self;
    ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
    self->wio.data = (void *)self;

    start_writing(self);

    if (self->connect_timeout > 0) {
        ev_timer_set(&self->timer, (ev_tstamp)self->connect_timeout, 0.0);
        ev_timer_start(self->loop, &self->timer);
        self->timing = 1;
    }
}

/* Mark the connection as ready, fire on_connect, dispatch queued queries.
 * Returns 1 if self was freed. */
static int finish_connect(ev_clickhouse_t *self) {
    stop_timing(self);
    self->connected = 1;
    CLEAR_STR(self->last_tls_error);    /* successful connect supersedes stale TLS error */
    if (self->on_connect &&
        fire_zero_arg_cb(self, self->on_connect, "connect")) return 1;
    if (!ngx_queue_empty(&self->send_queue))
        return pipeline_advance(self);
    return 0;
}

static void on_connect_done(ev_clickhouse_t *self) {
    self->connecting = 0;
    self->reconnect_attempts = 0;

    stop_writing(self);
    /* Keep the connect_timeout timer armed across the TLS handshake and
     * native ServerHello phases — finish_connect() stops it once the
     * connection is fully ready to accept queries. */

#ifdef HAVE_OPENSSL
    if (self->tls_enabled) {
        int ret;
        self->ssl_ctx = SSL_CTX_new(TLS_client_method());
        if (!self->ssl_ctx) {
            fail_connection(self, "SSL_CTX_new failed");
            return;
        }
        SSL_CTX_set_default_verify_paths(self->ssl_ctx);
        if (self->tls_skip_verify)
            SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_NONE, NULL);
        else
            SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_PEER, NULL);
        if (self->tls_ca_file) {
            if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
                capture_tls_error(self);
                fail_connection(self, "SSL_CTX_load_verify_locations failed");
                return;
            }
        }
        /* Mutual TLS: load client certificate + private key when both
         * are configured. SSL_CTX_check_private_key verifies that the
         * private key matches the loaded certificate's public half. */
        if (self->tls_cert_file && self->tls_key_file) {
            if (SSL_CTX_use_certificate_chain_file(self->ssl_ctx, self->tls_cert_file) != 1) {
                capture_tls_error(self);
                fail_connection(self, "SSL_CTX_use_certificate_chain_file failed");
                return;
            }
            if (SSL_CTX_use_PrivateKey_file(self->ssl_ctx, self->tls_key_file, SSL_FILETYPE_PEM) != 1) {
                capture_tls_error(self);
                fail_connection(self, "SSL_CTX_use_PrivateKey_file failed");
                return;
            }
            if (SSL_CTX_check_private_key(self->ssl_ctx) != 1) {
                capture_tls_error(self);
                fail_connection(self, "TLS client cert / private key mismatch");
                return;
            }
        } else if (self->tls_cert_file || self->tls_key_file) {
            fail_connection(self, "tls_cert_file and tls_key_file must both be set");
            return;
        }
        self->ssl = SSL_new(self->ssl_ctx);
        if (!self->ssl) {
            capture_tls_error(self);
            fail_connection(self, "SSL_new failed");
            return;
        }
        SSL_set_fd(self->ssl, self->fd);

        int host_is_ip = is_ip_literal(self->host);

        /* SNI must not be sent for IP address literals (RFC 6066 s3) */
        if (!host_is_ip)
            SSL_set_tlsext_host_name(self->ssl, self->host);

        /* Verify server certificate matches hostname or IP */
        if (!self->tls_skip_verify) {
            X509_VERIFY_PARAM *param = SSL_get0_param(self->ssl);
            X509_VERIFY_PARAM_set_hostflags(param, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS);
            if (host_is_ip)
                X509_VERIFY_PARAM_set1_ip_asc(param, self->host);
            else
                X509_VERIFY_PARAM_set1_host(param, self->host, 0);
        }

        ret = SSL_connect(self->ssl);
        if (ret == 1) {
            /* handshake done immediately */
            goto handshake_done;
        } else {
            int err = SSL_get_error(self->ssl, ret);
            if (err == SSL_ERROR_WANT_READ) {
                start_reading(self);
            } else if (err == SSL_ERROR_WANT_WRITE) {
                start_writing(self);
            } else {
                capture_tls_error(self);
                fail_connection(self, "SSL_connect failed");
                return;
            }
            /* continue TLS handshake in io_cb */
            return;
        }
    }
handshake_done:
#endif

    if (self->protocol == PROTO_NATIVE) {
        /* Send ClientHello and wait for ServerHello */
        size_t hello_len;
        char *hello = build_native_hello(self, &hello_len);
        send_replace(self, hello, hello_len);



( run in 0.312 second using v1.01-cache-2.11-cpan-13bb782fe5a )