view release on metacpan or search on metacpan
port rcv mon mon_guard psub peval spawn cal
db_set db_del db_reg
db_mon db_family db_keys db_values
after
);
our $SELF;
# Kills the current port ($SELF) with a "die" reason taken from the
# pending exception in $@.
#
# Plain string exceptions have their trailing newlines removed first;
# references (exception objects) are passed through unmodified.
sub _self_die() {
   my $error = $@;

   if (!ref $error) {
      $error =~ s/\n+$//;
   }

   kil ($SELF, die => $error);
}
=item $thisnode = NODE / $NODE
The C<NODE> function returns, and the C<$NODE> variable contains, the node
ID of the node running in the current process. This value is initialised by
a call to C<configure>.
If you want to stop/destroy the port, simply C<kil> it:
my $port = port {
my @msg = @_;
...
kil $SELF;
};
=cut
sub rcv($@);
# Fallback receive callback, used by "port" when no callback is given:
# kills the current port with an "unhandled_message" reason that quotes
# the (sanitised) first element of the message.
my $KILME = sub {
   # at most the first 30 characters of the first message element ...
   my $tag = substr $_[0], 0, 30;
   # ... with everything outside printable ASCII replaced by a dot
   $tag =~ s/([^\x20-\x7e])/./g;

   kil ($SELF, unhandled_message => "no callback found for message '$tag'");
};
# Creates a new local port and returns its port ID ("$NODE#<id>").
#
# The optional code block becomes the receive callback of the port; when
# it is missing (or false), $KILME is registered instead.  The actual
# registration is delegated to "rcv".
sub port(;&) {
   my $cb = shift;

   # local part of the port ID: unique cookie plus an incrementing counter
   my $id   = $UNIQ . ++$ID;
   my $port = "$NODE#$id";

   rcv ($port, $cb || $KILME);

   $port
}
=item rcv $local_port, $callback->(@msg)
(e.g. for an rpc reply) and unregister it after a message was received.
rcv $port, $otherport => sub {
my @reply = @_;
rcv $SELF, $otherport;
};
=cut
sub rcv($@) {
my $port = shift;
my ($nodeid, $portid) = split /#/, $port, 2;
$nodeid eq $NODE
or Carp::croak "$port: rcv can only be called on local ports, caught";
while (@_) {
if (ref $_[0]) {
if (my $self = $PORT_DATA{$portid}) {
"AnyEvent::MP::Port" eq ref $self
my $port = port { ... };
peval $port, sub {
init
or die "unable to init";
};
=cut
sub peval($$) {
local $SELF = shift;
my $cb = shift;
if (wantarray) {
my @res = eval { &$cb };
_self_die if $@;
@res
} else {
my $res = eval { &$cb };
_self_die if $@;
rcv delayed_reply => sub {
my ($delay, @reply) = @_;
my $timer = AE::timer $delay, 0, psub {
snd @reply, $SELF;
};
};
=cut
sub psub(&) {
my $cb = shift;
my $port = $SELF
or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
sub {
local $SELF = $port;
if (wantarray) {
my @res = eval { &$cb };
my $init = shift;
# rcv will create the actual port
local $SELF = "$NODE#$port";
eval {
&{ load_func $init }
};
_self_die if $@;
}
sub spawn(@) {
my ($nodeid, undef) = split /#/, shift, 2;
my $id = $RUNIQ . ++$ID;
$_[0] =~ /::/
or Carp::croak "spawn init function must be a fully-qualified name, caught";
snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
"$nodeid#$id"
Either sends the given message, or calls the given callback, after the
specified number of seconds.
This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.
=cut
# after $timeout, @msg
# after $timeout, $callback
#
# Starts a one-shot AE::timer that fires after $timeout seconds.  When it
# fires, the first element of the remaining arguments decides what happens:
# a reference is called (without arguments), anything else is treated as a
# message and passed to "snd".
sub after($@) {
   my ($delay, @todo) = @_;

   my $timer;
   $timer = AE::timer $delay, 0, sub {
      # the callback references $timer itself, so drop it once it fired
      undef $timer;

      if (ref $todo[0]) {
         $todo[0]();
      } else {
         snd @todo;
      }
   };
}
If no time-out is given (or it is C<undef>), then the local port will
monitor the remote port instead, so it eventually gets cleaned-up.
Currently this function returns the temporary port, but this "feature"
might go away in future versions unless you can make a convincing case that
this is indeed useful for something.
=cut
sub cal(@) {
my $timeout = ref $_[-1] ? undef : pop;
my $cb = pop;
my $port = port {
undef $timeout;
kil $SELF;
&$cb;
};
if (defined $timeout) {
If C<$value> is missing, C<undef> is used. If C<$port> is missing, then
C<$SELF> is used.
This function is most useful to register a port in some port group (which
is just another name for a database family), and have it removed when the
port is gone. This works best when the port is a local port.
=cut
sub db_reg($$;$) {
my $family = shift;
my $port = @_ ? shift : $SELF;
my $clr = sub { db_del $family => $port };
mon $port, $clr;
db_set $family => $port => $_[0];
defined wantarray
and &Guard::guard ($clr)
MP/Config.pm view on Meta::CPAN
unlink "$CONFIG_FILE~";
link $CONFIG_FILE, "$CONFIG_FILE~";
rename "$CONFIG_FILE~new~", $CONFIG_FILE
or Carp::croak "$CONFIG_FILE: $!";
}
# Returns a reference to the %CFG hash itself (not a copy), so changes
# made through the returned reference modify %CFG.
sub config {
   return \%CFG;
}
sub _find_profile($);

# Returns a flat key/value list for the profile called $name, preceded by
# the lists of all its ancestors (found by following the "parent" entries).
# The chain ends at an undefined name, which yields the contents of %CFG.
#
# Ancestor pairs come first in the list, so if the caller assigns the list
# to a hash, entries of more specific profiles win over inherited ones.
sub _find_profile($) {
   my ($name) = @_;

   return %CFG
      unless defined $name;

   my $profile = $CFG{profile}{$name};

   # the parent lookup has to be evaluated before %$profile: for an
   # unknown profile it is what turns $profile into an (empty) hash
   return (_find_profile ($profile->{parent}), %$profile);
}
sub find_profile($;%) {
my ($name, %kv) = @_;
my $norc = delete $kv{norc};
my $force = delete $kv{force};
%kv = (
monitor_timeout => 30,
connect_interval => 2,
framing_format => [qw(cbor json storable)], # framing types we offer and accept, in order of preference
auth_offer => [qw(tls_sha3_512 hmac_sha3_512)], # what we will send
MP/DataConn.pm view on Meta::CPAN
warn "connection established, wait for a line...\n";
$hdl->push_read (line => sub {
warn "received a line: $_[1]\n";
undef $hdl;
});
}
=cut
sub connect_to($$$$@) {
my $cb = pop;
my ($node, $timeout, $initfunc, @initdata) = @_;
my $port = $SELF
or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";
$node = node_of $node;
my $id = (++$ID) . "\@$NODE";
MP/Global.pm view on Meta::CPAN
# tell subscribers we have changed the family
if (%$set || %local_set || @del_local) {
@$set{keys %local_set} = values %local_set;
snd $_ => g_chg2 => $family, $set, \@del_local
for keys %{ $GLOBAL_MON{$family} };
}
}
# set the whole (node-local) database - previous value must be empty
sub g_set($$) {
   my $node = shift;
   my $db   = shift;

   # $db maps each family name to the value handed to g_upd for that
   # family; every family is applied with a separate g_upd call
   while (my ($family, $set) = each %$db) {
      g_upd ($node, $family, $set);
   }
}
# delete all keys from a database
sub g_clr($) {
   my $node = shift;

   my $db = $LOCAL_DBS{$node};

   # for every family of this node, ask g_upd to delete all of its keys
   # (nothing is set, hence the undef)
   while (my ($family, $entries) = each %$db) {
      g_upd ($node, $family, undef, [keys %$entries]);
   }

   # finally forget the node's database altogether
   delete $LOCAL_DBS{$node};
}
MP/Global.pm view on Meta::CPAN
snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
};
# node request "g_db_values": replies to the requesting node ($SRCNODE)
# with a "g_reply" message that carries the request id and an array of all
# values currently stored in the global database for $family
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;

   my @values = values %{ $GLOBAL_DB{$family} };

   snd ($SRCNODE, g_reply => $id, \@values);
};
# monitoring
sub g_disconnect($) {
my ($node) = @_;
delete $GLOBAL_NODE{$node}; # also done in Kernel.pm, but doing it here avoids overhead
db_del "'g" => $node;
db_del "'l" => $node;
g_clr $node;
if (my $mon = delete $GLOBAL_SLAVE{$node}) {
while (my ($f, $fv) = each %$mon) {
MP/Global.pm view on Meta::CPAN
};
#############################################################################
# compatibility functions for aemp 1.0
package AnyEvent::MP::Global;
use base "Exporter";
our @EXPORT = qw(grp_reg grp_get grp_mon);
# aemp 1.0 compatibility wrapper around db_reg
sub grp_reg($$) {
   # "&name" without an argument list calls db_reg with our own @_ (and
   # without applying its prototype); the result is passed on unchanged
   return &db_reg;
}
# aemp 1.0 compatibility: returns an array reference with all keys stored
# in %AnyEvent::MP::Kernel::GLOBAL_DB for the given family, or undef when
# there are none.
sub grp_get($) {
   my ($grp) = @_;

   my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$grp} };

   return undef
      unless @ports;

   \@ports
}
# aemp 1.0 compatibility wrapper around db_mon: monitors family $grp and
# calls $cb with (array of all keys, $add, $del) on every event - the
# "changed" argument that db_mon passes to its callback is dropped.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   my $adapter = sub {
      my ($ports, $add, undef, $del) = @_;

      $cb->([keys %$ports], $add, $del);
   };

   db_mon ($grp => $adapter);
}
=head1 SEE ALSO
MP/Kernel.pm view on Meta::CPAN
);
our @EXPORT = qw(
snd_to_func snd_on eval_on
port_is_local
up_nodes mon_nodes node_is_up
);
our @CARP_NOT = (AnyEvent::MP::);
sub load_func($) {
my $func = $_[0];
unless (defined &$func) {
my $pkg = $func;
do {
$pkg =~ s/::[^:]+$//
or return sub { die "unable to resolve function '$func'" };
local $@;
unless (eval "require $pkg; 1") {
MP/Kernel.pm view on Meta::CPAN
or return sub { die $error };
}
} until defined &$func;
}
\&$func
}
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
# Returns a string of $_[0] random characters, each with a code point in
# the range 0..255.  Randomness comes from Perl's builtin rand.
sub nonce($) {
   my $octets = "";

   $octets .= chr int rand 256
      for 1 .. $_[0];

   $octets
}
# Returns a string of $_[0] characters, each picked at random (builtin
# rand) from the first 62 entries of @alnum.
sub nonce62($) {
   my $chars = "";

   $chars .= $alnum[rand 62]
      for 1 .. $_[0];

   $chars
}
# package-global kernel state

our $CONFIG; # this node's configuration
our $SECURE; # NOTE(review): purpose not visible in this part of the file
our $RUNIQ; # remote uniq value
our $UNIQ; # per-process/node unique cookie
our $NODE; # node ID of this node, as returned by NODE()
# counter, starts out as the string "a"; it appears to be the value that is
# pre-incremented (++$ID) and appended to $UNIQ/$RUNIQ to build port IDs -
# TODO confirm it is the same variable
our $ID = "a";
MP/Kernel.pm view on Meta::CPAN
) . nonce62 4
;
# ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
$RUNIQ = nonce62 10;
$RUNIQ =~ s/(.)$/\U$1/;
$NODE = "";
}
# Returns the current value of $NODE (the node ID of this node).
sub NODE() {
   return $NODE;
}
# Extracts the node ID from a port ID: everything before the first "#".
# A string without "#" is returned as is.
sub node_of($) {
   # limit 2: we only ever need the part before the first separator
   my $node = (split /#/, $_[0], 2)[0];

   return $node;
}
# Defines the constant sub TRACE at compile time: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE is set to a true value, 0 otherwise.
# The empty prototype allows perl to inline the constant.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
our $DELAY_TIMER; # true while a run of the delay queue is scheduled
our @DELAY_QUEUE; # code references waiting to be run, oldest first

# Runs the queued code references in order until the queue is empty (jobs
# queued while running are picked up as well), then clears $DELAY_TIMER.
our $delay_run = sub {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
};
# Appends the given code reference to @DELAY_QUEUE and, unless one is
# already pending, creates a zero-delay timer that runs $delay_run.
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;

   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
=item $AnyEvent::MP::Kernel::SRCNODE
During execution of a message callback, this variable contains the node ID
of the origin node.
The main use of this variable is for debugging output - there are probably
MP/Kernel.pm view on Meta::CPAN
my ($node) = @_;
length $node
or Carp::croak "'undef' or the empty string are not valid node/port IDs";
# registers itself in %NODE
new AnyEvent::MP::Node::Remote $node
}
}
# snd $port, @msg
#
# Splits the port ID into node and port part and hands [$portid, @msg] to
# the "send" callback of the node object, which is created via add_node
# when the node is not yet in %NODE.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   # trace output only for non-empty messages
   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
# Returns true if the node part of the given port ID (the text before the
# first "#") equals $NODE, i.e. the port lives on this node.
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid eq $NODE;
}
=item snd_to_func $node, $func, @args
Expects a node ID and a name of a function. Asynchronously tries to call
this function with the given arguments on that node.
This function can be used to implement C<spawn>-like interfaces.
=cut
# snd_to_func $node, $func, @args
#
# Sends ["", $func, @args] - a message with an empty port part - to the
# given node, creating the node object via add_node if necessary.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the remark above talks about the local node, while the
   # flag is set when the target is NOT $NODE - confirm which is intended
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["", @call]);
}
=item snd_on $node, @msg
Executes C<snd> with the given C<@msg> (which must include the destination
port) on the given node.
=cut
# snd_on $node, @msg - implemented by sending the message (snd => @msg)
# to the port ID "$node", i.e. to the node itself.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
=item eval_on $node, $string[, @reply]
Evaluates the given string as a Perl expression on the given node. When
@reply is specified, then it is used to construct a reply message with
C<"$@"> and any results from the eval appended.
=cut
# eval_on $node, $string[, @reply] - implemented by sending the message
# (eval => $string, @reply) to the port ID "$node", i.e. to the node itself.
sub eval_on($$;@) {
   my ($node, @args) = @_;

   snd ($node, eval => @args);
}
# kil $port[, @reason]
#
# Asks the node object owning $port (created via add_node if unknown) to
# kill the port, passing the remaining arguments along as the reason.
# Croaks if the port ID has no port part.
sub kil(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   unless (length $portid) {
      Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught");
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
#############################################################################
MP/Kernel.pm view on Meta::CPAN
Returns true if the given node is "up", that is, the kernel thinks it has
a working connection to it.
More precisely, if the node is up, returns C<1>. If the node is currently
connecting or otherwise known but not connected, returns C<0>. If nothing
is known about the node, returns C<undef>.
=cut
# Returns 1 for the local node and for known nodes that have a transport,
# 0 for known nodes without a transport, and nothing (undef or the empty
# list, depending on context) for nodes that are not in %NODE.
sub node_is_up($) {
   my $id = $_[0];

   # the local node is always up
   return 1
      if $id eq $NODE;

   my $node = $NODE{$id}
      or return;

   return $node->{transport} ? 1 : 0;
}
=item @nodes = up_nodes
Return the node IDs of all nodes that are currently connected (excluding
the node itself).
=cut
# Returns the IDs of all nodes in %NODE that currently have a transport
# (in scalar context: their number).
sub up_nodes() {
   my @up;

   for my $node (values %NODE) {
      push @up, $node->{id}
         if $node->{transport};
   }

   @up
}
=item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
Registers a callback that is called each time a node goes up (a connection
is established) or down (the connection is lost).
Node up messages can only be followed by node down messages for the same
node, and vice versa.
MP/Kernel.pm view on Meta::CPAN
Example: make sure you call function C<newnode> for all nodes that are up
or go up (and down).
newnode $_, 1 for up_nodes;
mon_nodes \&newnode;
=cut
# registered node up/down callbacks, keyed by the numified code reference
our %MON_NODES;

# $guard = mon_nodes $callback
#
# Registers $callback in %MON_NODES.  In non-void context a guard object
# is returned that removes the registration again when it is destroyed;
# in void context the registration simply stays.
sub mon_nodes($) {
   my $cb = shift;

   $MON_NODES{$cb+0} = $cb;

   return
      unless defined wantarray;

   Guard::guard { delete $MON_NODES{$cb+0} }
}
# Logs an up/down event for the node object $node (at log level 7) and
# invokes every callback in %MON_NODES with ($nodeid, $up, @reason).
# An exception in one callback is logged at level "die" and does not keep
# the remaining callbacks from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up." : "down (@reason).";
   AE::log 7 => "$node->{id} is " . $state;

   foreach my $cb (values %MON_NODES) {
      next
         if eval { $cb->($node->{id}, $up, @reason); 1 };

      AE::log die => $@;
   }
}
MP/Kernel.pm view on Meta::CPAN
undef $SEED_WATCHER;
}
}
# Sets $SEED_RETRY to a random value in the range [0.6, 1.2) and, unless
# a seed watcher already exists, creates a zero-delay timer that calls
# seed_all.
sub seed_again {
   my $jitter = 1 + rand; # 1 <= $jitter < 2

   $SEED_RETRY = $jitter * 0.6;

   $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
}
# sets new seed list, starts connecting
# set_seeds @seeds
#
# Discards all seed bookkeeping (%NODE_SEED, %SEED_CONNECT and the old
# contents of %SEED_NODE), registers the given seeds as keys of %SEED_NODE
# (with undefined values) and then calls seed_again.
sub set_seeds(@) {
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   %SEED_NODE = map +($_ => undef), @_;

   seed_again ();
}
# normal nodes only record global node connections
MP/Kernel.pm view on Meta::CPAN
&{ $NODE_REQ{g_slave} }
};
#############################################################################
# local database operations
# encoder used by sv_eq to compare data structures by their JSON text
# canonical probably not needed
our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;

# are the two scalars equal? very very ugly and slow, need better way
#
# If at least one argument is a reference, both are JSON-encoded and the
# resulting strings are compared.  Otherwise the two values must be equal
# as strings and agree in definedness (so undef differs from "").
sub sv_eq($$) {
   if (ref $_[0] || ref $_[1]) {
      return JSON::XS::encode ($sv_eq_coder, $_[0]) eq JSON::XS::encode ($sv_eq_coder, $_[1]);
   }

   return $_[0] eq $_[1]
       && defined $_[0] == defined $_[1];
}
# local database management
# db_del $family, @keys
#
# Removes those of the given keys that exist in the local database family
# and, if a master is known ($MASTER), reports the deletion to it with a
# "g_upd" message.  Does nothing when none of the keys exist.
sub db_del($@) {
   my ($family, @candidates) = @_;

   # only keys that are actually present get deleted and reported
   my @del = grep { exists $LOCAL_DB{$family}{$_} } @candidates;
   @del
      or return;

   delete @{ $LOCAL_DB{$family} }{@del};

   if (defined $MASTER) {
      snd ($MASTER, g_upd => $family => undef, \@del);
   }
}
sub db_set($$;$) {
my ($family, $subkey) = @_;
# if (ref $_[1]) {
# # bulk
# my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
# $LOCAL_DB{$_[0]} = $_[1];
# snd $MASTER, g_upd => $_[0] => $_[1], \@del
# if defined $MASTER;
# } else {
# single-key
MP/Kernel.pm view on Meta::CPAN
# db_values $family, $cb - issues a "g_db_values" request for $family via
# global_call, passing $cb along as the callback
sub db_values {
   my $family = shift;
   my $cb     = shift;

   global_call (g_db_values => $family, $cb);
}
# database monitoring
our %LOCAL_MON; # f, reply
our %MON_DB; # f, k, value
sub db_mon($@) {
my ($family, $cb) = @_;
if (my $db = $MON_DB{$family}) {
# we already monitor, so create a "dummy" change event
# this is postponed, which might be too late (we could process
# change events), so disable the callback at first
$LOCAL_MON{$family}{$cb+0} = sub { };
AE::postpone {
return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
MP/Kernel.pm view on Meta::CPAN
};
#############################################################################
# configure
# Returns the nodename field (the second element) of POSIX::uname.
# POSIX is loaded on demand.
sub nodename {
   require POSIX;

   my (undef, $host) = POSIX::uname ();

   return $host;
}
sub _resolve($) {
my ($nodeid) = @_;
my $cv = AE::cv;
my @res;
$cv->begin (sub {
my %seen;
my @refs;
for (sort { $a->[0] <=> $b->[0] } @res) {
push @refs, $_->[1] unless $seen{$_->[1]}++
MP/Kernel.pm view on Meta::CPAN
}
$cv->end;
$cv
}
# code blocks waiting to be run once $NODE is set
our @POST_CONFIGURE;

# not yet documented
#
# Queues the given code block and, as long as $NODE is true, runs all
# queued blocks in order.  Must be called in void context, dies otherwise.
sub post_configure(&) {
   defined wantarray
      and die "AnyEvent::MP::Kernel::post_configure must be called in void context";

   push @POST_CONFIGURE, @_;

   while ($NODE && @POST_CONFIGURE) {
      my $hook = shift @POST_CONFIGURE;
      $hook->();
   }
}
sub configure(@) {
unshift @_, "profile" if @_ & 1;
my (%kv) = @_;
my $profile = delete $kv{profile};
$profile = nodename
unless defined $profile;
$CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
MP/Transport.pm view on Meta::CPAN
Creates a listener on the given host/port using
C<AnyEvent::Socket::tcp_server>.
See C<new>, below, for constructor arguments.
Defaults for peerhost, peerport and fh are provided.
=cut
sub mp_server($$;%) {
my ($host, $port, %arg) = @_;
AnyEvent::Socket::tcp_server $host, $port, sub {
my ($fh, $host, $port) = @_;
my $tp = new AnyEvent::MP::Transport
fh => $fh,
peerhost => $host,
peerport => $port,
%arg,
MP/Transport.pm view on Meta::CPAN
on_connect => sub { successful-connect-callback },
greeting => { key => value },
# tls support
tls_ctx => AnyEvent::TLS,
peername => $peername, # for verification
;
=cut
# hmac_sha3_512_hex $key, $data
#
# Returns the HMAC of $data under $key in hex, using Digest::SHA3::sha3_512
# as hash function and 72 as HMAC block size.
sub hmac_sha3_512_hex($$) {
   my ($key, $data) = @_;

   # note the argument order of Digest::HMAC: data first, then key
   return Digest::HMAC::hmac_hex ($data, $key, \&Digest::SHA3::sha3_512, 72);
}
sub new {
my ($class, %arg) = @_;
my $self = bless \%arg, $class;
{
Scalar::Util::weaken (my $self = $self);