AnyEvent-MP
view release on metacpan or search on metacpan
MP/Kernel.pm view on Meta::CPAN
{
# ~54 bits, for local port names, lowercase $ID appended
my $now = AE::now;
$UNIQ =
(join "",
map $alnum[$_],
$$ / 62 % 62,
$$ % 62,
(int $now ) % 62,
(int $now * 100) % 62,
(int $now * 10000) % 62,
) . nonce62 4
;
# ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
$RUNIQ = nonce62 10;
$RUNIQ =~ s/(.)$/\U$1/;
$NODE = "";
}
sub NODE() {
$NODE
}
sub node_of($) {
my ($node, undef) = split /#/, $_[0], 2;
$node
}
BEGIN {
*TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
? sub () { 1 }
: sub () { 0 };
}
our $DELAY_TIMER;
our @DELAY_QUEUE;
our $delay_run = sub {
(shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
};
sub delay($) {
push @DELAY_QUEUE, shift;
$DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
=item $AnyEvent::MP::Kernel::SRCNODE
During execution of a message callback, this variable contains the node ID
of the origin node.
The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.
=cut
sub _inject {
warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;
&{ $PORT{+shift} or return };
}
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
sub add_node {
$NODE{$_[0]} || do {
my ($node) = @_;
length $node
or Carp::croak "'undef' or the empty string are not valid node/port IDs";
# registers itself in %NODE
new AnyEvent::MP::Node::Remote $node
}
}
sub snd(@) {
my ($nodeid, $portid) = split /#/, shift, 2;
warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;
($NODE{$nodeid} || add_node $nodeid)
->{send} (["$portid", @_]);
}
sub port_is_local($) {
my ($nodeid, undef) = split /#/, $_[0], 2;
$nodeid eq $NODE
}
=item snd_to_func $node, $func, @args
Expects a node ID and a name of a function. Asynchronously tries to call
this function with the given arguments on that node.
This function can be used to implement C<spawn>-like interfaces.
=cut
sub snd_to_func($$;@) {
my $nodeid = shift;
# on $NODE, we artificially delay... (for spawn)
# this is very ugly - maybe we should simply delay ALL messages,
# to avoid deep recursion issues. but that's so... slow...
$AnyEvent::MP::Node::Self::DELAY = 1
if $nodeid ne $NODE;
($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
}
=item snd_on $node, @msg
Executes C<snd> with the given C<@msg> (which must include the destination
port) on the given node.
=cut
sub snd_on($@) {
my $node = shift;
snd $node, snd => @_;
}
=item eval_on $node, $string[, @reply]
Evaluates the given string as Perl expression on the given node. When
@reply is specified, then it is used to construct a reply message with
C<"$@"> and any results from the eval appended.
=cut
sub eval_on($$;@) {
my $node = shift;
snd $node, eval => @_;
}
sub kil(@) {
my ($nodeid, $portid) = split /#/, shift, 2;
MP/Kernel.pm view on Meta::CPAN
global_req_add "g_find $_[0]", [g_find => $_[0]];
}
# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
global_req_del "g_find $_[0]";
my $node = $NODE{$_[0]} or return;
$node->connect_to ($_[1]);
};
sub master_set {
$MASTER = $_[0];
AE::log 8 => "new master node: $MASTER.";
$MASTER_MON = mon_nodes sub {
if ($_[0] eq $MASTER && !$_[1]) {
undef $MASTER;
master_search ();
}
};
snd $MASTER, g_slave => \%LOCAL_DB;
# (re-)send queued requests
snd $MASTER, @$_
for values %GLOBAL_REQ;
}
sub master_search {
AE::log 9 => "starting search for master node.";
#TODO: should also look for other global nodes, but we don't know them
for (keys %NODE_SEED) {
if (node_is_up $_) {
master_set $_;
return;
}
}
$MASTER_MON = mon_nodes sub {
return unless $_[1]; # we are only interested in node-ups
return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
master_set $_[0];
};
}
# other node wants to make us the master, so start the global service
$NODE_REQ{g_slave} = sub {
# load global module and redo the request
require AnyEvent::MP::Global;
&{ $NODE_REQ{g_slave} }
};
#############################################################################
# local database operations
# canonical probably not needed
our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
# are the two scalars equal? very very ugly and slow, need better way
sub sv_eq($$) {
ref $_[0] || ref $_[1]
? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
: $_[0] eq $_[1]
&& defined $_[0] == defined $_[1]
}
# local database management
sub db_del($@) {
my $family = shift;
my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
return unless @del;
delete @{ $LOCAL_DB{$family} }{@del};
snd $MASTER, g_upd => $family => undef, \@del
if defined $MASTER;
}
sub db_set($$;$) {
my ($family, $subkey) = @_;
# if (ref $_[1]) {
# # bulk
# my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
# $LOCAL_DB{$_[0]} = $_[1];
# snd $MASTER, g_upd => $_[0] => $_[1], \@del
# if defined $MASTER;
# } else {
# single-key
unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
$LOCAL_DB{$family}{$subkey} = $_[2];
snd $MASTER, g_upd => $family => { $subkey => $_[2] }
if defined $MASTER;
}
# }
defined wantarray
and Guard::guard { db_del $family => $subkey }
}
# database query
sub db_family {
my ($family, $cb) = @_;
global_call g_db_family => $family, $cb;
}
sub db_keys {
my ($family, $cb) = @_;
global_call g_db_keys => $family, $cb;
}
sub db_values {
my ($family, $cb) = @_;
global_call g_db_values => $family, $cb;
}
# database monitoring
our %LOCAL_MON; # f, reply
( run in 0.624 second using v1.01-cache-2.11-cpan-39bf76dae61 )