EV-Pg
view release on metacpan or search on metacpan
ev_pg_cb_t *cbt = self->cb_head;
if (!cbt->is_pipeline_sync) {
advance_cb_queue(self);
} else {
update_idle_ref(self);
}
} else {
update_idle_ref(self);
}
}
if (self->copy_mode) break;
if (!self->cb_head) break;
continue;
}
{
ExecStatusType st = PQresultStatus(res);
if (st == PGRES_PIPELINE_SYNC) {
/* Skip stale pipeline syncs left by skip_pending */
if (self->skip_results > 0) {
self->skip_results--;
PQclear(res);
continue;
}
int consumed = deliver_result(self, res);
if (self->magic != EV_PG_MAGIC) {
if (last_res) PQclear(last_res);
return;
}
if (!consumed) advance_cb_queue(self);
continue;
}
if (st == PGRES_COPY_IN || st == PGRES_COPY_OUT || st == PGRES_COPY_BOTH) {
if (self->skip_results > 0) {
/* Skipped COPY — must use protocol-correct drain */
int drained = 0;
PQclear(res);
if (st == PGRES_COPY_OUT || st == PGRES_COPY_BOTH) {
char *buf;
int rc;
while ((rc = PQgetCopyData(self->conn, &buf, 1)) > 0)
PQfreemem(buf);
/* rc == -1: COPY done, rc == 0: would block */
if (rc == 0) {
self->draining_copy = (st == PGRES_COPY_BOTH) ? 3 : 1;
update_idle_ref(self);
continue;
}
}
if (st == PGRES_COPY_IN || st == PGRES_COPY_BOTH) {
int ce = PQputCopyEnd(self->conn, "skipped");
if (ce == -1) {
if (last_res) PQclear(last_res);
handle_conn_loss(self);
return;
}
/* ce == 0: END queued, flush pending; ce > 0: END sent.
* asyncStatus is already PGASYNC_BUSY — fall through to
* drain COMMAND_OK (or set draining_single_row if busy). */
check_flush(self);
if (self->magic != EV_PG_MAGIC || !self->conn) {
if (last_res) PQclear(last_res);
return;
}
}
/* Drain COMMAND_OK + NULL */
while (self->conn && !PQisBusy(self->conn)) {
PGresult *r = PQgetResult(self->conn);
if (NULL == r) { drained = 1; break; }
PQclear(r);
}
if (drained)
self->skip_results--;
else {
self->draining_single_row = 1;
update_idle_ref(self);
}
continue;
}
self->copy_mode = 1;
{
int consumed = deliver_result(self, res);
if (self->magic != EV_PG_MAGIC) {
if (last_res) PQclear(last_res);
return;
}
if (consumed) {
self->copy_mode = 0;
update_idle_ref(self);
}
}
if (last_res) { PQclear(last_res); last_res = NULL; }
if (self->copy_mode) break;
/* COPY completed inside callback (e.g. get_copy_data
* returned -1); continue to drain COMMAND_OK */
continue;
}
if (st == PGRES_SINGLE_TUPLE
#ifdef LIBPQ_HAS_CHUNK_MODE
|| st == PGRES_TUPLES_CHUNK
#endif
) {
if (self->skip_results > 0) {
/* Skipped single-row stream — drain remaining rows */
int drained = 0;
PQclear(res);
while (self->conn && !PQisBusy(self->conn)) {
PGresult *drain = PQgetResult(self->conn);
if (NULL == drain) { drained = 1; break; }
PQclear(drain);
}
if (drained)
self->skip_results--;
else {
self->draining_single_row = 1;
update_idle_ref(self);
}
continue;
}
( run in 0.765 second using v1.01-cache-2.11-cpan-5a3173703d6 )