EV-Pg

 view release on metacpan or  search on metacpan

Pg.xs  view on Meta::CPAN

                    ev_pg_cb_t *cbt = self->cb_head;
                    if (!cbt->is_pipeline_sync) {
                        advance_cb_queue(self);
                    } else {
                        update_idle_ref(self);
                    }
                } else {
                    update_idle_ref(self);
                }
            }

            if (self->copy_mode) break;

            if (!self->cb_head) break;
            continue;
        }

        {
            ExecStatusType st = PQresultStatus(res);
            if (st == PGRES_PIPELINE_SYNC) {
                /* Skip stale pipeline syncs left by skip_pending */
                if (self->skip_results > 0) {
                    self->skip_results--;
                    PQclear(res);
                    continue;
                }
                int consumed = deliver_result(self, res);
                if (self->magic != EV_PG_MAGIC) {
                    if (last_res) PQclear(last_res);
                    return;
                }
                if (!consumed) advance_cb_queue(self);
                continue;
            }
            if (st == PGRES_COPY_IN || st == PGRES_COPY_OUT || st == PGRES_COPY_BOTH) {
                if (self->skip_results > 0) {
                    /* Skipped COPY — must use protocol-correct drain */
                    int drained = 0;
                    PQclear(res);
                    if (st == PGRES_COPY_OUT || st == PGRES_COPY_BOTH) {
                        char *buf;
                        int rc;
                        while ((rc = PQgetCopyData(self->conn, &buf, 1)) > 0)
                            PQfreemem(buf);
                        /* rc == -1: COPY done, rc == 0: would block */
                        if (rc == 0) {
                            self->draining_copy = (st == PGRES_COPY_BOTH) ? 3 : 1;
                            update_idle_ref(self);
                            continue;
                        }
                    }
                    if (st == PGRES_COPY_IN || st == PGRES_COPY_BOTH) {
                        int ce = PQputCopyEnd(self->conn, "skipped");
                        if (ce == -1) {
                            if (last_res) PQclear(last_res);
                            handle_conn_loss(self);
                            return;
                        }
                        /* ce == 0: END queued, flush pending; ce > 0: END sent.
                         * asyncStatus is already PGASYNC_BUSY — fall through to
                         * drain COMMAND_OK (or set draining_single_row if busy). */
                        check_flush(self);
                        if (self->magic != EV_PG_MAGIC || !self->conn) {
                            if (last_res) PQclear(last_res);
                            return;
                        }
                    }
                    /* Drain COMMAND_OK + NULL */
                    while (self->conn && !PQisBusy(self->conn)) {
                        PGresult *r = PQgetResult(self->conn);
                        if (NULL == r) { drained = 1; break; }
                        PQclear(r);
                    }
                    if (drained)
                        self->skip_results--;
                    else {
                        self->draining_single_row = 1;
                        update_idle_ref(self);
                    }
                    continue;
                }
                self->copy_mode = 1;
                {
                    int consumed = deliver_result(self, res);
                    if (self->magic != EV_PG_MAGIC) {
                        if (last_res) PQclear(last_res);
                        return;
                    }
                    if (consumed) {
                        self->copy_mode = 0;
                        update_idle_ref(self);
                    }
                }
                if (last_res) { PQclear(last_res); last_res = NULL; }
                if (self->copy_mode) break;
                /* COPY completed inside callback (e.g. get_copy_data
                 * returned -1); continue to drain COMMAND_OK */
                continue;
            }
            if (st == PGRES_SINGLE_TUPLE
#ifdef LIBPQ_HAS_CHUNK_MODE
                || st == PGRES_TUPLES_CHUNK
#endif
               ) {
                if (self->skip_results > 0) {
                    /* Skipped single-row stream — drain remaining rows */
                    int drained = 0;
                    PQclear(res);
                    while (self->conn && !PQisBusy(self->conn)) {
                        PGresult *drain = PQgetResult(self->conn);
                        if (NULL == drain) { drained = 1; break; }
                        PQclear(drain);
                    }
                    if (drained)
                        self->skip_results--;
                    else {
                        self->draining_single_row = 1;
                        update_idle_ref(self);
                    }
                    continue;
                }



( run in 0.765 second using v1.01-cache-2.11-cpan-5a3173703d6 )