EV-Etcd
view release on metacpan or search on metacpan
lib/EV/Etcd.pm view on Meta::CPAN
require XSLoader;
XSLoader::load('EV::Etcd', $VERSION);
# Save reference to XS txn before we override it
my $_xs_txn = \&txn;
# Wrapper for txn to accept named parameters
no warnings 'redefine';
*txn = sub {
my $self = shift;
# Positional fast-path: (\@compare, \@success, \@failure, $cb)
if (@_ == 4
&& ref($_[0]) eq 'ARRAY'
&& ref($_[1]) eq 'ARRAY'
&& ref($_[2]) eq 'ARRAY'
&& ref($_[3]) eq 'CODE') {
return $_xs_txn->($self, @_);
}
# Extract trailing bare coderef only if arg count is odd
my $callback;
if (@_ % 2 == 1 && ref($_[-1]) eq 'CODE') {
$callback = pop;
}
my %args = @_;
$callback //= $args{callback};
for my $k (qw(compare success failure)) {
next unless defined $args{$k};
ref($args{$k}) eq 'ARRAY'
or Carp::croak("txn '$k' must be an array reference");
}
my $compare = $args{compare} // [];
my $success = $args{success} // [];
my $failure = $args{failure} // [];
return $_xs_txn->($self, $compare, $success, $failure, $callback);
};
use warnings 'redefine';
1;
__END__
=encoding utf8
=head1 NAME
EV::Etcd - Async etcd v3 client using native gRPC and EV/libev
=head1 SYNOPSIS
use v5.10;
use EV;
use EV::Etcd;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
# Async put
$client->put('/my/key', 'value', sub {
my ($resp, $err) = @_;
die $err->{message} if $err;
say "Put succeeded, revision: $resp->{header}{revision}";
});
# Async get
$client->get('/my/key', sub {
my ($resp, $err) = @_;
die $err->{message} if $err;
say "Value: $resp->{kvs}[0]{value}";
});
# Watch
$client->watch('/my/key', sub {
my ($resp, $err) = @_;
return warn "Watch error: $err->{message}\n" if $err;
for my $event (@{$resp->{events}}) {
say "Event: $event->{type} on $event->{kv}{key}";
}
});
EV::run;
=head1 DESCRIPTION
EV::Etcd provides a high-performance async client for etcd v3 using native
gRPC Core C API integrated with the EV event loop.
=head1 METHODS
=head2 new
my $client = EV::Etcd->new(%options);
Options:
=over 4
=item endpoints
ArrayRef of etcd endpoints (host:port). Optional; defaults to
C<['127.0.0.1:2379']>. When more than one is provided, the client uses the
first endpoint and rotates to subsequent endpoints on connection failure.
=item timeout
RPC timeout in seconds. Default is 30 seconds. Minimum value is 1 second.
=item max_retries
Maximum number of reconnection attempts for streaming operations (watch,
lease_keepalive, election_observe) after a connection failure. Default is 3.
Set to 0 to disable automatic reconnection.
=item health_interval
Interval in seconds for health monitoring. Default is 0 (disabled).
When enabled, the client periodically checks the gRPC channel connectivity
state and calls the on_health_change callback when the connection state changes.
=item on_health_change
Callback called when the connection health status changes. Receives two
arguments: a boolean indicating health status (1=healthy, 0=unhealthy) and
the current endpoint string.
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
health_interval => 5,
on_health_change => sub {
my ($is_healthy, $endpoint) = @_;
warn $is_healthy ? "Connected to $endpoint" : "Disconnected from $endpoint";
},
);
=item auth_token
Pre-set authentication token. Use this to create an authenticated client
without calling authenticate() first. Useful when you already have a valid
token from a previous session.
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
auth_token => $saved_token,
);
=back
=head1 ENCODING
Keys and values are stored by etcd as raw bytes; this module does not perform
any character encoding. If you pass a Perl string with the UTF-8 flag set
(e.g. a literal containing non-ASCII characters under C<use utf8>), the UTF-8
byte representation is what gets stored. Values returned by C<get> are byte
strings without the UTF-8 flag â string-equality with the original literal
will fail unless you decode explicitly.
For character data, encode/decode at the boundary using L<Encode>:
use Encode qw(encode_utf8 decode_utf8);
$client->put($key, encode_utf8($value), sub { ... });
$client->get($key, sub {
my ($resp) = @_;
my $value = decode_utf8($resp->{kvs}[0]{value});
});
=head1 ERROR HANDLING
Errors are returned as hash references with the following structure:
{
code => 14, # gRPC status code (integer)
status => "UNAVAILABLE", # gRPC status name (string)
message => "Connection refused", # Error message
source => "get", # Which operation failed
retryable => 1, # Whether the error is retryable
}
The C<retryable> field indicates whether the error is transient (status codes:
UNAVAILABLE, RESOURCE_EXHAUSTED, ABORTED, DEADLINE_EXCEEDED).
Streaming operations (watch, keepalive, observe) automatically reconnect
on transient failures according to the C<max_retries> configuration.
Unary RPCs (get, put, delete, etc.) do not retry automatically; use the
C<retryable> field to implement application-level retry logic.
=head1 KEY-VALUE OPERATIONS
=head2 put
$client->put($key, $value, $callback);
$client->put($key, $value, \%opts, $callback);
Put a key-value pair into etcd.
Options:
=over 4
=item lease
Lease ID to associate with the key.
lib/EV/Etcd.pm view on Meta::CPAN
=item range_end
The end of the key range. Use C<undef> for a single key, or use the
special value C<"\x00"> after the last byte of the prefix to match all
keys with that prefix.
=item callback
Called with C<($response, $error)> when complete.
=back
Example:
# Grant read access to a single key
$client->role_grant_permission('readonly', 'READ', '/config/setting', undef, sub {
my ($resp, $err) = @_;
say "Permission granted" unless $err;
});
# Grant read/write access to all keys under /app/
# Range end is /app0 (the byte after / is 0)
$client->role_grant_permission('readwrite', 'READWRITE', '/app/', '/app0', sub {
my ($resp, $err) = @_;
say "Permission granted" unless $err;
});
=head3 role_revoke_permission
$client->role_revoke_permission($role_name, $key, $range_end, $callback);
Revoke a permission from a role.
Arguments:
=over 4
=item role_name
The role to revoke the permission from.
=item key
The key or key prefix of the permission to revoke.
=item range_end
The end of the key range of the permission to revoke.
=item callback
Called with C<($response, $error)> when complete.
=back
=head2 Complete Authentication Example
use EV;
use EV::Etcd;
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
# Setup authentication (run once, as root)
sub setup_auth {
# Create a role with permissions
$client->role_add('app-role', sub {
my ($resp, $err) = @_;
# Grant read/write on /app/ prefix
$client->role_grant_permission('app-role', 'READWRITE', '/app/', '/app0', sub {
my ($resp, $err) = @_;
# Create user
$client->user_add('appuser', 'apppassword', sub {
my ($resp, $err) = @_;
# Assign role to user
$client->user_grant_role('appuser', 'app-role', sub {
my ($resp, $err) = @_;
say "Auth setup complete";
EV::break;
});
});
});
});
}
# Normal usage with authentication
sub use_with_auth {
$client->authenticate('appuser', 'apppassword', sub {
my ($resp, $err) = @_;
die "Auth failed: $err->{message}" if $err;
# Now all operations use the auth token
$client->put('/app/key', 'value', sub {
my ($resp, $err) = @_;
say "Put succeeded" unless $err;
EV::break;
});
});
}
use_with_auth();
EV::run;
=head1 MAINTENANCE SERVICE
EV::Etcd provides access to etcd's maintenance operations for cluster
administration and monitoring.
=head2 status
$client->status($callback);
Get the status of the etcd member this client is connected to. Useful for
health checks and cluster monitoring. Response keys:
=over 4
=item version
( run in 0.508 second using v1.01-cache-2.11-cpan-524268b4103 )