EV-Pg

 view release on metacpan or  search on metacpan

Pg.xs  view on Meta::CPAN

                : st == PGRES_PIPELINE_ABORTED ? "pipeline aborted"
                : st == PGRES_BAD_RESPONSE     ? "bad server response"
                : st == PGRES_EMPTY_QUERY      ? "empty query"
                : "unknown error";
            if (st == PGRES_FATAL_ERROR)
                STORE_LAST_HV(self->last_error_fields, build_error_fields(res));
            PUSHs(&PL_sv_undef);
            PUSHs(sv_2mortal(newSVpv(msg, 0)));
        }
        else if (st == PGRES_PIPELINE_SYNC) {
            PUSHs(sv_2mortal(newSViv(1)));
        }
        else if (st == PGRES_COPY_IN || st == PGRES_COPY_OUT || st == PGRES_COPY_BOTH) {
            const char *tag = (st == PGRES_COPY_IN) ? "COPY_IN"
                            : (st == PGRES_COPY_OUT) ? "COPY_OUT"
                            : "COPY_BOTH";
            PUSHs(sv_2mortal(newSVpv(tag, 0)));
        }
        else if (cbt->is_describe) {
            HV *meta = newHV();
            int nf = PQnfields(res);
            int np = PQnparams(res);
            int i;

            (void)hv_store(meta, "nfields", 7, newSViv(nf), 0);
            (void)hv_store(meta, "nparams", 7, newSViv(np), 0);

            if (nf > 0) {
                AV *fields = newAV();
                av_extend(fields, nf - 1);
                for (i = 0; i < nf; i++) {
                    HV *fld = newHV();
                    (void)hv_store(fld, "name", 4, newSVpv(PQfname(res, i), 0), 0);
                    (void)hv_store(fld, "type", 4, newSVuv(PQftype(res, i)), 0);
                    av_push(fields, newRV_noinc((SV*)fld));
                }
                (void)hv_store(meta, "fields", 6, newRV_noinc((SV*)fields), 0);
            }

            if (np > 0) {
                AV *ptypes = newAV();
                av_extend(ptypes, np - 1);
                for (i = 0; i < np; i++) {
                    av_push(ptypes, newSVuv(PQparamtype(res, i)));
                }
                (void)hv_store(meta, "paramtypes", 10, newRV_noinc((SV*)ptypes), 0);
            }

            PUSHs(sv_2mortal(newRV_noinc((SV*)meta)));
        }
        else if (st == PGRES_TUPLES_OK || st == PGRES_SINGLE_TUPLE
#ifdef LIBPQ_HAS_CHUNK_MODE
                 || st == PGRES_TUPLES_CHUNK
#endif
                ) {
            int nrows = PQntuples(res);
            int ncols = PQnfields(res);
            AV *rows = newAV();
            int r, c;
            /* Defer metadata building until result_meta is called.
             * In streaming mode, capture only the first result per query
             * (meta_fresh is cleared by advance_cb_queue). */
            if (st == PGRES_TUPLES_OK || !self->meta_fresh) {
                RELEASE_LAST_HV(self->last_result_meta);
                self->meta_fresh = 1;
            }
            if (nrows > 0) av_extend(rows, nrows - 1);
            for (r = 0; r < nrows; r++) {
                AV *row = newAV();
                if (ncols > 0) av_extend(row, ncols - 1);
                for (c = 0; c < ncols; c++) {
                    if (PQgetisnull(res, r, c)) {
                        av_push(row, newSV(0));
                    } else {
                        av_push(row, newSVpvn(PQgetvalue(res, r, c),
                                              PQgetlength(res, r, c)));
                    }
                }
                av_push(rows, newRV_noinc((SV*)row));
            }
            PUSHs(sv_2mortal(newRV_noinc((SV*)rows)));
        }
        else {
            /* COMMAND_OK — pass cmd_tuples string */
            const char *ct = PQcmdTuples(res);
            RELEASE_LAST_HV(self->last_result_meta);
            PUSHs(sv_2mortal(newSVpv(ct ? ct : "", 0)));
        }

        PUTBACK;

        /* Defer PQclear — keep result for lazy result_meta.
         * Must happen before callback so result_meta works inside callbacks. */
        if (self->meta_res) PQclear(self->meta_res);
        self->meta_res = res;

        CALL_SV_GUARDED(cbt->cb, "callback");
        FREETMPS;
        LEAVE;
    }

    {
        int consumed = (self->delivering_cbt == NULL);
        self->delivering_cbt = NULL;
        self->callback_depth--;
        return consumed;
    }
}

static void advance_cb_queue(ev_pg_t *self) {
    ev_pg_cb_t *cbt;

    if (!self->cb_head) return;

    cbt = self->cb_head;
    self->cb_head = cbt->next;
    if (!self->cb_head) self->cb_tail = NULL;
    self->pending_count--;
    self->meta_fresh = 0;
    SvREFCNT_dec(cbt->cb);
    release_cbt(cbt);



( run in 0.863 second using v1.01-cache-2.11-cpan-140bd7fdf52 )