Neo4j-Client

 view release on metacpan or  search on metacpan

build/lib/src/result_stream.c  view on Meta::CPAN

    neo4j_result_stream_t _result_stream;

    neo4j_connection_t *connection;
    neo4j_job_t job;
    neo4j_logger_t *logger;
    neo4j_memory_allocator_t *allocator;
    neo4j_mpool_t mpool;
    neo4j_mpool_t record_mpool;
    unsigned int refcount;
    unsigned int starting;
    unsigned int streaming;
    int statement_type;
    struct neo4j_statement_plan *statement_plan;
    struct neo4j_update_counts update_counts;
    unsigned long long available_after;
    unsigned long long consumed_after;
    int failure;
    struct neo4j_failure_details failure_details;
    unsigned int nfields;
    const char *const *fields;
    result_record_t *records;

build/lib/src/result_stream.c  view on Meta::CPAN


    if (neo4j_session_pull_all(results->connection, -1, -1,
            &(results->record_mpool), pull_all_callback, results))
    {
        neo4j_log_debug_errno(results->logger, "neo4j_session_pull_all failed");
        goto failure;
    }
    (results->refcount)++;

    results->starting = true;
    results->streaming = true;
    return &(results->_result_stream);

    int errsv;
failure:
    errsv = errno;
    run_rs_close(&(results->_result_stream));
    errno = errsv;
    return NULL;
}

build/lib/src/result_stream.c  view on Meta::CPAN

    if (neo4j_session_discard_all(results->connection, -1, -1,
            &(results->mpool), discard_all_callback, results))
    {
        neo4j_log_debug_errno(results->logger,
                "neo4j_connection_discard_all failed");
        goto failure;
    }
    (results->refcount)++;

    results->starting = true;
    results->streaming = true;
    return &(results->_result_stream);

    int errsv;
failure:
    errsv = errno;
    run_rs_close(&(results->_result_stream));
    errno = errsv;
    return NULL;
}

build/lib/src/result_stream.c  view on Meta::CPAN

    REQUIRE(results != NULL, NULL);

    if (results->last_fetched != NULL)
    {
        result_record_release(results->last_fetched);
        results->last_fetched = NULL;
    }

    if (results->records == NULL)
    {
        if (!results->streaming)
        {
            errno = results->failure;
            return NULL;
        }
        assert(results->failure == 0);
        results->awaiting_records = 1;
        if (await(results, &(results->awaiting_records)))
        {
            errno = results->failure;
            return NULL;
        }
        if (results->records == NULL)
        {
            assert(!results->streaming);
            errno = results->failure;
            return NULL;
        }
    }

    result_record_t *record = results->records;
    results->records = record->next;
    --(results->records_depth);
    if (results->records == NULL)
    {

build/lib/src/result_stream.c  view on Meta::CPAN



neo4j_result_t *run_rs_peek(neo4j_result_stream_t *self, unsigned int depth)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, NULL);

    if (results->records_depth <= depth)
    {
        if (!results->streaming)
        {
            errno = results->failure;
            return NULL;
        }

        assert(results->failure == 0);
        results->awaiting_records = depth - results->records_depth + 1;
        if (await(results, &(results->awaiting_records)))
        {
            errno = results->failure;
            return NULL;
        }

        if (results->records_depth <= depth)
        {
            assert(!results->streaming);
            errno = results->failure;
            return NULL;
        }
    }

    result_record_t *record = results->records;
    assert(record != NULL);
    for (unsigned int i = depth; i > 0; --i)
    {
        record = record->next;

build/lib/src/result_stream.c  view on Meta::CPAN

    return results->available_after;
}


/**
 * Report the `consumed_after` value stored on a run result stream.
 *
 * When no failure has been recorded yet, await() is invoked on the stream's
 * `streaming` flag before the value is read (presumably this blocks until
 * streaming has finished -- confirm against await()).
 *
 * @param self The result stream handle embedded in a run_result_stream_t.
 * @return The stored `consumed_after` value. On failure, 0 is returned and
 *         errno is set to the failure code recorded on the stream.
 */
unsigned long long run_rs_consumed_after(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, 0);

    // An already-recorded failure short-circuits the wait entirely.
    int failed = (results->failure != 0);
    if (!failed)
    {
        failed = (await(results, &(results->streaming)) != 0);
    }

    if (!failed)
    {
        return results->consumed_after;
    }

    // Either path into here is expected to have left a failure code behind.
    assert(results->failure != 0);
    errno = results->failure;
    return 0;
}


/**
 * Report the statement type stored on a run result stream.
 *
 * When no failure has been recorded yet, await() is invoked on the stream's
 * `streaming` flag before the value is read (presumably this blocks until
 * streaming has finished -- confirm against await()).
 *
 * @param self The result stream handle embedded in a run_result_stream_t.
 * @return The stored `statement_type`. On failure, -1 is returned and errno
 *         is set to the failure code recorded on the stream.
 */
int run_rs_statement_type(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, -1);

    // Only wait on the stream if it has not already failed.
    if (results->failure == 0)
    {
        if (await(results, &(results->streaming)) == 0)
        {
            return results->statement_type;
        }
    }

    // Either path into here is expected to have left a failure code behind.
    assert(results->failure != 0);
    errno = results->failure;
    return -1;
}


struct neo4j_statement_plan *run_rs_statement_plan(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, NULL);

    if (results->failure != 0 || await(results, &(results->streaming)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        return NULL;
    }

    if (results->statement_plan == NULL)
    {
        errno = NEO4J_NO_PLAN_AVAILABLE;
        return NULL;

build/lib/src/result_stream.c  view on Meta::CPAN

struct neo4j_update_counts run_rs_update_counts(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    if (results == NULL)
    {
        errno = EINVAL;
        goto failure;
    }

    if (results->failure != 0 || await(results, &(results->streaming)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        goto failure;
    }

    return results->update_counts;

    struct neo4j_update_counts counts;
failure:

build/lib/src/result_stream.c  view on Meta::CPAN

    return counts;
}


int run_rs_close(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, -1);

    results->streaming = false;
    assert(results->refcount > 0);
    --(results->refcount);
    int err = await(results, &(results->refcount));
    // even if await fails, queued messages should still be drained
    assert(results->refcount == 0);

    if (results->connection != NULL)
    {
        neo4j_detach_job(results->connection, (neo4j_job_t *)&(results->job));
        results->connection = NULL;

build/lib/src/result_stream.c  view on Meta::CPAN

{
    run_result_stream_t *results = container_of(job,
            run_result_stream_t, job);
    if (results == NULL || results->connection == NULL)
    {
        return;
    }

    job->next = NULL;
    results->connection = NULL;
    if (results->streaming && results->failure == 0)
    {
        set_failure(results, err);
    }
}


int run_callback(void *cdata, neo4j_message_type_t type,
        const neo4j_value_t *argv, uint16_t argc)
{
    assert(cdata != NULL);

build/lib/src/result_stream.c  view on Meta::CPAN

        if (append_result(results, argv, argc))
        {
            neo4j_log_trace_errno(results->logger, "append_result failed");
            set_failure(results, errno);
            return -1;
        }
        return 1;
    }

    --(results->refcount);
    results->streaming = false;

    // not a record, so keep this memory along with the result stream
    if (neo4j_mpool_merge(&(results->mpool), &(results->record_mpool)) < 0)
    {
        neo4j_log_trace_errno(results->logger, "neo4j_mpool_merge failed");
        set_failure(results, errno);
        return -1;
    }

    return stream_end(results, type, "PULL_ALL", argv, argc);

build/lib/src/result_stream.c  view on Meta::CPAN



/**
 * Response callback registered for a DISCARD_ALL request.
 *
 * Drops one reference from the result stream, marks it as no longer
 * streaming, and hands the message over to stream_end().
 *
 * @param cdata The run_result_stream_t supplied when the callback was
 *              registered; must not be NULL.
 * @param type  Type of the received message, forwarded to stream_end().
 * @param argv  Message arguments; may be NULL only when argc is 0.
 * @param argc  Number of entries in argv.
 * @return The value returned by stream_end().
 */
int discard_all_callback(void *cdata, neo4j_message_type_t type,
        const neo4j_value_t *argv, uint16_t argc)
{
    assert(cdata != NULL);
    assert(argc == 0 || argv != NULL);

    run_result_stream_t *results = cdata;

    // The request has been answered: stop streaming and release the
    // reference that was taken on its behalf.
    results->streaming = false;
    results->refcount -= 1;

    int outcome = stream_end(results, type, "DISCARD_ALL", argv, argc);
    return outcome;
}


int stream_end(run_result_stream_t *results, neo4j_message_type_t type,
        const char *src_message_type, const neo4j_value_t *argv, uint16_t argc)
{
    neo4j_logger_t *logger = results->logger;
    neo4j_connection_t *connection = results->connection;

build/lib/src/result_stream.c  view on Meta::CPAN

        neo4j_log_error(results->logger,
                "Invalid field in RECORD message received in %p"
                " (got %s, expected List)", (void *)connection,
                neo4j_typestr(arg_type));
        errno = EPROTO;
        return -1;
    }

    (results->nrecords)++;

    if (!results->streaming)
    {
        // discard memory for the record
        neo4j_mpool_drain(&(results->record_mpool));
        return 0;
    }

    assert(connection != NULL);
    neo4j_config_t *config = connection->config;

    result_record_t *record = neo4j_mpool_calloc(&(results->record_mpool),

build/lib/src/result_stream.c  view on Meta::CPAN


    return 0;
}


/**
 * Record a failure on a result stream.
 *
 * Stores the error code, marks the stream as no longer streaming, resets the
 * count of awaited records to zero and zeroes the stored failure details.
 *
 * @param results The result stream to update; must not be NULL.
 * @param error   The failure code to record; must be non-zero.
 */
void set_failure(run_result_stream_t *results, int error)
{
    assert(results != NULL);
    assert(error != 0);

    // Wipe any previously stored details; only the code is kept.
    struct neo4j_failure_details *details = &(results->failure_details);
    memset(details, 0, sizeof *details);

    results->awaiting_records = 0;
    results->streaming = false;
    results->failure = error;
}

build/lib/src/transaction.c  view on Meta::CPAN

// Forward declarations of the transaction operations; each takes the
// transaction it acts on as its first argument.
// NOTE(review): the definitions are not visible here -- confirm the meaning
// of the int return values before relying on them.
int tx_commit(neo4j_transaction_t *tx);
int tx_rollback(neo4j_transaction_t *tx);
// NOTE(review): `send` presumably selects "send" rather than "run" semantics
// for the statement -- confirm against the definition.
neo4j_result_stream_t *tx_run(neo4j_transaction_t *tx, const char *statement, neo4j_value_t params, int send);


// rough out the transaction based calls
// there will be bookkeeping to do - handling errors when the server state
// does not match the request:
// - when server is READY but run_in_tx is exec'd
// - when server is STREAMING but run is exec'd <- may be handled by the results->starting,
//   results->streaming flags
// - when server is TX_READY but run or send is exec'd
// - when server is TX_STREAMING but run or send is exec'd
// erroring when negotiated protocol is not 3+

// Note: BEGIN, COMMIT and ROLLBACK responses don't belong on a results stream, so
// we should maybe have a transaction structure, analogous to the results structure,
// that stores info like success responses, failure responses and bookmarks.

// begin_tx - specify timeout and mode, but ignore bookmarks and metadata ATM
// must check neo4j_tx_failure(tx)



( run in 0.251 second using v1.01-cache-2.11-cpan-4d50c553e7e )