view release on metacpan or search on metacpan
build/lib/src/result_stream.c view on Meta::CPAN
neo4j_result_stream_t _result_stream;
neo4j_connection_t *connection;
neo4j_job_t job;
neo4j_logger_t *logger;
neo4j_memory_allocator_t *allocator;
neo4j_mpool_t mpool;
neo4j_mpool_t record_mpool;
unsigned int refcount;
unsigned int starting;
unsigned int streaming;
int statement_type;
struct neo4j_statement_plan *statement_plan;
struct neo4j_update_counts update_counts;
unsigned long long available_after;
unsigned long long consumed_after;
int failure;
struct neo4j_failure_details failure_details;
unsigned int nfields;
const char *const *fields;
result_record_t *records;
build/lib/src/result_stream.c view on Meta::CPAN
if (neo4j_session_pull_all(results->connection, -1, -1,
&(results->record_mpool), pull_all_callback, results))
{
neo4j_log_debug_errno(results->logger, "neo4j_session_pull_all failed");
goto failure;
}
(results->refcount)++;
results->starting = true;
results->streaming = true;
return &(results->_result_stream);
int errsv;
failure:
errsv = errno;
run_rs_close(&(results->_result_stream));
errno = errsv;
return NULL;
}
build/lib/src/result_stream.c view on Meta::CPAN
if (neo4j_session_discard_all(results->connection, -1, -1,
&(results->mpool), discard_all_callback, results))
{
neo4j_log_debug_errno(results->logger,
"neo4j_connection_discard_all failed");
goto failure;
}
(results->refcount)++;
results->starting = true;
results->streaming = true;
return &(results->_result_stream);
int errsv;
failure:
errsv = errno;
run_rs_close(&(results->_result_stream));
errno = errsv;
return NULL;
}
build/lib/src/result_stream.c view on Meta::CPAN
REQUIRE(results != NULL, NULL);
if (results->last_fetched != NULL)
{
result_record_release(results->last_fetched);
results->last_fetched = NULL;
}
if (results->records == NULL)
{
if (!results->streaming)
{
errno = results->failure;
return NULL;
}
assert(results->failure == 0);
results->awaiting_records = 1;
if (await(results, &(results->awaiting_records)))
{
errno = results->failure;
return NULL;
}
if (results->records == NULL)
{
assert(!results->streaming);
errno = results->failure;
return NULL;
}
}
result_record_t *record = results->records;
results->records = record->next;
--(results->records_depth);
if (results->records == NULL)
{
build/lib/src/result_stream.c view on Meta::CPAN
neo4j_result_t *run_rs_peek(neo4j_result_stream_t *self, unsigned int depth)
{
run_result_stream_t *results = container_of(self,
run_result_stream_t, _result_stream);
REQUIRE(results != NULL, NULL);
if (results->records_depth <= depth)
{
if (!results->streaming)
{
errno = results->failure;
return NULL;
}
assert(results->failure == 0);
results->awaiting_records = depth - results->records_depth + 1;
if (await(results, &(results->awaiting_records)))
{
errno = results->failure;
return NULL;
}
if (results->records_depth <= depth)
{
assert(!results->streaming);
errno = results->failure;
return NULL;
}
}
result_record_t *record = results->records;
assert(record != NULL);
for (unsigned int i = depth; i > 0; --i)
{
record = record->next;
build/lib/src/result_stream.c view on Meta::CPAN
return results->available_after;
}
unsigned long long run_rs_consumed_after(neo4j_result_stream_t *self)
{
run_result_stream_t *results = container_of(self,
run_result_stream_t, _result_stream);
REQUIRE(results != NULL, 0);
if (results->failure != 0 || await(results, &(results->streaming)))
{
assert(results->failure != 0);
errno = results->failure;
return 0;
}
return results->consumed_after;
}
int run_rs_statement_type(neo4j_result_stream_t *self)
{
run_result_stream_t *results = container_of(self,
run_result_stream_t, _result_stream);
REQUIRE(results != NULL, -1);
if (results->failure != 0 || await(results, &(results->streaming)))
{
assert(results->failure != 0);
errno = results->failure;
return -1;
}
return results->statement_type;
}
struct neo4j_statement_plan *run_rs_statement_plan(neo4j_result_stream_t *self)
{
run_result_stream_t *results = container_of(self,
run_result_stream_t, _result_stream);
REQUIRE(results != NULL, NULL);
if (results->failure != 0 || await(results, &(results->streaming)))
{
assert(results->failure != 0);
errno = results->failure;
return NULL;
}
if (results->statement_plan == NULL)
{
errno = NEO4J_NO_PLAN_AVAILABLE;
return NULL;
build/lib/src/result_stream.c view on Meta::CPAN
struct neo4j_update_counts run_rs_update_counts(neo4j_result_stream_t *self)
{
run_result_stream_t *results = container_of(self,
run_result_stream_t, _result_stream);
if (results == NULL)
{
errno = EINVAL;
goto failure;
}
if (results->failure != 0 || await(results, &(results->streaming)))
{
assert(results->failure != 0);
errno = results->failure;
goto failure;
}
return results->update_counts;
struct neo4j_update_counts counts;
failure:
build/lib/src/result_stream.c view on Meta::CPAN
return counts;
}
int run_rs_close(neo4j_result_stream_t *self)
{
run_result_stream_t *results = container_of(self,
run_result_stream_t, _result_stream);
REQUIRE(results != NULL, -1);
results->streaming = false;
assert(results->refcount > 0);
--(results->refcount);
int err = await(results, &(results->refcount));
// even if await fails, queued messages should still be drained
assert(results->refcount == 0);
if (results->connection != NULL)
{
neo4j_detach_job(results->connection, (neo4j_job_t *)&(results->job));
results->connection = NULL;
build/lib/src/result_stream.c view on Meta::CPAN
{
run_result_stream_t *results = container_of(job,
run_result_stream_t, job);
if (results == NULL || results->connection == NULL)
{
return;
}
job->next = NULL;
results->connection = NULL;
if (results->streaming && results->failure == 0)
{
set_failure(results, err);
}
}
int run_callback(void *cdata, neo4j_message_type_t type,
const neo4j_value_t *argv, uint16_t argc)
{
assert(cdata != NULL);
build/lib/src/result_stream.c view on Meta::CPAN
if (append_result(results, argv, argc))
{
neo4j_log_trace_errno(results->logger, "append_result failed");
set_failure(results, errno);
return -1;
}
return 1;
}
--(results->refcount);
results->streaming = false;
// not a record, so keep this memory along with the result stream
if (neo4j_mpool_merge(&(results->mpool), &(results->record_mpool)) < 0)
{
neo4j_log_trace_errno(results->logger, "neo4j_mpool_merge failed");
set_failure(results, errno);
return -1;
}
return stream_end(results, type, "PULL_ALL", argv, argc);
build/lib/src/result_stream.c view on Meta::CPAN
int discard_all_callback(void *cdata, neo4j_message_type_t type,
const neo4j_value_t *argv, uint16_t argc)
{
assert(cdata != NULL);
assert(argc == 0 || argv != NULL);
run_result_stream_t *results = (run_result_stream_t *)cdata;
--(results->refcount);
results->streaming = false;
return stream_end(results, type, "DISCARD_ALL", argv, argc);
}
int stream_end(run_result_stream_t *results, neo4j_message_type_t type,
const char *src_message_type, const neo4j_value_t *argv, uint16_t argc)
{
neo4j_logger_t *logger = results->logger;
neo4j_connection_t *connection = results->connection;
build/lib/src/result_stream.c view on Meta::CPAN
neo4j_log_error(results->logger,
"Invalid field in RECORD message received in %p"
" (got %s, expected List)", (void *)connection,
neo4j_typestr(arg_type));
errno = EPROTO;
return -1;
}
(results->nrecords)++;
if (!results->streaming)
{
// discard memory for the record
neo4j_mpool_drain(&(results->record_mpool));
return 0;
}
assert(connection != NULL);
neo4j_config_t *config = connection->config;
result_record_t *record = neo4j_mpool_calloc(&(results->record_mpool),
build/lib/src/result_stream.c view on Meta::CPAN
return 0;
}
void set_failure(run_result_stream_t *results, int error)
{
assert(results != NULL);
assert(error != 0);
results->failure = error;
results->streaming = false;
results->awaiting_records = 0;
memset(&(results->failure_details), 0, sizeof(results->failure_details));
}
build/lib/src/transaction.c view on Meta::CPAN
int tx_commit(neo4j_transaction_t *tx);
int tx_rollback(neo4j_transaction_t *tx);
neo4j_result_stream_t *tx_run(neo4j_transaction_t *tx, const char *statement, neo4j_value_t params, int send);
// rough out the transaction based calls
// there will be bookkeeping to do - handling errors when server state
// is mismatch with the request:
// - when server is READY but run_in_tx is exec'd
// - when server is STREAMING but run is exec'd <- may be handled by the results->starting,
// results->streaming flags
// - when server is TX_READY but run or send is exec'd
// - when server is TX_STREAMING but run or send is exec'd
// erroring when negotiated protocol is not 3+
// Note, BEGIN, COMMIT, ROLLBACK responses don't belong on a results stream. So maybe
// should have a transaction structure, analogous to the results structure, that
// stores info like success responses, failure responses, bookmarks.
// begin_tx - specify timeout and mode, but ignore bookmarks and metadata ATM
// must check neo4j_tx_failure(tx)