AnyEvent-MP

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
TODO: maybe disbale warnings by default?
TODO: listener-scopes (10.0.0.1:4040@vpn) and connect-scopes ("vpn,public")
TODO: document env-variable usage
TODO: make node objects responsible for keepalive?
 
faq: can't se anything
faq: all is asynch
faq: how to interface to non-perl nodes?
 
TODO: check gproto, nproto, on connect
TODO: limiting reconnecting speed when unreachable? somehow use same interval timers as for seeding and keepalive?
TODO: multiple profiles? also some default profiles?
TODO: export keepalive?
TODO: $guard = con $cb->($up)
TODO: aemp readline support
TODO: gleeco re: AE::MP::DataConn -
TODO: version both in MP.pm and MP/Config.pm because of cpan indexer
 
2.02 Sun Jul 29 04:22:53 CEST 2018
        - hardcode version in MP.pm to help the CPAN indexer.

MP.pm  view on Meta::CPAN

38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# monitoring
mon $port, $cb->(@msg)      # callback is invoked on death
mon $port, $localport       # kill localport on abnormal death
mon $port, $localport, @msg # send message on death
 
# temporarily execute code in port context
peval $port, sub { die "kill the port!" };
 
# execute callbacks in $SELF port context
my $timer = AE::timer 1, 0, psub {
   die "kill the port, delayed";
};
 
# distributed database - modification
db_set $family => $subkey [=> $value# add a subkey
db_del $family => $subkey...           # delete one or more subkeys
db_reg $family => $port [=> $value]    # register a port
 
# distributed database - queries
db_family $family => $cb->(\%familyhash)

MP.pm  view on Meta::CPAN

582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
 
The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.
 
This is useful when you register callbacks from C<rcv> callbacks:
 
   rcv delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer = AE::timer $delay, 0, psub {
         snd @reply, $SELF;
      };
   };
 
=cut
 
sub psub(&) {
   my $cb = shift;
 
   my $port = $SELF

MP.pm  view on Meta::CPAN

718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
      and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
}
 
=item $guard = mon_guard $port, $ref, $ref...
 
Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.
 
Optionally returns a guard that will stop the monitoring.
 
This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed (note the use of C<psub>):
 
  $port->rcv (start => sub {
     my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
        undef $timer if 0.9 < rand;
     });
  });
 
=cut
 
sub mon_guard {
   my ($port, @refs) = @_;
 
   #TODO: mon-less form?

MP.pm  view on Meta::CPAN

859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.
 
=cut
 
sub after($@) {
   my ($timeout, @action) = @_;
 
   my $t; $t = AE::timer $timeout, 0, sub {
      undef $t;
      ref $action[0]
         ? $action[0]()
         : snd @action;
   };
}
 
#=item $cb2 = timeout $seconds, $cb[, @args]
 
=item cal $port, @msg, $callback[, $timeout]

MP.pm  view on Meta::CPAN

903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
my $timeout = ref $_[-1] ? undef : pop;
my $cb = pop;
 
my $port = port {
   undef $timeout;
   kil $SELF;
   &$cb;
};
 
if (defined $timeout) {
   $timeout = AE::timer $timeout, 0, sub {
      undef $timeout;
      kil $port;
      $cb->();
   };
} else {
   mon $_[0], sub {
      kil $port;
      $cb->();
   };
}

MP/DataConn.pm  view on Meta::CPAN

43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
our $ID = "a";
our %STATE;
 
# another node tells us to await a connection
sub _expect {
   my ($id, $port, $timeout, $initfunc, @initdata) = @_;
 
   $STATE{$id} = {
      id   => $id,
      to   => (AE::timer $timeout, 0, sub {
         $STATE{$id}{done}(undef);
      }),
      done => sub {
         my ($hdl, $error) = @_;
 
         %{delete $STATE{$id}} = ();
 
         if (defined $hdl) {
            (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl);
         } else {

MP/DataConn.pm  view on Meta::CPAN

86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
my $state = $STATE{$id}
   or return;
 
my $addr = $AnyEvent::MP::Global::addr{$node};
 
@$addr
   or return $state->{done}(undef, "$node: no listeners found");
 
# I love hardcoded constants  !
$state->{next} = AE::timer 0, 2, sub {
   my $endpoint = shift @$addr
      or return delete $state->{next};
 
   my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
      or return;
 
   my $transport; $transport = AnyEvent::MP::Transport::mp_connect
      $host, $port,
      protocol => "aemp-dataconn",
      local_greeting => { dataconn_id => $id },

MP/DataConn.pm  view on Meta::CPAN

165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
my $port = $SELF
   or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";
 
$node = node_of $node;
 
my $id = (++$ID) . "\@$NODE";
 
# damn, why do my simple state hashes resemble objects so quickly
my $state = $STATE{$id} = {
   id   => (++$ID) . "\@$NODE",
   to   => (AE::timer $timeout, 0, sub {
      $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
   }),
   done => sub {
      my ($hdl, $error) = @_;
 
      delete $AnyEvent::MP::Global::ON_SETUP{$id};
      %{delete $STATE{$id}} = ();
 
      if (defined $hdl) {
         $cb->($hdl);

MP/Kernel.pm  view on Meta::CPAN

164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
our $DELAY_TIMER;
our @DELAY_QUEUE;
 
our $delay_run = sub {
   (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
};
 
sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
 
=item $AnyEvent::MP::Kernel::SRCNODE
 
During execution of a message callback, this variable contains the node ID
of the origin node.
 
The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.

MP/Kernel.pm  view on Meta::CPAN

516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
   if (@seeds) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect $_
         for grep !exists $SEED_CONNECT{$_}, @seeds;
 
      $SEED_RETRY = $SEED_RETRY * 2;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
 
      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
 
   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}
 
sub seed_again {
   $SEED_RETRY = (1 + rand) * 0.6;
   $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
}
 
# sets new seed list, starts connecting
sub set_seeds(@) {
   %SEED_NODE     = ();
   %NODE_SEED     = ();
   %SEED_CONNECT  = ();
 
   @SEED_NODE{@_} = ();

MP/Kernel.pm  view on Meta::CPAN

583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
                . (join " ", keys %KEEPALIVE_DOWN)
                . ".";
 
   (add_node $_)->connect
      for keys %KEEPALIVE_DOWN;
 
   $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
   $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
      if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
 
   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
 
sub keepalive_again {
   $KEEPALIVE_RETRY = (1 + rand) * 0.3;
   keepalive_all;
}
 
sub keepalive_add {
   return if $KEEPALIVE{$_[0]}++;

MP/Node.pm  view on Meta::CPAN

123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
return if $self->{transport};
return if $self->{connect_w};
 
# we unweaken the node reference, in case it was weakened before
$AnyEvent::MP::Kernel::NODE{$self->{id}}
   = $AnyEvent::MP::Kernel::NODE{$self->{id}};
 
Scalar::Util::weaken $self;
 
$self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
   $self->transport_error (transport_error => $self->{id}, "connect timeout");
};
 
# maybe @$addresses?
my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
 
if ($addresses) {
   $self->connect_to ($addresses);
} else {
   # on global nodes, all bets are off now - we either know the node, or we don't

MP/Node.pm  view on Meta::CPAN

165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
my $monitor  = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
 
$interval = ($monitor - $interval) / @$addresses
   if ($monitor - $interval) / @$addresses < $interval;
 
$interval = 0.4 if $interval < 0.4;
 
my @endpoints = reverse @$addresses;
 
$self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
   my $endpoint = pop @endpoints
      or return;
 
   AE::log 9 => "connecting to $self->{id} at $endpoint";
 
   $self->{trial}{$endpoint} ||= do {
      my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
         or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
 
      AnyEvent::MP::Transport::mp_connect

MP/Node.pm  view on Meta::CPAN

243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
sub transport_reset {
   my ($self) = @_;
 
   Scalar::Util::weaken $self;
 
   $self->{send} = sub {
      if (++$DELAY > 0) {
         my $msg = $_[0];
         push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
         $DELAY_W ||= AE::timer 0, 0, $send_delayed;
         return;
      }
 
      local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
      AnyEvent::MP::Kernel::_inject (@{ $_[0] });
   };
}
 
sub transport_connect {
   my ($self, $tp) = @_;

MP/Node.pm  view on Meta::CPAN

265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
   AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
}
 
sub kill {
   my (undef, @args) = @_;
 
   # we _always_ delay kil's, to avoid calling mon callbacks
   # from anything but the event loop context.
   $DELAY = 1;
   push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
   $DELAY_W ||= AE::timer 0, 0, $send_delayed;
}
 
sub monitor {
   # maybe always delay, too?
   if ($DELAY_W) {
      my @args = @_;
      push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
      return;
   }
   &AnyEvent::MP::Kernel::_monitor;

README  view on Meta::CPAN

36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# monitoring
mon $port, $cb->(@msg)      # callback is invoked on death
mon $port, $localport       # kill localport on abnormal death
mon $port, $localport, @msg # send message on death
 
# temporarily execute code in port context
peval $port, sub { die "kill the port!" };
 
# execute callbacks in $SELF port context
my $timer = AE::timer 1, 0, psub {
   die "kill the port, delayed";
};
 
# distributed database - modification
db_set $family => $subkey [=> $value# add a subkey
db_del $family => $subkey...           # delete one or more subkeys
db_reg $family => $port [=> $value]    # register a port
 
# distributed database - queries
db_family $family => $cb->(\%familyhash)

README  view on Meta::CPAN

405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
    "rcv" callbacks, i.e. runtime errors will cause the port to get
    "kil"ed.
 
    The effect is basically as if it returned "sub { peval $SELF, sub {
    BLOCK }, @_ }".
 
    This is useful when you register callbacks from "rcv" callbacks:
 
       rcv delayed_reply => sub {
          my ($delay, @reply) = @_;
          my $timer = AE::timer $delay, 0, psub {
             snd @reply, $SELF;
          };
       };
 
$guard = mon $port, $rcvport # kill $rcvport when $port dies
$guard = mon $port # kill $SELF when $port dies
$guard = mon $port, $cb->(@reason) # call $cb when $port dies
$guard = mon $port, $rcvport, @msg # send a message when $port dies
    Monitor the given port and do something when the port is killed or
    messages to it were lost, and optionally return a guard that can be

README  view on Meta::CPAN

490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
    Example: send us a restart message when another $port is killed.
 
       mon $port, $self => "restart";
 
$guard = mon_guard $port, $ref, $ref...
    Monitors the given $port and keeps the passed references. When the
    port is killed, the references will be freed.
 
    Optionally returns a guard that will stop the monitoring.
 
    This function is useful when you create e.g. timers or other
    watchers and want to free them when the port gets killed (note the
    use of "psub"):
 
      $port->rcv (start => sub {
         my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
            undef $timer if 0.9 < rand;
         });
      });
 
kil $port[, @reason]
    Kill the specified port with the given @reason.
 
    If no @reason is specified, then the port is killed "normally" -
    monitor callback will be invoked, but the kil will not cause linked
    ports ("mon $mport, $lport" form) to get killed.

bin/aemp  view on Meta::CPAN

659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
   delete $to{$node};
 
   @neigh = grep $_ ne $NODE, @neigh;
 
   print $node, " -> ", (join " ", @neigh), "\n";
 
   for my $neigh (@neigh) {
      unless ($seen{$neigh}++) {
         $cv->begin;
         $to{$neigh} = AE::timer 15, 0, sub {
            print "$neigh (timeout)\n";
            $exit = 1;
            $cv->end;
         };
         AnyEvent::MP::Kernel::eval_on $neigh, "AnyEvent::MP::Kernel::up_nodes" => $SELF => $neigh;
      }
   }
 
   $cv->end;
};

bin/aemp  view on Meta::CPAN

749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
   };
   $cv->recv;
}
 
sub node_eval {
   my ($node, $expr) = @_;
 
   init;
 
   my $cv = AE::cv;
   my $to = AE::timer 5, 0, sub { exit 1 };
   AnyEvent::MP::Kernel::eval_on $node, $expr, port { &$cv };
   mon $node, $cv;
 
   my ($err, @res) = $cv->recv;
 
   die "$err @res" if length $err;
 
   print +(substr JSON::XS->new->encode (\@res), 1, -1), "\n";
}
 
sub docmd;
 
our %CMD = (
   snd => sub {
      my $port = shift @ARGV;
      init;
 
      snd $port, @ARGV; @ARGV = ();
 
      my $cv = AE::cv;
      my $to = AE::timer 5, 0, sub { exit 1 };
      mon $port, $cv;
      my $reply = port sub { &$cv };
      snd node_of $port, snd => $reply, "message sent successfully";
 
      print join " ", $cv->recv, "\n";
   },
 
   cal => sub {
      my $port = shift @ARGV;
      init;



( run in 0.549 second using v1.01-cache-2.11-cpan-87723dcf8b7 )