EV-Hiredis

 view release on metacpan or  search on metacpan

deps/hiredis/test.c  view on Meta::CPAN


    /* Start event dispatching loop */
    test_cond(event_base_dispatch(base) == 0);
    event_free(timeout);
    event_base_free(base);

    /* Verify test checkpoints */
    assert(state.checkpoint == 3);
}

/* PUSH handler installed by tests in which no push message may arrive.
 * Being called at all counts as a failure: the event is reported on stdout
 * and the process terminates with exit status 1. Both arguments are unused. */
void unexpected_push_cb(redisAsyncContext *ac, void *r) {
    (void) r;
    (void) ac;

    fputs("Unexpected call to the PUSH callback!\n", stdout);
    exit(1);
}

/* Async Pub/Sub test using the RESP3 protocol.
 *
 * Connects over TCP using the options derived from `config`, switches the
 * connection to RESP3 with HELLO 3, subscribes to "mychannel" and queues a
 * few regular commands behind the subscription so that non-subscribe replies
 * are exercised as well. The libevent loop is guarded by a 10 second
 * testcase timer. After the loop returns, the callbacks are expected to have
 * advanced state.checkpoint to 6. */
static void test_pubsub_handling_resp3(struct config config) {
    test("Subscribe, handle published message and unsubscribe using RESP3: ");

    /* Setup event dispatcher with a testcase timeout. evtimer_new() already
     * initialises the event with the base, callback and argument, so the
     * timer only needs to be armed afterwards. */
    base = event_base_new();
    struct event *timeout = evtimer_new(base, timeout_cb, NULL);
    assert(timeout != NULL);

    struct timeval timeout_tv = {.tv_sec = 10};
    evtimer_add(timeout, &timeout_tv);

    /* Connect */
    redisOptions options = get_redis_tcp_options(config);
    redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
    assert(ac != NULL && ac->err == 0);
    redisLibeventAttach(ac,base);

    /* Not expecting any push messages in this test: the handler below
     * terminates the process if it is ever invoked. */
    redisAsyncSetPushCallback(ac, unexpected_push_cb);

    /* Switch protocol (no reply callback is registered for HELLO) */
    redisAsyncCommand(ac,NULL,NULL,"HELLO 3");

    /* Start subscribe; `state` is shared by all callbacks queued below */
    TestState state = {.options = &options, .resp3 = 1};
    redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");

    /* Make sure non-subscribe commands are handled in RESP3 */
    redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
    redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
    redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
    /* Handle an array with 3 elements as a non-subscribe command */
    redisAsyncCommand(ac,array_cb,&state,"LRANGE mylist 0 2");

    /* Start event dispatching loop; its return value is the reported
     * test result. */
    test_cond(event_base_dispatch(base) == 0);
    event_free(timeout);
    event_base_free(base);

    /* Verify test checkpoints */
    assert(state.checkpoint == 6);
}

/* Pub/Sub callback driving test_command_timeout_during_pubsub.
 *
 * Expected order of invocations, each one bumping the checkpoint counter of
 * the TestState passed as privdata:
 *   1. "subscribe" confirmation -> publish "Hello!" on the channel
 *   2. "message" delivery       -> queue a command that will time out
 *   3. NULL reply               -> the timeout led to a disconnect; the
 *                                  event loop is stopped
 * Any other pubsub reply kind is reported and terminates the process. */
void subscribe_with_timeout_cb(redisAsyncContext *ac, void *r, void *privdata) {
    TestState *ts = privdata;
    redisReply *msg = r;

    if (msg == NULL) {
        /* A non-clean disconnect is delivered to the subscription
         * callback as a NULL reply. */
        ts->checkpoint++;
        event_base_loopbreak(base);
        return;
    }

    /* Pubsub replies arrive as PUSH in RESP3 and as plain arrays otherwise */
    int expected_type = ts->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY;
    assert(msg->type == expected_type && msg->elements == 3);

    const char *kind = msg->element[0]->str;

    if (strcmp(kind,"subscribe") == 0) {
        assert(strcmp(msg->element[1]->str,"mychannel") == 0 &&
               msg->element[2]->str == NULL);
        publish_msg(ts->options,"mychannel","Hello!");
        ts->checkpoint++;
        return;
    }

    if (strcmp(kind,"message") == 0) {
        assert(strcmp(msg->element[1]->str,"mychannel") == 0 &&
               strcmp(msg->element[2]->str,"Hello!") == 0);
        ts->checkpoint++;

        /* Send a command that will trigger a timeout, with a second
         * command queued right behind it */
        redisAsyncCommand(ac,null_cb,ts,"DEBUG SLEEP 3");
        redisAsyncCommand(ac,null_cb,ts,"LPUSH mylist foo");
        return;
    }

    printf("Unexpected pubsub command: %s\n", kind);
    exit(1);
}

/* Async test: a command timing out while the connection is subscribed.
 *
 * Connects over TCP using the options derived from `config`, configures a
 * 2 second command timeout, switches to RESP3 and subscribes to "mychannel"
 * with subscribe_with_timeout_cb, which later issues a "DEBUG SLEEP 3"
 * command that outlasts the timeout. The libevent loop is guarded by a
 * 10 second testcase timer. The test passes when the callbacks have advanced
 * state.checkpoint to 5 by the time the loop returns. */
static void test_command_timeout_during_pubsub(struct config config) {
    test("Command timeout during Pub/Sub: ");

    /* Setup event dispatcher with a testcase timeout. evtimer_new() already
     * initialises the event with the base, callback and argument, so the
     * timer only needs to be armed afterwards. */
    base = event_base_new();
    struct event *timeout = evtimer_new(base,timeout_cb,NULL);
    assert(timeout != NULL);

    struct timeval timeout_tv = {.tv_sec = 10};
    evtimer_add(timeout,&timeout_tv);

    /* Connect */
    redisOptions options = get_redis_tcp_options(config);
    redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
    assert(ac != NULL && ac->err == 0);
    redisLibeventAttach(ac,base);

    /* Configure a command timeout */
    struct timeval command_timeout = {.tv_sec = 2};
    redisAsyncSetTimeout(ac,command_timeout);

    /* Not expecting any push messages in this test: the handler below
     * terminates the process if it is ever invoked. */
    redisAsyncSetPushCallback(ac,unexpected_push_cb);

    /* Switch protocol (no reply callback is registered for HELLO) */
    redisAsyncCommand(ac,NULL,NULL,"HELLO 3");

    /* Start subscribe */
    TestState state = {.options = &options, .resp3 = 1};
    redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,"subscribe mychannel");

    /* Start event dispatching loop. The call is kept outside assert() so
     * that the loop still runs when assertions are compiled out (NDEBUG). */
    int dispatch_rc = event_base_dispatch(base);
    assert(dispatch_rc == 0);
    (void) dispatch_rc; /* silence unused-variable warnings under NDEBUG */
    event_free(timeout);
    event_base_free(base);

    /* Verify test checkpoints */
    test_cond(state.checkpoint == 5);
}

/* Callback for the channel "A" subscription in test_pubsub_multiple_channels.
 *
 * Every invocation must carry a 3-element array reply. The first element
 * selects the action, and each handled kind bumps the checkpoint counter of
 * the TestState passed as privdata:
 *   - "subscribe":   publish "Hello!" on channel A
 *   - "message":     queue unsubscribe and punsubscribe commands, then a
 *                    regular command with the disconnect flag set
 *   - "unsubscribe": only the channel name is verified
 * Any other reply kind is reported and terminates the process. */
void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) {
    TestState *ts = privdata;
    redisReply *msg = r;

    assert(msg != NULL && msg->type == REDIS_REPLY_ARRAY &&
           msg->elements == 3);

    const char *kind = msg->element[0]->str;

    if (strcmp(kind,"subscribe") == 0) {
        assert(strcmp(msg->element[1]->str,"A") == 0);
        publish_msg(ts->options,"A","Hello!");
        ts->checkpoint++;
        return;
    }

    if (strcmp(kind,"message") == 0) {
        assert(strcmp(msg->element[1]->str,"A") == 0 &&
               strcmp(msg->element[2]->str,"Hello!") == 0);
        ts->checkpoint++;

        /* Unsubscribe from channels; X and Z were never subscribed to.
         * unexpected_cb must not be invoked for these commands. */
        redisAsyncCommand(ac,unexpected_cb,
                          (void*)"unsubscribe should not call unexpected_cb()",
                          "unsubscribe B X A A Z");
        /* Unsubscribe from patterns, although none are subscribed */
        redisAsyncCommand(ac,unexpected_cb,
                          (void*)"punsubscribe should not call unexpected_cb()",
                          "punsubscribe");
        /* Follow up with a regular command after unsubscribing,
         * then disconnect */
        ts->disconnect = 1;
        redisAsyncCommand(ac,integer_cb,ts,"LPUSH mylist foo");
        return;
    }

    if (strcmp(kind,"unsubscribe") == 0) {
        assert(strcmp(msg->element[1]->str,"A") == 0);
        ts->checkpoint++;
        return;
    }

    printf("Unexpected pubsub command: %s\n", kind);
    exit(1);
}

/* Subscribe callback for test_pubsub_multiple_channels */
void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) {
    redisReply *reply = r;

deps/hiredis/test.c  view on Meta::CPAN

    printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
    cfg.type = CONN_TCP;
    test_blocking_connection(cfg);
    test_blocking_connection_timeouts(cfg);
    test_blocking_io_errors(cfg);
    test_invalid_timeout_errors(cfg);
    test_append_formatted_commands(cfg);
    test_tcp_options(cfg);
    if (throughput) test_throughput(cfg);

    printf("\nTesting against Unix socket connection (%s): ", cfg.unix_sock.path);
    if (test_unix_socket) {
        printf("\n");
        cfg.type = CONN_UNIX;
        test_blocking_connection(cfg);
        test_blocking_connection_timeouts(cfg);
        test_blocking_io_errors(cfg);
        test_invalid_timeout_errors(cfg);
        if (throughput) test_throughput(cfg);
    } else {
        test_skipped();
    }

#ifdef HIREDIS_TEST_SSL
    if (cfg.ssl.port && cfg.ssl.host) {

        redisInitOpenSSL();
        _ssl_ctx = redisCreateSSLContext(cfg.ssl.ca_cert, NULL, cfg.ssl.cert, cfg.ssl.key, NULL, NULL);
        assert(_ssl_ctx != NULL);

        printf("\nTesting against SSL connection (%s:%d):\n", cfg.ssl.host, cfg.ssl.port);
        cfg.type = CONN_SSL;

        test_blocking_connection(cfg);
        test_blocking_connection_timeouts(cfg);
        test_blocking_io_errors(cfg);
        test_invalid_timeout_errors(cfg);
        test_append_formatted_commands(cfg);
        if (throughput) test_throughput(cfg);

        redisFreeSSLContext(_ssl_ctx);
        _ssl_ctx = NULL;
    }
#endif

#ifdef HIREDIS_TEST_ASYNC
    cfg.type = CONN_TCP;
    printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
    cfg.type = CONN_TCP;

    int major;
    redisContext *c = do_connect(cfg);
    get_redis_version(c, &major, NULL);
    disconnect(c, 0);

    test_pubsub_handling(cfg);
    test_pubsub_multiple_channels(cfg);
    test_monitor(cfg);
    if (major >= 6) {
        test_pubsub_handling_resp3(cfg);
        test_command_timeout_during_pubsub(cfg);
    }
#endif /* HIREDIS_TEST_ASYNC */

    cfg.type = CONN_TCP;
    printf("\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
    test_async_polling(cfg);
    if (test_unix_socket) {
        cfg.type = CONN_UNIX;
        printf("\nTesting asynchronous API using polling_adapter UNIX (%s):\n", cfg.unix_sock.path);
        test_async_polling(cfg);
    }

    if (test_inherit_fd) {
        printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
        if (test_unix_socket) {
            printf("\n");
            cfg.type = CONN_FD;
            test_blocking_connection(cfg);
        } else {
            test_skipped();
        }
    }

    if (fails || (skips_as_fails && skips)) {
        printf("*** %d TESTS FAILED ***\n", fails);
        if (skips) {
            printf("*** %d TESTS SKIPPED ***\n", skips);
        }
        return 1;
    }

    printf("ALL TESTS PASSED (%d skipped)\n", skips);
    return 0;
}



( run in 1.472 second using v1.01-cache-2.11-cpan-39bf76dae61 )