Async-Redis

 view release on metacpan or  search on metacpan

t/50-pubsub/unified-reader.t  view on Meta::CPAN

    run { $sub_c->connect };
    run { $pub_c->connect };
    my $ch = 'u-iter-' . $$;
    my $sub = run { $sub_c->subscribe($ch) };

    for my $i (1..3) { run { $pub_c->publish($ch, "m$i") } }

    my @got;
    for my $i (1..3) {
        my $m = run { $sub->next };
        push @got, $m->{data};
    }
    is \@got, ['m1', 'm2', 'm3'], 'FIFO order preserved';
    $sub->_close;
    $sub_c->disconnect;
    $pub_c->disconnect;
};

subtest 'iterator mode: multiple channels' => sub {
    my $sub_c = new_redis;
    my $pub_c = new_redis;
    run { $sub_c->connect };
    run { $pub_c->connect };
    my $cha = 'u-multi-a-' . $$;
    my $chb = 'u-multi-b-' . $$;
    my $sub = run { $sub_c->subscribe($cha, $chb) };
    run { $pub_c->publish($cha, 'va') };
    run { $pub_c->publish($chb, 'vb') };

    my %by_ch;
    for my $i (1..2) {
        my $m = run { $sub->next };
        $by_ch{$m->{channel}} = $m->{data};
    }
    is $by_ch{$cha}, 'va', 'channel a';
    is $by_ch{$chb}, 'vb', 'channel b';
    $sub->_close;
    $sub_c->disconnect;
    $pub_c->disconnect;
};

subtest 'callback mode: fire-and-forget invocations' => sub {
    my $sub_c = new_redis;
    my $pub_c = new_redis;
    run { $sub_c->connect };
    run { $pub_c->connect };
    my $ch = 'u-cb-' . $$;
    my $sub = run { $sub_c->subscribe($ch) };

    my @got;
    my $done = Future->new;
    $sub->on_message(sub {
        my ($self, $msg) = @_;
        push @got, $msg->{data};
        $done->done if @got == 3 && !$done->is_ready;
        return;
    });

    for my $i (1..3) { run { $pub_c->publish($ch, "cb$i") } }
    run { Future->wait_any($done, Future::IO->sleep(2)) };
    is \@got, ['cb1', 'cb2', 'cb3'], 'all 3 callbacks fired in order';
    $sub->_close;
    $sub_c->disconnect;
    $pub_c->disconnect;
};

subtest 'callback mode: Future-returning backpressure serializes invocations' => sub {
    my $sub_c = new_redis;
    my $pub_c = new_redis;
    run { $sub_c->connect };
    run { $pub_c->connect };
    my $ch = 'u-bp-' . $$;
    my $sub = run { $sub_c->subscribe($ch) };

    my @got;
    my @gates;
    $sub->on_message(sub {
        my ($self, $msg) = @_;
        push @got, $msg->{data};
        my $gate = Future->new;
        push @gates, $gate;
        return $gate;
    });

    for my $i (1..3) { run { $pub_c->publish($ch, "bp$i") } }
    # Pump event loop briefly so first message can arrive
    run { Future::IO->sleep(0.1) };
    is scalar(@got), 1, 'only first callback fired while gate pending';

    # Release gate for first, wait for second
    shift(@gates)->done if @gates;
    run { Future::IO->sleep(0.1) };
    shift(@gates)->done if @gates;
    run { Future::IO->sleep(0.1) };

    # All should eventually arrive
    for my $i (1..10) {
        last if @got >= 3;
        run { Future::IO->sleep(0.1) };
    }
    # Release any remaining gates
    $_->done for @gates;
    is scalar(@got), 3, 'all 3 eventually invoked';
    $sub->_close;
    $sub_c->disconnect;
    $pub_c->disconnect;
};

subtest 'pattern subscribe delivers pmessages with pattern field' => sub {
    my $sub_c = new_redis;
    my $pub_c = new_redis;
    run { $sub_c->connect };
    run { $pub_c->connect };
    my $pattern = 'u-pat-*-' . $$;
    my $sub = run { $sub_c->psubscribe($pattern) };
    my $ch = 'u-pat-hello-' . $$;
    run { $pub_c->publish($ch, 'patval') };

    my $m = run { $sub->next };
    is $m->{type}, 'pmessage', 'pmessage type';
    is $m->{channel}, $ch, 'actual channel';



( run in 1.108 second using v1.01-cache-2.11-cpan-140bd7fdf52 )