AnyEvent-MP
view release on metacpan or search on metacpan
678910111213141516171819202122232425TODO: maybe disbale warnings by
default
?
TODO: listener-scopes (10.0.0.1:4040
@vpn
) and
connect
-scopes (
"vpn,public"
)
TODO: document env-variable usage
TODO: make node objects responsible
for
keepalive?
faq: can't se anything
faq: all is asynch
faq: how to interface to non-perl nodes?
TODO: check gproto, nproto, on
connect
TODO: limiting reconnecting speed
when
unreachable? somehow
use
same interval timers as
for
seeding and keepalive?
TODO: multiple profiles? also some
default
profiles?
TODO: export keepalive?
TODO:
$guard
= con
$cb
->(
$up
)
TODO: aemp
readline
support
TODO: gleeco re: AE::MP::DataConn -
TODO: version both in MP.pm and MP/Config.pm because of cpan indexer
2.02 Sun Jul 29 04:22:53 CEST 2018
- hardcode version in MP.pm to help the CPAN indexer.
383940414243444546474849505152535455565758# monitoring
mon
$port
,
$cb
->(
@msg
)
# callback is invoked on death
mon
$port
,
$localport
# kill localport on abnormal death
mon
$port
,
$localport
,
@msg
# send message on death
# temporarily execute code in port context
peval
$port
,
sub
{
die
"kill the port!"
};
# execute callbacks in $SELF port context
my
$timer
= AE::timer 1, 0, psub {
die
"kill the port, delayed"
;
};
# distributed database - modification
db_set
$family
=>
$subkey
[=>
$value
]
# add a subkey
db_del
$family
=>
$subkey
...
# delete one or more subkeys
db_reg
$family
=>
$port
[=>
$value
]
# register a port
# distributed database - queries
db_family
$family
=>
$cb
->(\
%familyhash
)
582583584585586587588589590591592593594595596597598599600601602closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
The effect is basically as
if
it returned C<<
sub
{ peval
$SELF
,
sub
{
BLOCK },
@_
} >>.
This is useful
when
you register callbacks from C<rcv> callbacks:
rcv
delayed_reply
=>
sub
{
my
(
$delay
,
@reply
) =
@_
;
my
$timer
= AE::timer
$delay
, 0, psub {
snd
@reply
,
$SELF
;
};
};
=cut
sub psub(&) {
my $cb = shift;
my $port = $SELF
718719720721722723724725726727728729730731732733734735736737738739740741742
and (
$cb
+= 0, Guard::guard {
$node
->unmonitor (
$port
,
$cb
) })
}
=item $guard = mon_guard $port, $ref, $ref...
Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.
Optionally returns a guard that will stop the monitoring.
This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed (note the use of C<psub>):
$port->rcv (start => sub {
my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
undef $timer if 0.9 < rand;
});
});
=cut
sub
mon_guard {
my
(
$port
,
@refs
) =
@_
;
#TODO: mon-less form?
859860861862863864865866867868869870871872873874875876877878879This is simply a utility function that comes in handy at
times
- the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.
=cut
sub after($@) {
my ($timeout, @action) = @_;
my $t; $t = AE::timer $timeout, 0, sub {
undef $t;
ref $action[0]
? $action[0]()
: snd @action;
};
}
#=item $cb2 = timeout $seconds, $cb[, @args]
=item cal $port, @msg, $callback[, $timeout]
903904905906907908909910911912913914915916917918919920921922923my
$timeout
=
ref
$_
[-1] ?
undef
:
pop
;
my
$cb
=
pop
;
my
$port
= port {
undef
$timeout
;
kil
$SELF
;
&$cb
;
};
if
(
defined
$timeout
) {
$timeout
= AE::timer
$timeout
, 0,
sub
{
undef
$timeout
;
kil
$port
;
$cb
->();
};
}
else
{
mon
$_
[0],
sub
{
kil
$port
;
$cb
->();
};
}
MP/DataConn.pm view on Meta::CPAN
434445464748495051525354555657585960616263our
$ID
=
"a"
;
our
%STATE
;
# another node tells us to await a connection
sub
_expect {
my
(
$id
,
$port
,
$timeout
,
$initfunc
,
@initdata
) =
@_
;
$STATE
{
$id
} = {
id
=>
$id
,
to
=> (AE::timer
$timeout
, 0,
sub
{
$STATE
{
$id
}{done}(
undef
);
}),
done
=>
sub
{
my
(
$hdl
,
$error
) =
@_
;
%{
delete
$STATE
{
$id
}} = ();
if
(
defined
$hdl
) {
(AnyEvent::MP::Kernel::load_func
$initfunc
)->(
@initdata
,
$hdl
);
}
else
{
MP/DataConn.pm view on Meta::CPAN
8687888990919293949596979899100101102103104105106my
$state
=
$STATE
{
$id
}
or
return
;
my
$addr
=
$AnyEvent::MP::Global::addr
{
$node
};
@$addr
or
return
$state
->{done}(
undef
,
"$node: no listeners found"
);
# I love hardcoded constants !
$state
->{
next
} = AE::timer 0, 2,
sub
{
my
$endpoint
=
shift
@$addr
or
return
delete
$state
->{
next
};
my
(
$host
,
$port
) = AnyEvent::Socket::parse_hostport
$endpoint
or
return
;
my
$transport
;
$transport
= AnyEvent::MP::Transport::mp_connect
$host
,
$port
,
protocol
=>
"aemp-dataconn"
,
local_greeting
=> {
dataconn_id
=>
$id
},
MP/DataConn.pm view on Meta::CPAN
165166167168169170171172173174175176177178179180181182183184185my
$port
=
$SELF
or Carp::croak
"AnyEvent::MP::DataConn::connect_to must be called in port context"
;
$node
= node_of
$node
;
my
$id
= (++
$ID
) .
"\@$NODE"
;
# damn, why do my simple state hashes resemble objects so quickly
my
$state
=
$STATE
{
$id
} = {
id
=> (++
$ID
) .
"\@$NODE"
,
to
=> (AE::timer
$timeout
, 0,
sub
{
$STATE
{
$id
}{done}(
undef
,
"$node: unable to establish connection within $timeout seconds"
);
}),
done
=>
sub
{
my
(
$hdl
,
$error
) =
@_
;
delete
$AnyEvent::MP::Global::ON_SETUP
{
$id
};
%{
delete
$STATE
{
$id
}} = ();
if
(
defined
$hdl
) {
$cb
->(
$hdl
);
MP/Kernel.pm view on Meta::CPAN
164165166167168169170171172173174175176177178179180181182183our
$DELAY_TIMER
;
our
@DELAY_QUEUE
;
our
$delay_run
=
sub
{
(
shift
@DELAY_QUEUE
or
return
undef
$DELAY_TIMER
)->()
while
1;
};
sub
delay($) {
push
@DELAY_QUEUE
,
shift
;
$DELAY_TIMER
||= AE::timer 0, 0,
$delay_run
;
}
=item $AnyEvent::MP::Kernel::SRCNODE
During execution of a message callback, this variable contains the node ID
of the origin node.
The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.
MP/Kernel.pm view on Meta::CPAN
516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
if
(
@seeds
) {
# start connection attempt for every seed we are not connected to yet
seed_connect
$_
for
grep
!
exists
$SEED_CONNECT
{
$_
},
@seeds
;
$SEED_RETRY
=
$SEED_RETRY
* 2;
$SEED_RETRY
=
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout}
if
$SEED_RETRY
>
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout};
$SEED_WATCHER
= AE::timer
$SEED_RETRY
, 0, \
&seed_all
;
}
else
{
# all seeds connected or connecting, no need to restart timer
undef
$SEED_WATCHER
;
}
}
sub
seed_again {
$SEED_RETRY
= (1 +
rand
) * 0.6;
$SEED_WATCHER
||= AE::timer 0, 0, \
&seed_all
;
}
# sets new seed list, starts connecting
sub
set_seeds(@) {
%SEED_NODE
= ();
%NODE_SEED
= ();
%SEED_CONNECT
= ();
@SEED_NODE
{
@_
} = ();
MP/Kernel.pm view on Meta::CPAN
583584585586587588589590591592593594595596597598599600601602
. (
join
" "
,
keys
%KEEPALIVE_DOWN
)
.
"."
;
(add_node
$_
)->
connect
for
keys
%KEEPALIVE_DOWN
;
$KEEPALIVE_RETRY
=
$KEEPALIVE_RETRY
* 2;
$KEEPALIVE_RETRY
=
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout}
if
$KEEPALIVE_RETRY
>
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout};
$KEEPALIVE_WATCHER
= AE::timer
$KEEPALIVE_RETRY
, 0, \
&keepalive_all
;
}
sub
keepalive_again {
$KEEPALIVE_RETRY
= (1 +
rand
) * 0.3;
keepalive_all;
}
sub
keepalive_add {
return
if
$KEEPALIVE
{
$_
[0]}++;
123124125126127128129130131132133134135136137138139140141142143return
if
$self
->{transport};
return
if
$self
->{connect_w};
# we unweaken the node reference, in case it was weakened before
$AnyEvent::MP::Kernel::NODE
{
$self
->{id}}
=
$AnyEvent::MP::Kernel::NODE
{
$self
->{id}};
Scalar::Util::weaken
$self
;
$self
->{connect_to} ||= AE::timer
$AnyEvent::MP::Kernel::CONFIG
->{connect_interval}, 0,
sub
{
$self
->transport_error (
transport_error
=>
$self
->{id},
"connect timeout"
);
};
# maybe @$addresses?
my
$addresses
=
$AnyEvent::MP::Kernel::GLOBAL_DB
{
"'l"
}{
$self
->{id}};
if
(
$addresses
) {
$self
->connect_to (
$addresses
);
}
else
{
# on global nodes, all bets are off now - we either know the node, or we don't
165166167168169170171172173174175176177178179180181182183184185my
$monitor
=
$AnyEvent::MP::Kernel::CONFIG
->{monitor_timeout};
my
$interval
=
$AnyEvent::MP::Kernel::CONFIG
->{connect_interval};
$interval
= (
$monitor
-
$interval
) /
@$addresses
if
(
$monitor
-
$interval
) /
@$addresses
<
$interval
;
$interval
= 0.4
if
$interval
< 0.4;
my
@endpoints
=
reverse
@$addresses
;
$self
->{connect_w} = AE::timer 0,
$interval
* (0.9 + 0.1 *
rand
),
sub
{
my
$endpoint
=
pop
@endpoints
or
return
;
AE::
log
9
=>
"connecting to $self->{id} at $endpoint"
;
$self
->{trial}{
$endpoint
} ||=
do
{
my
(
$host
,
$port
) = AnyEvent::Socket::parse_hostport
$endpoint
or
return
AE::
log
critical
=>
"$self->{id}: '$endpoint' is not a resolved node reference."
;
AnyEvent::MP::Transport::mp_connect
243244245246247248249250251252253254255256257258259260261262263sub
transport_reset {
my
(
$self
) =
@_
;
Scalar::Util::weaken
$self
;
$self
->{
send
} =
sub
{
if
(++
$DELAY
> 0) {
my
$msg
=
$_
[0];
push
@DELAY
,
sub
{ AnyEvent::MP::Kernel::_inject (
@$msg
) };
$DELAY_W
||= AE::timer 0, 0,
$send_delayed
;
return
;
}
local
$AnyEvent::MP::Kernel::SRCNODE
=
$AnyEvent::MP::Kernel::NODE
;
AnyEvent::MP::Kernel::_inject (@{
$_
[0] });
};
}
sub
transport_connect {
my
(
$self
,
$tp
) =
@_
;
265266267268269270271272273274275276277278279280281282283284285
AE::
log
9
=>
"I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})"
;
}
sub
kill
{
my
(
undef
,
@args
) =
@_
;
# we _always_ delay kil's, to avoid calling mon callbacks
# from anything but the event loop context.
$DELAY
= 1;
push
@DELAY
,
sub
{ AnyEvent::MP::Kernel::_kill (
@args
) };
$DELAY_W
||= AE::timer 0, 0,
$send_delayed
;
}
sub
monitor {
# maybe always delay, too?
if
(
$DELAY_W
) {
my
@args
=
@_
;
push
@DELAY
,
sub
{ AnyEvent::MP::Kernel::_monitor (
@args
) };
return
;
}
&AnyEvent::MP::Kernel::_monitor
;
363738394041424344454647484950515253545556# monitoring
mon
$port
,
$cb
->(
@msg
)
# callback is invoked on death
mon
$port
,
$localport
# kill localport on abnormal death
mon
$port
,
$localport
,
@msg
# send message on death
# temporarily execute code in port context
peval
$port
,
sub
{
die
"kill the port!"
};
# execute callbacks in $SELF port context
my
$timer
= AE::timer 1, 0, psub {
die
"kill the port, delayed"
;
};
# distributed database - modification
db_set
$family
=>
$subkey
[=>
$value
]
# add a subkey
db_del
$family
=>
$subkey
...
# delete one or more subkeys
db_reg
$family
=>
$port
[=>
$value
]
# register a port
# distributed database - queries
db_family
$family
=>
$cb
->(\
%familyhash
)
405406407408409410411412413414415416417418419420421422423424425
"rcv"
callbacks, i.e. runtime errors will cause the port to get
"kil"
ed.
The effect is basically as
if
it returned "
sub
{ peval
$SELF
,
sub
{
BLOCK },
@_
}".
This is useful
when
you register callbacks from
"rcv"
callbacks:
rcv
delayed_reply
=>
sub
{
my
(
$delay
,
@reply
) =
@_
;
my
$timer
= AE::timer
$delay
, 0, psub {
snd
@reply
,
$SELF
;
};
};
$guard
= mon
$port
,
$rcvport
# kill $rcvport when $port dies
$guard
= mon
$port
# kill $SELF when $port dies
$guard
= mon
$port
,
$cb
->(
@reason
)
# call $cb when $port dies
$guard
= mon
$port
,
$rcvport
,
@msg
# send a message when $port dies
Monitor the
given
port and
do
something
when
the port is killed or
messages to it were lost, and optionally
return
a guard that can be
490491492493494495496497498499500501502503504505506507508509510511512513514515
Example:
send
us a restart message
when
another
$port
is killed.
mon
$port
,
$self
=>
"restart"
;
$guard
= mon_guard
$port
,
$ref
,
$ref
...
Monitors the
given
$port
and keeps the passed references. When the
port is killed, the references will be freed.
Optionally returns a guard that will stop the monitoring.
This function is useful
when
you create e.g. timers or other
watchers and want to free them
when
the port gets killed (note the
$port
->rcv (
start
=>
sub
{
my
$timer
;
$timer
= mon_guard
$port
, AE::timer 1, 1, psub {
undef
$timer
if
0.9 <
rand
;
});
});
kil
$port
[,
@reason
]
Kill the specified port
with
the
given
@reason
.
If
no
@reason
is specified, then the port is killed
"normally"
-
monitor callback will be invoked, but the kil will not cause linked
ports (
"mon $mport, $lport"
form) to get killed.
659660661662663664665666667668669670671672673674675676677678679
delete
$to
{
$node
};
@neigh
=
grep
$_
ne
$NODE
,
@neigh
;
$node
,
" -> "
, (
join
" "
,
@neigh
),
"\n"
;
for
my
$neigh
(
@neigh
) {
unless
(
$seen
{
$neigh
}++) {
$cv
->begin;
$to
{
$neigh
} = AE::timer 15, 0,
sub
{
"$neigh (timeout)\n"
;
$exit
= 1;
$cv
->end;
};
AnyEvent::MP::Kernel::eval_on
$neigh
,
"AnyEvent::MP::Kernel::up_nodes"
=>
$SELF
=>
$neigh
;
}
}
$cv
->end;
};
749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790
};
$cv
->
recv
;
}
sub
node_eval {
my
(
$node
,
$expr
) =
@_
;
init;
my
$cv
= AE::cv;
my
$to
= AE::timer 5, 0,
sub
{
exit
1 };
AnyEvent::MP::Kernel::eval_on
$node
,
$expr
, port {
&$cv
};
mon
$node
,
$cv
;
my
(
$err
,
@res
) =
$cv
->
recv
;
die
"$err @res"
if
length
$err
;
+(
substr
JSON::XS->new->encode (\
@res
), 1, -1),
"\n"
;
}
sub
docmd;
our
%CMD
= (
snd
=>
sub
{
my
$port
=
shift
@ARGV
;
init;
snd
$port
,
@ARGV
;
@ARGV
= ();
my
$cv
= AE::cv;
my
$to
= AE::timer 5, 0,
sub
{
exit
1 };
mon
$port
,
$cv
;
my
$reply
= port
sub
{
&$cv
};
snd node_of
$port
,
snd
=>
$reply
,
"message sent successfully"
;
join
" "
,
$cv
->
recv
,
"\n"
;
},
cal
=>
sub
{
my
$port
=
shift
@ARGV
;
init;
( run in 0.549 second using v1.01-cache-2.11-cpan-87723dcf8b7 )