DBD-Patroni
view release on metacpan or search on metacpan
t/03-failover.t view on Meta::CPAN
# Connection settings for the test run.  The database name and SSL mode are
# read from the PGDATABASE / PGSSLMODE environment variables; an unset or
# empty value selects the default shown here.
my ( $dbname, $sslmode ) =
  ( $ENV{PGDATABASE} || 'testdb', $ENV{PGSSLMODE} || 'disable' );

# DSN handed to DBI: the Patroni driver prefix followed by the
# ';'-separated key=value settings.
my $dsn = 'dbi:Patroni:'
  . join( ';',
    "dbname=$dbname",
    "sslmode=$sslmode",
    "patroni_url=$patroni_urls",
  );
# Helper to get cluster info.
#
# Queries each URL of the comma-separated $patroni_urls list in turn and
# uses the first endpoint that answers successfully with a JSON object
# containing a "members" array.
#
# Returns a hash reference with the keys:
#   leader   - the first member whose role is 'leader' (undef if none)
#   replicas - array reference of all members whose role is not 'leader'
#   members  - the member list exactly as delivered by the endpoint
# Returns undef when no endpoint produced a usable answer.
sub get_cluster_info {
    my $ua = LWP::UserAgent->new( timeout => 5 );

    for my $url ( split /,/, $patroni_urls ) {
        my $resp = $ua->get($url);
        next unless $resp->is_success;

        # A malformed body must not abort the run; move on to the next URL.
        my $data = eval { decode_json( $resp->decoded_content ) };

        # The decoded document may be an array, or a hash without a proper
        # member list; dereferencing either blindly dies under strict refs.
        next
          unless ref($data) eq 'HASH'
          && ref( $data->{members} ) eq 'ARRAY';

        my ( $leader, @replicas );
        for my $member ( @{ $data->{members} } ) {

            # Treat a missing role as "not the leader" without warning.
            my $role =
              ( ref($member) eq 'HASH' && defined $member->{role} )
              ? $member->{role}
              : '';

            if ( $role eq 'leader' ) {
                $leader ||= $member;    # only the first leader is reported
            }
            else {
                push @replicas, $member;
            }
        }

        return {
            leader   => $leader,
            replicas => \@replicas,
            members  => $data->{members},
        };
    }

    # Explicit undef (not a bare return) so that a caller using this in
    # list context still receives exactly one value, as before.
    return undef;
}
# Helper to trigger failover via Patroni API.
#
# Arguments:
#   $new_leader - name of the member to promote; sent as the "candidate"
#                 field of the JSON request body.
#
# The request is POSTed to the /failover endpoint of the current leader as
# reported by get_cluster_info().  If the leader's member record carries an
# api_url, its scheme and host:port are reused; otherwise the URL falls
# back to http://<host>:8008.
#
# Returns a true value when the HTTP request succeeded, and a false value
# when the leader could not be determined or the request failed.
sub trigger_failover {
    my ($new_leader) = @_;
    my $ua = LWP::UserAgent->new( timeout => 10 );

    # Find current leader's API endpoint
    my $info = get_cluster_info();
    return 0 unless $info && $info->{leader};

    my $leader = $info->{leader};
    my $base;
    if ( defined $leader->{api_url}
        && $leader->{api_url} =~ m{\A(https?://[^/]+)}i )
    {
        # Keep only scheme://host:port; the path is replaced below.
        $base = $1;
    }
    elsif ( defined $leader->{host} && length $leader->{host} ) {
        $base = "http://$leader->{host}:8008";
    }
    else {
        diag("Leader record has neither api_url nor host");
        return 0;
    }
    my $failover_url = "$base/failover";

    my $resp = $ua->post(
        $failover_url,
        'Content-Type' => 'application/json',
        Content        => encode_json( { candidate => $new_leader } ),
    );

    diag( "Failover response: " . $resp->status_line );
    diag( "Failover body: " . $resp->decoded_content ) if !$resp->is_success;
    return $resp->is_success;
}
# Wait for all replicas to be running.
#
# Polls get_cluster_info() until enough members report the state 'running'
# or 'streaming'.
#
# Arguments (both optional):
#   $max_attempts - number of polls before giving up (default 60)
#   $expected     - number of ready members required (default 3)
#
# Returns 1 as soon as enough members are ready, 0 if the attempts run out.
sub wait_for_replicas {
    my ( $max_attempts, $expected ) = @_;
    $max_attempts ||= 60;
    $expected     ||= 3;

    for my $i ( 1 .. $max_attempts ) {
        my $info = get_cluster_info();

        if ($info) {

            # Count nodes that are running or streaming (replicas in sync)
            my @ready = grep {
                defined $_->{state}
                  && ( $_->{state} eq 'running'
                    || $_->{state} eq 'streaming' )
            } @{ $info->{members} };

            if ( @ready >= $expected ) {
                diag("All $expected nodes are ready");
                return 1;
            }

            # Show all states for debugging
            my $states = join(
                ", ",
                map {
                    my $name  = defined $_->{name}  ? $_->{name}  : '?';
                    my $state = defined $_->{state} ? $_->{state} : 'unknown';
                    "$name:$state"
                } @{ $info->{members} }
            );
            diag("Attempt $i/$max_attempts: $states");
        }
        else {
            diag("Attempt $i/$max_attempts: cluster info unavailable");
        }

        # Pause between polls even when the API could not be reached, so an
        # unreachable cluster does not turn this into a tight loop.  There
        # is nothing to wait for after the final attempt.
        sleep 5 if $i < $max_attempts;
    }

    return 0;
}
# Wait for all nodes to be ready before starting tests.
# 60 attempts x 5 seconds = roughly 5 minutes max.
diag("Waiting for all cluster nodes to be ready...");

# A timeout is reported but deliberately not fatal: the subtests below make
# their own checks against whatever state the cluster is in.
wait_for_replicas(60)
  or diag("Cluster nodes were not all ready in time; continuing anyway");
# Test 1: Detect current leader
#
# Checks that the cluster reports a leader and at least one replica, and
# logs their hosts.
subtest 'Detect current leader' => sub {
    my $info = get_cluster_info();

    # Without cluster info every dereference below would die and take the
    # whole test script down, so stop this subtest after the failed check.
    ok( $info, 'Got cluster info' )
      or return;

    ok( $info->{leader}, 'Found leader' );
    ok( @{ $info->{replicas} } >= 1, 'Found at least one replica' );

    my $leader_host =
      ( $info->{leader} && defined $info->{leader}{host} )
      ? $info->{leader}{host}
      : '(none)';
    diag( "Current leader: " . $leader_host );
    diag( "Replicas: "
          . join( ", ",
            map { defined $_->{host} ? $_->{host} : '(unknown)' }
              @{ $info->{replicas} } ) );
};
# Test 2: Connection survives leader change
subtest 'Connection survives leader change' => sub {
my $dbh = DBI->connect( $dsn, $user, $pass );
ok( $dbh, 'Initial connection' );
# Insert a test row
my $name = "failover_test_" . time();
$dbh->do( "INSERT INTO users (name) VALUES (?)", undef, $name );
# Get current leader
my $info = get_cluster_info();
my $old_leader = $info->{leader}{host};
diag("Old leader: $old_leader");
# Choose a new leader from replicas
my @replicas = @{ $info->{replicas} };
skip "Need at least one replica for failover test", 3 unless @replicas;
my $new_leader = $replicas[0]{name};
diag("Triggering failover to: $new_leader");
# Trigger failover
my $failover_ok = trigger_failover($new_leader);
ok( $failover_ok, 'Failover triggered' );
# Wait for failover to complete
t/03-failover.t view on Meta::CPAN
$sth->execute;
my ($msg) = $sth->fetchrow_array;
like( $msg, qr/After failover/, 'Read operation works after failover' );
$sth->finish;
$dbh->disconnect;
};
# Test 3: New connection after failover
#
# Opens a fresh connection, writes one uniquely named row and reads it
# back through the same handle.
subtest 'New connection after failover' => sub {

    # Create a fresh connection after the failover
    my $dbh = eval { DBI->connect( $dsn, $user, $pass ) };
    my $err = $@;    # capture immediately, before anything can clobber $@
    ok( !$err, 'New connection after failover' ) or diag("Error: $err");

    # Calling methods on an undefined handle would die and abort the whole
    # script, so end the subtest here if the connection failed.
    ok( $dbh, 'Got database handle' )
      or return;

    # Verify read/write works
    my $name = "post_failover_" . time();
    $dbh->do( "INSERT INTO users (name) VALUES (?)", undef, $name );

    # Short pause before reading the row back.
    sleep 1;

    my $sth = $dbh->prepare("SELECT name FROM users WHERE name = ?");
    $sth->execute($name);
    my ($result) = $sth->fetchrow_array;
    is( $result, $name, 'Read/Write works on new connection' );

    $sth->finish;
    $dbh->disconnect;
};
# Test 4: Cached connection survives failover
#
# Obtains a handle via DBI->connect_cached, triggers a failover to a ready
# replica, then checks that a second connect_cached call yields a handle
# that can still write and read (allowing one retry of the write).
subtest 'Cached connection survives failover' => sub {

    # Wait for cluster to stabilize after previous tests
    diag("Waiting for cluster to stabilize before cached connection test...");
    wait_for_replicas(30);

    # Get a cached connection using DBI->connect_cached
    my $dbh1 = DBI->connect_cached( $dsn, $user, $pass, { RaiseError => 1 } );
    ok( $dbh1, 'Got cached connection' );

    # Verify it works
    my $name1 = "cached_before_" . time();
    $dbh1->do( "INSERT INTO users (name) VALUES (?)", undef, $name1 );
    diag("Inserted: $name1");

    # skip() leaves the enclosing block labelled SKIP, so everything that
    # may be skipped has to live inside one.  Five checks follow.
  SKIP: {

        # Get cluster state and trigger failover
        my $info = get_cluster_info();
        skip "Cluster info unavailable for cached failover test", 5
          unless $info && $info->{leader};

        my $old_leader = $info->{leader}{host};
        diag("Current leader before cached failover: $old_leader");

        my @replicas = @{ $info->{replicas} };
        skip "Need at least one replica for cached failover test", 5
          unless @replicas;

        # Find a replica that is ready (running or streaming)
        my ($ready_replica) = grep {
            defined $_->{state}
              && ( $_->{state} eq 'running' || $_->{state} eq 'streaming' )
        } @replicas;
        skip "No ready replica for failover", 5 unless $ready_replica;

        my $new_leader = $ready_replica->{name};
        diag("Triggering failover to: $new_leader");

        my $failover_ok = trigger_failover($new_leader);
        ok( $failover_ok, 'Failover triggered for cached test' );

        # Wait for failover
        diag("Waiting for failover to complete...");
        sleep 10;

        # Verify leader changed
        $info = get_cluster_info();
        my $current_leader =
          ( $info && $info->{leader} ) ? $info->{leader}{host} : undef;
        diag( "New leader after cached failover: "
              . ( defined $current_leader ? $current_leader : '(unknown)' ) );

        if ( defined $current_leader ) {
            isnt( $current_leader, $old_leader,
                'Leader has changed (cached test)' );
        }
        else {
            # An undefined leader must not count as "changed".
            ok( 0, 'Leader has changed (cached test)' );
        }

        # Get another cached connection - should return the same handle
        # or reconnect
        my $dbh2 =
          DBI->connect_cached( $dsn, $user, $pass, { RaiseError => 1 } );
        ok( $dbh2, 'Got second cached connection' );

        # Try to use the cached connection - it should auto-recover
        my $name2  = "cached_after_" . time();
        my $insert = sub {
            $dbh2->do( "INSERT INTO users (name) VALUES (?)", undef, $name2 );
        };

        my $rv = eval { $insert->() };
        if ( my $err = $@ ) {
            diag("First cached attempt failed (expected): $err");

            # Retry - the reconnect should happen
            $rv = eval { $insert->() };
            diag("Retry of cached write failed: $@") if $@;
        }
        ok( $rv, 'Write via cached connection works after failover' );

        # Verify data was written
        my $sth = $dbh2->prepare("SELECT name FROM users WHERE name = ?");
        $sth->execute($name2);
        my ($result) = $sth->fetchrow_array;
        $sth->finish;
        is( $result, $name2,
            'Read via cached connection works after failover' );
    }
};
# Test 5: Recovery from read-only error (simulates connecting to replica as leader)
#
# Swaps the handle stored under patroni_leader_dbh for a direct connection
# to a replica, then expects a write through the DBD::Patroni handle to
# succeed anyway.
subtest 'Recovery from read-only error' => sub {

    # skip() leaves the enclosing block labelled SKIP, so the whole body is
    # wrapped in one.
  SKIP: {

        # Get cluster info to find a replica
        my $info = get_cluster_info();
        skip "Need replicas for read-only test", 3
          unless $info && @{ $info->{replicas} };

        my $replica = $info->{replicas}[0];
        diag("Testing read-only recovery using replica: $replica->{host}");

        # Create a DBD::Patroni connection normally
        my $dbh = DBI->connect( $dsn, $user, $pass );
        ok( $dbh, 'Got initial connection' );
        skip "No DBD::Patroni connection to test with", 2 unless $dbh;

        # Manually replace the leader connection with a replica connection
        # This simulates what happens when the leader becomes a replica
        # after failover
        my $replica_dsn =
"dbi:Pg:dbname=$dbname;host=$replica->{host};port=$replica->{port};sslmode=$sslmode";

        # With RaiseError set, a failed connect dies instead of returning
        # undef; trap it so the skip below is actually reachable.
        my $replica_dbh = eval {
            DBI->connect( $replica_dsn, $user, $pass, { RaiseError => 1 } );
        };
        unless ($replica_dbh) {
            diag("Replica connection failed: $@") if $@;
            $dbh->disconnect;    # do not leak the handle opened above
            skip "Cannot connect to replica directly", 2;
        }

        # Save the real leader and replace with replica (access internal
        # attribute).  NOTE(review): $real_leader is never read again; it
        # presumably just keeps the original handle referenced while it is
        # swapped out - confirm before removing.
        my $real_leader = $dbh->{patroni_leader_dbh};
        $dbh->{patroni_leader_dbh} = $replica_dbh;
        diag("Replaced leader connection with replica connection");

        # Try to write - this should fail with read-only error and trigger
        # automatic recovery.  The _with_retry mechanism should:
        # 1. Detect the read-only error
        # 2. Call _rediscover_cluster to reconnect to the real leader
        # 3. Retry the operation and succeed
        my $rv = eval {
            $dbh->do( "INSERT INTO logs (message) VALUES (?)",
                undef, "read-only test recovered" );
        };
        if ( my $err = $@ ) {
            diag("Write attempt error: $err");
        }
        ok( $rv, 'Write succeeded after read-only recovery' );

        # Verify the data was written
        my $sth = $dbh->prepare(
"SELECT message FROM logs WHERE message LIKE 'read-only test%' ORDER BY id DESC LIMIT 1"
        );
        $sth->execute;
        my ($msg) = $sth->fetchrow_array;
        $sth->finish;
        like( $msg, qr/read-only test/, 'Data was written after recovery' );

        # Cleanup
        $replica_dbh->disconnect if $replica_dbh;
        $dbh->disconnect;
    }
};
# Test 6: Verify cluster state (informational - cluster may still be recovering)
#
# Requires only that the cluster answers and reports a leader; the state of
# every member is logged for diagnosis.
subtest 'Verify cluster state after tests' => sub {
    my $info = get_cluster_info();

    # Nothing below can be evaluated without cluster info; stop here rather
    # than die on an undefined reference and abort the script.
    ok( $info, 'Cluster is accessible' )
      or return;
    ok( $info->{leader}, 'Cluster has a leader' );

    # Count running or streaming nodes
    my $ready = grep {
        defined $_->{state}
          && ( $_->{state} eq 'running' || $_->{state} eq 'streaming' )
    } @{ $info->{members} };

    my $leader_host =
      ( $info->{leader} && defined $info->{leader}{host} )
      ? $info->{leader}{host}
      : '(none)';

    diag("Final cluster state:");
    diag( " Leader: " . $leader_host );
    diag(" Ready members: $ready");
    for my $m ( @{ $info->{members} } ) {
        my $state = defined $m->{state} ? $m->{state} : 'unknown';
        diag(" $m->{name}: $state");
    }

    # After failover, some nodes may still be recovering - just check we
    # have a leader
    ok( 1, 'Cluster state logged' );
};

done_testing();
( run in 0.506 second using v1.01-cache-2.11-cpan-140bd7fdf52 )