Neo4j-Client

 view release on metacpan or  search on metacpan

build/lib/src/result_stream.c  view on Meta::CPAN

}


int neo4j_close_results(neo4j_result_stream_t *results)
{
    REQUIRE(results != NULL, -1);
    return results->close(results);
}


neo4j_value_t neo4j_result_field(const neo4j_result_t *result,
        unsigned int index)
{
    REQUIRE(result != NULL, neo4j_null);
    return result->field(result, index);
}


neo4j_result_t *neo4j_retain(neo4j_result_t *result)
{
    REQUIRE(result != NULL, NULL);
    return result->retain(result);
}


void neo4j_release(neo4j_result_t *result)
{
    assert(result != NULL);
    result->release(result);
}



typedef struct run_result_stream run_result_stream_t;

typedef struct result_record result_record_t;
struct result_record
{
    neo4j_result_t _result;

    unsigned int refcount;
    neo4j_mpool_t mpool;
    neo4j_value_t list;
    // TODO: add skip list for faster peeking
    result_record_t *next;
};


struct run_result_stream
{
    neo4j_result_stream_t _result_stream;

    neo4j_connection_t *connection;
    neo4j_job_t job;
    neo4j_logger_t *logger;
    neo4j_memory_allocator_t *allocator;
    neo4j_mpool_t mpool;
    neo4j_mpool_t record_mpool;
    unsigned int refcount;
    unsigned int starting;
    unsigned int streaming;
    int statement_type;
    struct neo4j_statement_plan *statement_plan;
    struct neo4j_update_counts update_counts;
    unsigned long long available_after;
    unsigned long long consumed_after;
    int failure;
    struct neo4j_failure_details failure_details;
    unsigned int nfields;
    const char *const *fields;
    result_record_t *records;
    result_record_t *records_tail;
    unsigned long long records_depth;
    result_record_t *last_fetched;
    unsigned long long nrecords;
    unsigned int awaiting_records;
};


static run_result_stream_t *run_rs_open(neo4j_connection_t *connection);
static int run_rs_check_failure(neo4j_result_stream_t *self);
static const char *run_rs_error_code(neo4j_result_stream_t *self);
static const char *run_rs_error_message(neo4j_result_stream_t *self);
const struct neo4j_failure_details *run_rs_failure_details(
        neo4j_result_stream_t *results);
static unsigned int run_rs_nfields(neo4j_result_stream_t *results);
static const char *run_rs_fieldname(neo4j_result_stream_t *self,
        unsigned int index);
static neo4j_result_t *run_rs_fetch_next(neo4j_result_stream_t *self);
static neo4j_result_t *run_rs_peek(neo4j_result_stream_t *self,
        unsigned int depth);
static unsigned long long run_rs_count(neo4j_result_stream_t *self);
static unsigned long long run_rs_available_after(neo4j_result_stream_t *self);
static unsigned long long run_rs_consumed_after(neo4j_result_stream_t *self);
static int run_rs_statement_type(neo4j_result_stream_t *self);
static struct neo4j_statement_plan *run_rs_statement_plan(
        neo4j_result_stream_t *self);
static struct neo4j_update_counts run_rs_update_counts(
        neo4j_result_stream_t *self);
static int run_rs_close(neo4j_result_stream_t *self);

static neo4j_value_t run_result_field(const neo4j_result_t *self,
        unsigned int index);
static neo4j_result_t *run_result_retain(neo4j_result_t *self);
static void run_result_release(neo4j_result_t *self);

static void abort_job(neo4j_job_t *job, int err);
static int run_callback(void *cdata, neo4j_message_type_t type,
        const neo4j_value_t *argv, uint16_t argc);
static int pull_all_callback(void *cdata, neo4j_message_type_t type,
        const neo4j_value_t *argv, uint16_t argc);
static int discard_all_callback(void *cdata, neo4j_message_type_t type,
        const neo4j_value_t *argv, uint16_t argc);
static int stream_end(run_result_stream_t *results, neo4j_message_type_t type,
        const char *src_message_type, const neo4j_value_t *argv, uint16_t argc);
static int await(run_result_stream_t *results, const unsigned int *condition);
static int append_result(run_result_stream_t *results,
        const neo4j_value_t *argv, uint16_t argc);
void result_record_release(result_record_t *record);
static int set_eval_failure(run_result_stream_t *results,
        const char *src_message_type, const neo4j_value_t *argv, uint16_t argc);

build/lib/src/result_stream.c  view on Meta::CPAN

    {
      g_extra_ents[i] = p->entries[i];
    }
  g_extra_map = neo4j_map(g_extra_ents,i);
  return 0;
}

neo4j_result_stream_t *neo4j_run_in_db(neo4j_connection_t *connection,
        const char *statement, neo4j_value_t params, const char *dbname)
{
   REQUIRE(connection != NULL, NULL);
   REQUIRE(statement != NULL, NULL);
   REQUIRE(neo4j_type(params) == NEO4J_MAP || neo4j_is_null(params), NULL);
   REQUIRE(dbname != NULL, NULL);

   if (connection->version < 4)
     {
       errno = NEO4J_FEATURE_UNAVAILABLE;
       char buf[128];
       sprintf(buf,"named dbs not available in protocol version %d", connection->version);
       neo4j_log_error_errno(connection->logger, (const char*) buf);
       return NULL;
     }
   g_extra_ents[0] = neo4j_map_entry("db", neo4j_string(dbname));
   g_extra_map = neo4j_map(g_extra_ents,1);
   neo4j_result_stream_t *rs = neo4j_run(connection, statement, params);
   g_extra_map = neo4j_null;
   return rs;
}

neo4j_result_stream_t *neo4j_run(neo4j_connection_t *connection,
        const char *statement, neo4j_value_t params)
{
    REQUIRE(connection != NULL, NULL);
    REQUIRE(statement != NULL, NULL);
    REQUIRE(neo4j_type(params) == NEO4J_MAP || neo4j_is_null(params), NULL);
    REQUIRE(neo4j_type(g_extra_map) == NEO4J_MAP || neo4j_is_null(g_extra_map), NULL);
    run_result_stream_t *results = run_rs_open(connection);
    if (results == NULL)
    {
        return NULL;
    }

    if (neo4j_session_run(connection, &(results->mpool), statement, params, g_extra_map,
            run_callback, results))
    {
        neo4j_log_debug_errno(results->logger, "neo4j_session_run failed");
        goto failure;
    }
    (results->refcount)++;

    if (neo4j_session_pull_all(results->connection, -1, -1,
            &(results->record_mpool), pull_all_callback, results))
    {
        neo4j_log_debug_errno(results->logger, "neo4j_session_pull_all failed");
        goto failure;
    }
    (results->refcount)++;

    results->starting = true;
    results->streaming = true;
    return &(results->_result_stream);

    int errsv;
failure:
    errsv = errno;
    run_rs_close(&(results->_result_stream));
    errno = errsv;
    return NULL;
}


neo4j_result_stream_t *neo4j_send_to_db(neo4j_connection_t *connection,
         const char *statement, neo4j_value_t params, const char *dbname)
{
  REQUIRE(connection != NULL, NULL);
  REQUIRE(statement != NULL, NULL);
  REQUIRE(neo4j_type(params) == NEO4J_MAP || neo4j_is_null(params), NULL);
  REQUIRE(dbname != NULL, NULL);

  if (connection->version < 4)
    {
      errno = NEO4J_FEATURE_UNAVAILABLE;
      char buf[128];
      sprintf(buf,"named dbs not available in protocol version %d", connection->version);
      neo4j_log_error_errno(connection->logger, (const char*) buf);
      return NULL;
    }
  g_extra_ents[0] = neo4j_map_entry("db", neo4j_string(dbname));
  g_extra_map = neo4j_map(g_extra_ents,1);
  neo4j_result_stream_t *rs = neo4j_send(connection, statement, params);
  g_extra_map = neo4j_null;
  return rs;
}

neo4j_result_stream_t *neo4j_send(neo4j_connection_t *connection,
        const char *statement, neo4j_value_t params)
{
    REQUIRE(connection != NULL, NULL);
    REQUIRE(statement != NULL, NULL);
    REQUIRE(neo4j_type(params) == NEO4J_MAP || neo4j_is_null(params), NULL);

    run_result_stream_t *results = run_rs_open(connection);
    if (results == NULL)
    {
        return NULL;
    }

    if (neo4j_session_run(connection, &(results->mpool), statement, params, g_extra_map,
            run_callback, results))
    {
        neo4j_log_debug_errno(results->logger, "neo4j_connection_run failed");
        goto failure;
    }
    (results->refcount)++;

    if (neo4j_session_discard_all(results->connection, -1, -1,
            &(results->mpool), discard_all_callback, results))
    {
        neo4j_log_debug_errno(results->logger,
                "neo4j_connection_discard_all failed");
        goto failure;
    }
    (results->refcount)++;

    results->starting = true;
    results->streaming = true;
    return &(results->_result_stream);

    int errsv;
failure:
    errsv = errno;
    run_rs_close(&(results->_result_stream));
    errno = errsv;
    return NULL;
}

run_result_stream_t *run_rs_open(neo4j_connection_t *connection)
{
    assert(connection != NULL);
    neo4j_config_t *config = connection->config;

    run_result_stream_t *results = neo4j_calloc(config->allocator,
            NULL, 1, sizeof(run_result_stream_t));

    results->connection = connection;
    results->logger = neo4j_get_logger(config, "results");
    results->allocator = config->allocator;
    results->mpool = neo4j_std_mpool(config);
    results->record_mpool = neo4j_std_mpool(config);
    results->statement_type = -1;
    results->refcount = 1;

    results->job.abort = abort_job;
    if (neo4j_attach_job(connection, &(results->job)))
    {
        neo4j_log_debug_errno(results->logger,
                "failed to attach job to connection");
        goto failure;
    }

    neo4j_result_stream_t *result_stream = &(results->_result_stream);
    result_stream->check_failure = run_rs_check_failure;
    result_stream->error_code = run_rs_error_code;
    result_stream->error_message = run_rs_error_message;
    result_stream->failure_details = run_rs_failure_details;
    result_stream->nfields = run_rs_nfields;
    result_stream->fieldname = run_rs_fieldname;
    result_stream->fetch_next = run_rs_fetch_next;
    result_stream->peek = run_rs_peek;
    result_stream->count = run_rs_count;
    result_stream->available_after = run_rs_available_after;
    result_stream->consumed_after = run_rs_consumed_after;
    result_stream->statement_type = run_rs_statement_type;
    result_stream->statement_plan = run_rs_statement_plan;
    result_stream->update_counts = run_rs_update_counts;
    result_stream->close = run_rs_close;
    return results;

    int errsv;
failure:
    errsv = errno;
    run_rs_close(&(results->_result_stream));
    errno = errsv;
    return NULL;
}

build/lib/src/result_stream.c  view on Meta::CPAN

            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, NULL);
    return &(results->failure_details);
}


unsigned int run_rs_nfields(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, 0);

    if (results->failure != 0 || await(results, &(results->starting)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        return 0;
    }
    return results->nfields;
}


const char *run_rs_fieldname(neo4j_result_stream_t *self,
        unsigned int index)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, NULL);

    if (results->failure != 0 || await(results, &(results->starting)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        return NULL;
    }
    assert(results->fields != NULL);
    if (index >= results->nfields)
    {
        errno = EINVAL;
        return NULL;
    }
    assert(results->fields != NULL);
    return results->fields[index];
}


neo4j_result_t *run_rs_fetch_next(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, NULL);

    if (results->last_fetched != NULL)
    {
        result_record_release(results->last_fetched);
        results->last_fetched = NULL;
    }

    if (results->records == NULL)
    {
        if (!results->streaming)
        {
            errno = results->failure;
            return NULL;
        }
        assert(results->failure == 0);
        results->awaiting_records = 1;
        if (await(results, &(results->awaiting_records)))
        {
            errno = results->failure;
            return NULL;
        }
        if (results->records == NULL)
        {
            assert(!results->streaming);
            errno = results->failure;
            return NULL;
        }
    }

    result_record_t *record = results->records;
    results->records = record->next;
    --(results->records_depth);
    if (results->records == NULL)
    {
        assert(results->records_depth == 0);
        results->records_tail = NULL;
    }
    record->next = NULL;

    results->last_fetched = record;
    return &(record->_result);
}


neo4j_result_t *run_rs_peek(neo4j_result_stream_t *self, unsigned int depth)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, NULL);

    if (results->records_depth <= depth)
    {
        if (!results->streaming)
        {
            errno = results->failure;
            return NULL;
        }

        assert(results->failure == 0);
        results->awaiting_records = depth - results->records_depth + 1;
        if (await(results, &(results->awaiting_records)))
        {
            errno = results->failure;
            return NULL;
        }

        if (results->records_depth <= depth)
        {
            assert(!results->streaming);
            errno = results->failure;
            return NULL;
        }
    }

    result_record_t *record = results->records;
    assert(record != NULL);
    for (unsigned int i = depth; i > 0; --i)
    {
        record = record->next;
        assert(record != NULL);
    }
    return &(record->_result);
}


unsigned long long run_rs_count(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, 0);
    return results->nrecords;
}


unsigned long long run_rs_available_after(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, 0);

    if (results->failure != 0 || await(results, &(results->starting)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        return 0;
    }

    return results->available_after;
}


unsigned long long run_rs_consumed_after(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, 0);

    if (results->failure != 0 || await(results, &(results->streaming)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        return 0;
    }

    return results->consumed_after;
}


int run_rs_statement_type(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, -1);

    if (results->failure != 0 || await(results, &(results->streaming)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        return -1;
    }

    return results->statement_type;
}


struct neo4j_statement_plan *run_rs_statement_plan(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, NULL);

    if (results->failure != 0 || await(results, &(results->streaming)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        return NULL;
    }

    if (results->statement_plan == NULL)
    {
        errno = NEO4J_NO_PLAN_AVAILABLE;
        return NULL;
    }
    return neo4j_statement_plan_retain(results->statement_plan);
}


struct neo4j_update_counts run_rs_update_counts(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    if (results == NULL)
    {
        errno = EINVAL;
        goto failure;
    }

    if (results->failure != 0 || await(results, &(results->streaming)))
    {
        assert(results->failure != 0);
        errno = results->failure;
        goto failure;
    }

    return results->update_counts;

    struct neo4j_update_counts counts;
failure:
    memset(&counts, 0, sizeof(counts));
    return counts;
}


int run_rs_close(neo4j_result_stream_t *self)
{
    run_result_stream_t *results = container_of(self,
            run_result_stream_t, _result_stream);
    REQUIRE(results != NULL, -1);

    results->streaming = false;
    assert(results->refcount > 0);
    --(results->refcount);
    int err = await(results, &(results->refcount));
    // even if await fails, queued messages should still be drained
    assert(results->refcount == 0);

    if (results->connection != NULL)
    {
        neo4j_detach_job(results->connection, (neo4j_job_t *)&(results->job));
        results->connection = NULL;
    }

    if (results->last_fetched != NULL)
    {
        result_record_release(results->last_fetched);
        results->last_fetched = NULL;
    }
    while (results->records != NULL)
    {
        result_record_t *next = results->records->next;
        result_record_release(results->records);
        results->records = next;
    }

    neo4j_statement_plan_release(results->statement_plan);
    results->statement_plan = NULL;
    neo4j_logger_release(results->logger);
    results->logger = NULL;
    neo4j_mpool_drain(&(results->record_mpool));
    neo4j_mpool_drain(&(results->mpool));
    neo4j_free(results->allocator, results);
    return err;
}


neo4j_value_t run_result_field(const neo4j_result_t *self,
        unsigned int index)
{
    const result_record_t *record = container_of(self,
            result_record_t, _result);
    REQUIRE(record != NULL, neo4j_null);
    return neo4j_list_get(record->list, index);
}


neo4j_result_t *run_result_retain(neo4j_result_t *self)
{
    result_record_t *record = container_of(self,
            result_record_t, _result);
    REQUIRE(record != NULL, NULL);
    (record->refcount)++;
    return self;
}


void run_result_release(neo4j_result_t *self)
{
    result_record_t *record = container_of(self,
            result_record_t, _result);
    result_record_release(record);
}


void abort_job(neo4j_job_t *job, int err)
{
    run_result_stream_t *results = container_of(job,
            run_result_stream_t, job);
    if (results == NULL || results->connection == NULL)
    {
        return;
    }

    job->next = NULL;
    results->connection = NULL;
    if (results->streaming && results->failure == 0)
    {
        set_failure(results, err);
    }
}


int run_callback(void *cdata, neo4j_message_type_t type,
        const neo4j_value_t *argv, uint16_t argc)
{
    assert(cdata != NULL);
    assert(argc == 0 || argv != NULL);
    run_result_stream_t *results = (run_result_stream_t *)cdata;
    neo4j_logger_t *logger = results->logger;
    neo4j_connection_t *connection = results->connection;

    results->starting = false;
    --(results->refcount);

    if (type == NULL || connection == NULL)
    {
        return 0;
    }

#ifndef NEOCLIENT_BUILD    
    if (type == NEO4J_FAILURE_MESSAGE)
#else
    if ( MESSAGE_TYPE_IS(type,FAILURE) )
#endif	
    {
        return set_eval_failure(results, "RUN", argv, argc);
    }
    
#ifndef NEOCLIENT_BUILD    
    if (type == NEO4J_IGNORED_MESSAGE)
#else
    if ( MESSAGE_TYPE_IS(type,IGNORED) )
#endif
    {
        if (results->failure == 0)
        {
            set_failure(results, NEO4J_STATEMENT_PREVIOUS_FAILURE);
        }
        return 0;
    }

    char description[128];
    snprintf(description, sizeof(description), "%s in %p (response to RUN)",
            neo4j_message_type_str(type), (void *)connection);

#ifndef NEOCLIENT_BUILD
    if (type != NEO4J_SUCCESS_MESSAGE)
#else
    if ( !MESSAGE_TYPE_IS(type,SUCCESS) )
#endif
    {
        neo4j_log_error(logger, "Unexpected %s", description);
        set_failure(results, errno = EPROTO);
        return -1;
    }

    const neo4j_value_t *metadata = neo4j_validate_metadata(argv, argc,
            description, logger);
    if (metadata == NULL)
    {
        set_failure(results, errno);
        return -1;
    }

    if (neo4j_log_is_enabled(connection->logger, NEO4J_LOG_TRACE))
    {
        neo4j_metadata_log(logger, NEO4J_LOG_TRACE, description, *metadata);
    }

    if (neo4j_meta_fieldnames(&(results->fields), &(results->nfields),
                *metadata, &(results->mpool), description, logger))
    {
        set_failure(results, errno);
        return -1;
    }

    long long available_after =
            neo4j_meta_result_available_after(*metadata, description, logger);
    if (available_after < 0)
    {
        set_failure(results, errno);
        return -1;
    }
    results->available_after = available_after;

    return 0;
}


int pull_all_callback(void *cdata, neo4j_message_type_t type,
        const neo4j_value_t *argv, uint16_t argc)
{
    assert(cdata != NULL);
    assert(argc == 0 || argv != NULL);
    run_result_stream_t *results = (run_result_stream_t *)cdata;

#ifndef NEOCLIENT_BUILD
    if (type == NEO4J_RECORD_MESSAGE)
#else
    if ( MESSAGE_TYPE_IS(type,RECORD) )
#endif
    {
        if (append_result(results, argv, argc))
        {
            neo4j_log_trace_errno(results->logger, "append_result failed");
            set_failure(results, errno);
            return -1;
        }
        return 1;
    }

    --(results->refcount);
    results->streaming = false;

    // not a record, so keep this memory along with the result stream
    if (neo4j_mpool_merge(&(results->mpool), &(results->record_mpool)) < 0)
    {
        neo4j_log_trace_errno(results->logger, "neo4j_mpool_merge failed");
        set_failure(results, errno);
        return -1;
    }

    return stream_end(results, type, "PULL_ALL", argv, argc);
}


int discard_all_callback(void *cdata, neo4j_message_type_t type,
        const neo4j_value_t *argv, uint16_t argc)
{
    assert(cdata != NULL);
    assert(argc == 0 || argv != NULL);
    run_result_stream_t *results = (run_result_stream_t *)cdata;

    --(results->refcount);
    results->streaming = false;

    return stream_end(results, type, "DISCARD_ALL", argv, argc);
}


int stream_end(run_result_stream_t *results, neo4j_message_type_t type,
        const char *src_message_type, const neo4j_value_t *argv, uint16_t argc)
{
    neo4j_logger_t *logger = results->logger;
    neo4j_connection_t *connection = results->connection;

    if (connection == NULL || type == NULL)
    {
        return 0;
    }

    neo4j_config_t *config = connection->config;

#ifndef NEOCLIENT_BUILD
    if (type == NEO4J_IGNORED_MESSAGE)
#else
    if ( MESSAGE_TYPE_IS(type,IGNORED) )
#endif
    {
        if (results->failure == 0)
        {
            neo4j_log_error(logger,
                    "Unexpected IGNORED message received in %p"
                    " (in response to %s, yet no failure occurred)",
                    (void *)connection, src_message_type);
            set_failure(results, errno = EPROTO);
            return -1;
        }
        return 0;
    }

    assert(results->failure == 0);

#ifndef NEOCLIENT_BUILD
    if (type == NEO4J_FAILURE_MESSAGE)
#else
    if ( MESSAGE_TYPE_IS(type,FAILURE) )
#endif
    {
        return set_eval_failure(results, src_message_type, argv, argc);
    }

#ifndef NEOCLIENT_BUILD
    if (type != NEO4J_SUCCESS_MESSAGE)
#else
    if ( !MESSAGE_TYPE_IS(type,SUCCESS) )
#endif
    {
        neo4j_log_error(logger,
                "Unexpected %s message received in %p"
                " (in response to %s)", neo4j_message_type_str(type),
                (void *)connection, src_message_type);
        set_failure(results, errno = EPROTO);
        return -1;
    }

build/lib/src/result_stream.c  view on Meta::CPAN

    results->statement_plan = neo4j_meta_plan(*metadata, description,
            config, logger);
    if (results->statement_plan == NULL && errno != NEO4J_NO_PLAN_AVAILABLE)
    {
        set_failure(results, errno);
        return -1;
    }

    if (neo4j_meta_update_counts(&(results->update_counts), *metadata,
                description, logger))
    {
        set_failure(results, errno);
        return -1;
    }

    return 0;
}


int await(run_result_stream_t *results, const unsigned int *condition)
{
    if (*condition > 0 && neo4j_session_sync(results->connection, condition))
    {
        set_failure(results, errno);
        return -1;
    }
    return 0;
}


int append_result(run_result_stream_t *results,
        const neo4j_value_t *argv, uint16_t argc)
{
    assert(results != NULL);
    neo4j_connection_t *connection = results->connection;

    if (argc != 1)
    {
        neo4j_log_error(results->logger,
                "Invalid number of fields in RECORD message received in %p",
                (void *)connection);
        errno = EPROTO;
        return -1;
    }

    assert(argv != NULL);

    neo4j_type_t arg_type = neo4j_type(argv[0]);
    if (arg_type != NEO4J_LIST)
    {
        neo4j_log_error(results->logger,
                "Invalid field in RECORD message received in %p"
                " (got %s, expected List)", (void *)connection,
                neo4j_typestr(arg_type));
        errno = EPROTO;
        return -1;
    }

    (results->nrecords)++;

    if (!results->streaming)
    {
        // discard memory for the record
        neo4j_mpool_drain(&(results->record_mpool));
        return 0;
    }

    assert(connection != NULL);
    neo4j_config_t *config = connection->config;

    result_record_t *record = neo4j_mpool_calloc(&(results->record_mpool),
            1, sizeof(result_record_t));
    if (record == NULL)
    {
        return -1;
    }

    record->refcount = 1;

    // save memory for the record with the record
    record->mpool = results->record_mpool;
    results->record_mpool = neo4j_std_mpool(config);

    record->list = argv[0];
    record->next = NULL;

    neo4j_result_t *result = &(record->_result);
    result->field = run_result_field;
    result->retain = run_result_retain;
    result->release = run_result_release;

    if (results->records == NULL)
    {
        assert(results->records_tail == NULL);
        assert(results->records_depth == 0);
        results->records = record;
        results->records_tail = record;
    }
    else
    {
        results->records_tail->next = record;
        results->records_tail = record;
    }
    ++(results->records_depth);

    if (results->awaiting_records > 0)
    {
        --(results->awaiting_records);
    }

    return 0;
}


void result_record_release(result_record_t *record)
{
    assert(record->refcount > 0);
    if (--(record->refcount) == 0)
    {
        // record was allocated in its own pool, so draining the pool
        // deallocates the record - so we have to copy the pool out first
        // or it'll be deallocated whist still draining
        neo4j_mpool_t mpool = record->mpool;
        neo4j_mpool_drain(&mpool);
    }
}


int set_eval_failure(run_result_stream_t *results, const char *src_message_type,
        const neo4j_value_t *argv, uint16_t argc)
{
    assert(results != NULL);

    if (results->failure != 0)
    {
        return 0;
    }

    set_failure(results, NEO4J_STATEMENT_EVALUATION_FAILED);

    char description[128];
    snprintf(description, sizeof(description), "FAILURE in %p (response to %s)",
            (void *)(results->connection), src_message_type);

    const neo4j_value_t *metadata = neo4j_validate_metadata(argv, argc,
            description, results->logger);
    if (metadata == NULL)
    {
        set_failure(results, errno);
        return -1;
    }

    if (neo4j_log_is_enabled(results->logger, NEO4J_LOG_TRACE))
    {
        neo4j_metadata_log(results->logger, NEO4J_LOG_TRACE, description,
                *metadata);
    }

    if (neo4j_meta_failure_details(&(results->failure_details), *metadata,
                &(results->mpool), description, results->logger))
    {
        set_failure(results, errno);
        return -1;
    }

    return 0;
}


void set_failure(run_result_stream_t *results, int error)
{
    assert(results != NULL);
    assert(error != 0);
    results->failure = error;
    results->streaming = false;
    results->awaiting_records = 0;
    memset(&(results->failure_details), 0, sizeof(results->failure_details));
}



( run in 1.718 second using v1.01-cache-2.11-cpan-39bf76dae61 )