EV-Etcd
view release on metacpan or search on metacpan
eg/distributed_mutex.pl view on Meta::CPAN
#!/usr/bin/env perl
#
# distributed_mutex.pl - Acquire-with-timeout / hold / release pattern for the
# etcd Lock service, with explicit handling for the "lease died while holding"
# case (process froze long enough that etcd revoked the lease — caller's
# critical section is no longer protected).
#
# Usage:
# $ perl eg/distributed_mutex.pl /jobs/migration "doing work for 5s"
#
# Run two copies in parallel: the second one waits behind the first.
#
use v5.10;
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
# Command-line arguments, all optional.
my $name      = $ARGV[0] // '/jobs/example';
my $work_msg  = $ARGV[1] // 'critical section';
my $work_time = $ARGV[2] // 5;   # seconds the lock is held
my $lock_ttl  = 10;              # seconds; must exceed work_time + slack

# Caller-side limit on how long we are willing to wait for the lock.
# Without it the process can wait forever behind contention.
my $acquire_timeout = 30;

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);

# Shared state. The watcher handles ($keepalive, $work_timer, $acquire_timer)
# are deliberately kept at file scope: an EV watcher is stopped as soon as the
# last reference to it is dropped, so a handle stored in a callback-local
# variable would be cancelled the moment that callback returns.
# $lease_id, $lock_key and $lease_died are also read by graceful_exit().
my ($lease_id, $lock_key, $keepalive, $work_timer, $acquire_timer, $lease_died);

# 1. Lease for the lock — if we crash, the lock auto-releases after lock_ttl
$client->lease_grant($lock_ttl, sub {
    my ($r, $err) = @_;
    if ($err) {
        # Do not die() here: an exception thrown inside an event-loop callback
        # is not a dependable way to stop the process (EV's default handler
        # for callback exceptions reports the error and keeps looping).
        # Use the same exit path as every other failure instead.
        warn "lease_grant: $err->{message}\n";
        return graceful_exit(1);
    }
    $lease_id = $r->{id};
    say "[$$] lease=$lease_id ttl=${lock_ttl}s";

    # 2. Refresh the lease while we work
    $keepalive = $client->lease_keepalive($lease_id, sub {
        my (undef, $kerr) = @_;
        return unless $kerr;
        # Lease died — our lock is gone. Bail out before we corrupt anything.
        $lease_died = 1;
        warn "[$$] keepalive lost ($kerr->{message}) — aborting work\n";
        graceful_exit(2);
    });

    # 3. Acquire the lock. The etcd Lock RPC blocks server-side until
    #    granted. There's no client-side cancel for unary calls, so the
    #    timer below abandons the response (the late callback is a no-op
    #    under the $acquired guard). The lease will still expire on its
    #    own ttl if we exit, releasing any partially-granted lock.
    my $acquired;
    my $start = time;
    say "[$$] acquiring lock at $name ...";
    $client->lock($name, $lease_id, sub {
        my ($lr, $lerr) = @_;
        return if $acquired++;    # timer already fired
        undef $acquire_timer;     # we have an answer; the timeout is moot
        if ($lerr) {
            warn "[$$] lock failed: $lerr->{message}\n";
            return graceful_exit(3);
        }
        $lock_key = $lr->{key};
        my $waited = time - $start;
        say "[$$] acquired (waited ${waited}s) — running: $work_msg";

        # 4. Do the work. Real code would do its critical section inside
        #    an ev_timer or similar so it remains async with the keepalive.
        $work_timer = EV::timer($work_time, 0, sub {
            return if $lease_died;
            say "[$$] work complete — releasing lock";
            graceful_exit(0);
        });
    });

    # Acquire timeout: give up after $acquire_timeout seconds if the lock
    # call has not answered. The handle is stored in the file-scoped
    # $acquire_timer so the watcher outlives this callback.
    $acquire_timer = EV::timer($acquire_timeout, 0, sub {
        return if $acquired++;    # lock callback already ran
        warn "[$$] lock acquire timed out after ${acquire_timeout}s\n";
        graceful_exit(4);
    });
});

# Signal watchers: held in file-scoped variables so they stay active for the
# whole run; both route through graceful_exit with the conventional 128+N code.
my $sigint  = EV::signal('INT',  sub { say "[$$] SIGINT";  graceful_exit(130) });
my $sigterm = EV::signal('TERM', sub { say "[$$] SIGTERM"; graceful_exit(143) });

EV::run;
# --------------------------------------------------------------------------
# graceful_exit($code)
#
# Tear the session down and terminate the process with exit status $code.
# Order matters: release the lock (best-effort), revoke the lease, exit.
#
# The first call does not return: it runs a nested event loop so the
# unlock/revoke callbacks can complete (bounded by a 2-second bail-out
# timer) and then calls exit. Any later call -- a second signal, or a
# keepalive error arriving while teardown is already in progress -- returns
# immediately, so cleanup is issued once and the first exit code wins.
#
# Reads the file-scoped $client, $lease_id, $lock_key and $lease_died, and
# clears $work_timer.
sub graceful_exit {
    my $code = shift;

    # One-shot guard (state is enabled by the file's "use v5.10").
    state $exiting;
    return if $exiting++;

    # Cancel pending work so its timer cannot fire during teardown.
    undef $work_timer;

    # Only try to unlock if we hold a key and the lease backing it is alive.
    my $holding_lock = $lock_key && !$lease_died;

    # Nothing was granted, so there is nothing to release: skip the nested
    # loop entirely and exit right away.
    exit $code unless $lease_id || $holding_lock;

    # Final step: revoke the lease (if any), then leave the nested loop.
    my $finish = sub {
        if ($lease_id) {
            $client->lease_revoke($lease_id, sub { EV::break });
        }
        else {
            EV::break;
        }
    };

    if ($holding_lock) {
        $client->unlock($lock_key, sub {
            my (undef, $uerr) = @_;
            # Best-effort: report the failure but still revoke the lease.
            warn "[$$] unlock failed: $uerr->{message}\n" if $uerr;
            $finish->();
        });
    }
    else {
        $finish->();
    }

    # Safety net: never wait more than 2 seconds for the cleanup callbacks.
    my $bail = EV::timer(2, 0, sub { EV::break });
    EV::run;
    exit $code;
}
( run in 0.370 second using v1.01-cache-2.11-cpan-ceb78f64989 )