EV-Etcd

 view release on metacpan or  search on metacpan

t/streaming.t  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use Test::More;

# Skip if EV not available
BEGIN {
    eval { require EV };
    plan skip_all => 'EV required' if $@;
}

use EV;
use EV::Etcd;

# Check if etcd is available
my $etcd_available = 0;
eval {
    my $client = EV::Etcd->new(
        endpoints => ['127.0.0.1:2379'],
        timeout => 2,
    );
    $client->status(sub {
        my ($resp, $err) = @_;
        $etcd_available = 1 if !$err;
        EV::break;
    });
    my $t = EV::timer(3, 0, sub { EV::break });
    EV::run;
};

plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;

plan tests => 11;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

my $test_prefix = "/test-streaming-$$-" . time();

# === lease_keepalive streaming tests ===

# Test 1-4: lease_keepalive receives response
my $lease_id;
my $keepalive_count = 0;

$client->lease_grant(10, sub {
    my ($resp, $err) = @_;
    ok(!$err, 'lease_grant succeeded');
    $lease_id = $resp->{id};
    diag("Granted lease: id=$lease_id, ttl=$resp->{ttl}");
    EV::break;
});
my $t1 = EV::timer(5, 0, sub { fail('lease_grant timeout'); EV::break });
EV::run;
undef $t1;  # Cancel timer

SKIP: {
    skip "no lease id", 4 unless $lease_id;

    my $keepalive_handle = $client->lease_keepalive($lease_id, sub {
        my ($resp, $err) = @_;
        if ($err) {
            diag("Keepalive error: " . (ref($err) ? $err->{message} : $err));
            return;
        }
        $keepalive_count++;
        diag("Keepalive response #$keepalive_count: ttl=$resp->{ttl}");
        if ($keepalive_count >= 1) {
            EV::break;
        }
    });

    ok(defined $keepalive_handle, 'lease_keepalive returns handle');
    isa_ok($keepalive_handle, 'EV::Etcd::Keepalive', 'keepalive handle');

    # Wait for at least one keepalive response
    my $keepalive_timer = EV::timer 5, 0, sub {
        diag("Keepalive timer expired, received $keepalive_count responses");
        EV::break;
    };
    EV::run;

    ok($keepalive_count >= 1, "received at least 1 keepalive response (got $keepalive_count)");

    # Cleanup lease (this will end the keepalive)
    $client->lease_revoke($lease_id, sub {
        my ($resp, $err) = @_;
        diag($err ? "Revoke failed" : "Lease revoked");
    });
    pass('cleanup initiated');
}

# === Watch streaming tests ===

# Test 5-9: watch receives multiple events
my $watch_key = "$test_prefix/watch-stream-test";
my $watch_count = 0;
my $watch_target = 3;

my $watch_handle;
$watch_handle = $client->watch($watch_key, sub {
    my ($resp, $err) = @_;
    if ($err) {
        diag("Watch error: " . (ref($err) ? $err->{message} : $err));
        return;
    }
    if ($resp->{created}) {
        diag("Watch created with id=$resp->{watch_id}");
        return;
    }
    my $event_count = scalar @{$resp->{events} || []};
    $watch_count += $event_count;
    diag("Watch received $event_count event(s), total=$watch_count");
    if ($watch_count >= $watch_target) {
        EV::break;
    }
});

ok(defined $watch_handle, 'watch returns handle');
isa_ok($watch_handle, 'EV::Etcd::Watch', 'watch handle');

# Send multiple put events
for my $i (1..$watch_target) {
    $client->put($watch_key, "value-$i", sub {
        my ($resp, $err) = @_;
        diag("Put $i " . ($err ? "failed" : "completed"));
    });
}

my $watch_timer = EV::timer 10, 0, sub {
    diag("Watch timer expired, received $watch_count events");
    EV::break;
};
EV::run;

ok($watch_count >= 1, "watch received at least 1 event (got $watch_count)");
cmp_ok($watch_count, '>=', $watch_target, "watch received all $watch_target events (streaming works)");

# Test watch cancel - uses callback-based cancel API
my $cancel_done = 0;
$watch_handle->cancel(sub {
    my ($resp, $err) = @_;
    $cancel_done = 1;
    diag("Watch cancel callback: " . ($err ? "error: $err" : "success"));
    EV::break;
});
my $cancel_timer = EV::timer 5, 0, sub {
    diag("Cancel timer expired");
    EV::break;
};
EV::run;

ok($cancel_done, 'watch cancel completed');

# Cleanup
$client->delete("$test_prefix/", { prefix => 1 }, sub {
    diag("Cleanup completed");
});
pass('cleanup completed');

done_testing();



( run in 1.422 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )