EV-ClickHouse

 view release on metacpan or  search on metacpan

xs/proto_native_parse.c  view on Meta::CPAN

            stop_timing(self);

            /* Query error — deliver to callback */
            CLEAR_SV(self->native_rows);
            CLEAR_INSERT(self);
            CLEAR_STR(self->insert_err);
            self->native_state = NATIVE_IDLE;
            self->recv_len = 0; /* flush malformed data */
            if (self->send_count > 0) self->send_count--;
            lc_free_dicts(self);
            int destroyed = deliver_error(self, errmsg);
            Safefree(errmsg);
            if (destroyed) return;

            /* advance pipeline — may free self via try_write error */
            pipeline_advance(self);
            return;
        }

        if (rc == 2) {
            /* EndOfStream — deliver accumulated rows or deferred error */
            stop_timing(self);
            self->native_state = NATIVE_IDLE;
            if (self->send_count > 0) self->send_count--;
            lc_free_dicts(self);

            /* Flush any uncoalesced progress accumulated since the last
             * fire so users instrumenting via on_progress see the full
             * total when the query completes within one progress_period
             * of the last fire. */
            if (self->on_progress && self->progress_period > 0) {
                int any = 0, i;
                for (i = 0; i < 5; i++) if (self->progress_acc[i]) { any = 1; break; }
                if (any) {
                    uint64_t pp[5];
                    memcpy(pp, self->progress_acc, sizeof(pp));
                    memset(self->progress_acc, 0, sizeof(self->progress_acc));
                    if (fire_progress_cb(self, pp) < 0) return;
                }
            }

            if (self->insert_err) {
                char *err = self->insert_err;
                self->insert_err = NULL;
                CLEAR_SV(self->native_rows);
                int destroyed = deliver_error(self, err);
                Safefree(err);
                if (destroyed) return;
            } else {
                AV *rows = self->native_rows;
                self->native_rows = NULL;
                if (deliver_rows(self, rows)) return;
            }

            /* advance pipeline — may free self via try_write error */
            pipeline_advance(self);
            return;
        }

        if (rc == 3) {
            /* Pong — ack a keepalive ping, or deliver to user's ping() cb */
            self->native_state = NATIVE_IDLE;
            if (self->ka_in_flight > 0) {
                /* Keepalive ack: not tied to send_count or any user cb */
                self->ka_in_flight--;
                continue;
            }
            stop_timing(self);
            if (self->send_count > 0) self->send_count--;
            AV *rows = newAV();
            if (deliver_rows(self, rows)) return;
            pipeline_advance(self);
            return;
        }

        /* rc == 1: Data/Progress/ProfileInfo — continue reading */
    }
}



( run in 1.239 second using v1.01-cache-2.11-cpan-f56aa216473 )