AnyEvent-MP

 view release on metacpan or  search on metacpan

MP.pm  view on Meta::CPAN

220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
   port rcv mon mon_guard psub peval spawn cal
   db_set db_del db_reg
   db_mon db_family db_keys db_values
 
   after
);
 
our $SELF;
 
# Kills the current port ($SELF) with a "die" reason taken from the pending
# exception in $@. Trailing newlines are stripped from plain-string
# exceptions; references (exception objects) are passed on untouched.
sub _self_die() {
   my $reason = $@;

   $reason =~ s/\n+$//
      if !ref $reason;

   kil($SELF, die => $reason);
}
 
=item $thisnode = NODE / $NODE
 
The C<NODE> function returns, and the C<$NODE> variable contains, the node
ID of the node running in the current process. This value is initialised by
a call to C<configure>.

MP.pm  view on Meta::CPAN

410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
If you want to stop/destroy the port, simply C<kil> it:
 
   my $port = port {
      my @msg = @_;
      ...
      kil $SELF;
   };
 
=cut
 
sub rcv($@);
 
# Fallback receive callback used by port() when no callback is given: kills
# the current port with an "unhandled_message" reason. Only the first 30
# characters of the first message element are quoted, and every character
# outside the printable ASCII range \x20-\x7e is shown as a dot.
my $KILME = sub {
   my $tag = substr $_[0], 0, 30;
   $tag =~ s/([^\x20-\x7e])/./g;

   kil($SELF, unhandled_message => "no callback found for message '$tag'");
};
 
# Creates a new local port and returns its port ID.
#
# The ID has the form "$NODE#<id>", where <id> is $UNIQ followed by the
# freshly incremented $ID counter. The optional code block becomes the
# port's receive callback (registered through rcv); without one, $KILME is
# installed instead.
sub port(;&) {
   my $cb   = shift || $KILME;
   my $port = join "#", $NODE, $UNIQ . ++$ID;

   rcv($port, $cb);

   return $port;
}
 
=item rcv $local_port, $callback->(@msg)

MP.pm  view on Meta::CPAN

477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
(e.g. for an rpc reply) and unregister it after a message was received.
 
   rcv $port, $otherport => sub {
      my @reply = @_;
 
      rcv $SELF, $otherport;
   };
 
=cut
 
sub rcv($@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;
 
   $nodeid eq $NODE
      or Carp::croak "$port: rcv can only be called on local ports, caught";
 
   while (@_) {
      if (ref $_[0]) {
         if (my $self = $PORT_DATA{$portid}) {
            "AnyEvent::MP::Port" eq ref $self

MP.pm  view on Meta::CPAN

554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
   my $port = port { ... };
 
   peval $port, sub {
      init
         or die "unable to init";
   };
 
=cut
 
sub peval($$) {
   local $SELF = shift;
   my $cb = shift;
 
   if (wantarray) {
      my @res = eval { &$cb };
      _self_die if $@;
      @res
   } else {
      my $res = eval { &$cb };
      _self_die if $@;

MP.pm  view on Meta::CPAN

589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
   rcv delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer = AE::timer $delay, 0, psub {
         snd @reply, $SELF;
      };
   };
 
=cut
 
sub psub(&) {
   my $cb = shift;
 
   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
 
   sub {
      local $SELF = $port;
 
      if (wantarray) {
         my @res = eval { &$cb };

MP.pm  view on Meta::CPAN

829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
   my $init = shift;
 
   # rcv will create the actual port
   local $SELF = "$NODE#$port";
   eval {
      &{ load_func $init }
   };
   _self_die if $@;
}
 
sub spawn(@) {
   my ($nodeid, undef) = split /#/, shift, 2;
 
   my $id = $RUNIQ . ++$ID;
 
   $_[0] =~ /::/
      or Carp::croak "spawn init function must be a fully-qualified name, caught";
 
   snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
 
   "$nodeid#$id"

MP.pm  view on Meta::CPAN

856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
Either sends the given message, or calls the given callback, after the
specified number of seconds.
 
This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.
 
=cut
 
# Arms a one-shot AE::timer that fires after $timeout seconds. When it fires,
# the first element of @action decides what happens: a reference is invoked
# as a callback (with no arguments), anything else causes the whole @action
# list to be handed to snd.
sub after($@) {
   my ($timeout, @action) = @_;

   my $timer;
   $timer = AE::timer $timeout, 0, sub {
      # the closure keeps the watcher alive through $timer; release it once fired
      undef $timer;

      if (ref $action[0]) {
         $action[0]->();
      } else {
         snd(@action);
      }
   };
}

MP.pm  view on Meta::CPAN

892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
If no time-out is given (or it is C<undef>), then the local port will
monitor the remote port instead, so it eventually gets cleaned-up.
 
Currently this function returns the temporary port, but this "feature"
might go away in future versions unless you can make a convincing case that
this is indeed useful for something.
 
=cut
 
sub cal(@) {
   my $timeout = ref $_[-1] ? undef : pop;
   my $cb = pop;
 
   my $port = port {
      undef $timeout;
      kil $SELF;
      &$cb;
   };
 
   if (defined $timeout) {

MP.pm  view on Meta::CPAN

1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
If C<$value> is missing, C<undef> is used. If C<$port> is missing, then
C<$SELF> is used.
 
This function is most useful to register a port in some port group (which
is just another name for a database family), and have it removed when the
port is gone. This works best when the port is a local port.
 
=cut
 
sub db_reg($$;$) {
   my $family = shift;
   my $port = @_ ? shift : $SELF;
 
   my $clr = sub { db_del $family => $port };
   mon $port, $clr;
 
   db_set $family => $port => $_[0];
 
   defined wantarray
      and &Guard::guard ($clr)

MP/Config.pm  view on Meta::CPAN

57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
   unlink "$CONFIG_FILE~";
   link $CONFIG_FILE, "$CONFIG_FILE~";
   rename "$CONFIG_FILE~new~", $CONFIG_FILE
      or Carp::croak "$CONFIG_FILE: $!";
}
 
# Returns a reference to the package-level %CFG hash itself (not a copy),
# so changes made through the reference are visible in %CFG.
sub config {
   return \%CFG;
}
 
sub _find_profile($);

# Flattens the profile named $name into a key/value list by walking its
# "parent" chain recursively. The chain ends at an undefined name, where the
# contents of %CFG are returned. Settings of ancestors are listed before the
# profile's own, so in a hash built from the returned list the more specific
# profile wins.
sub _find_profile($) {
   my ($name) = @_;

   # end of the parent chain: fall back to the top-level configuration
   return %CFG
      unless defined $name;

   my $profile = $CFG{profile}{$name};

   return (_find_profile ($profile->{parent}), %$profile);
}
 
sub find_profile($;%) {
   my ($name, %kv) = @_;
 
   my $norc  = delete $kv{norc};
   my $force = delete $kv{force};
 
   %kv = (
      monitor_timeout  => 30,
      connect_interval => 2,
      framing_format   => [qw(cbor json storable)], # framing types we offer and accept, in order of preference
      auth_offer       => [qw(tls_sha3_512 hmac_sha3_512)], # what we will send

MP/DataConn.pm  view on Meta::CPAN

151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
      warn "connection established, wait for a line...\n";
 
      $hdl->push_read (line => sub {
         warn "received a line: $_[1]\n";
         undef $hdl;
      });
   }
 
=cut
 
sub connect_to($$$$@) {
   my $cb = pop;
   my ($node, $timeout, $initfunc, @initdata) = @_;
 
   my $port = $SELF
      or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";
 
   $node = node_of $node;
 
   my $id = (++$ID) . "\@$NODE";

MP/Global.pm  view on Meta::CPAN

135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
   # tell subscribers we have changed the family
   if (%$set || %local_set || @del_local) {
      @$set{keys %local_set} = values %local_set;
 
      snd $_ => g_chg2 => $family, $set, \@del_local
         for keys %{ $GLOBAL_MON{$family} };
   }
}
 
# set the whole (node-local) database - previous value must be empty
#
# Hands every entry of %$db to g_upd, one call per top-level key.
# Presumably the keys are family names and the values the per-family
# key/value sets - TODO confirm against g_upd.
sub g_set($$) {
   my ($node, $db) = @_;

   while (my ($family, $set) = each %$db) {
      g_upd($node, $family, $set);
   }
}
 
# delete all keys from a database
#
# Walks every family stored for $node in %LOCAL_DBS, calls g_upd for it with
# an undef third argument and an array of the family's keys, and finally
# removes the node's entry from %LOCAL_DBS altogether.
sub g_clr($) {
   my ($node) = @_;
 
   my $db = $LOCAL_DBS{$node};
 
   # $f is the top-level key (family), $k the hash stored under it
   while (my ($f, $k) = each %$db) {
      # presumably the fourth g_upd argument is the list of keys to delete - TODO confirm
      g_upd $node, $f, undef, [keys %$k];
   }
 
   delete $LOCAL_DBS{$node};
}

MP/Global.pm  view on Meta::CPAN

213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
   snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
};
 
# Request handler for "g_db_values": sends a "g_reply" message carrying the
# request $id and an array of all values stored under $family in %GLOBAL_DB
# to $SRCNODE.
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;

   my @values = values %{ $GLOBAL_DB{$family} };

   snd($SRCNODE, g_reply => $id, \@values);
};
 
# monitoring
 
sub g_disconnect($) {
   my ($node) = @_;
 
   delete $GLOBAL_NODE{$node}; # also done in Kernel.pm, but doing it here avoids overhead
 
   db_del "'g" => $node;
   db_del "'l" => $node;
   g_clr $node;
 
   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      while (my ($f, $fv) = each %$mon) {

MP/Global.pm  view on Meta::CPAN

335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
};
 
#############################################################################
# compatibility functions for aemp 1.0
 
 
our @EXPORT = qw(grp_reg grp_get grp_mon);
 
# aemp 1.0 compatibility name for db_reg.
sub grp_reg($$) {
   # "&db_reg" without parentheses calls db_reg with the current @_ and
   # without applying its prototype, so the arguments are forwarded as-is
   &db_reg
}
 
# aemp 1.0 compatibility function: returns an array reference holding the
# keys stored under the given name in %AnyEvent::MP::Kernel::GLOBAL_DB, or
# undef when there are none.
sub grp_get($) {
   my ($group) = @_;

   my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$group} };

   return \@ports
      if @ports;

   undef
}
 
# aemp 1.0 compatibility function built on db_mon: monitors family $grp and
# adapts the db_mon callback arguments for $cb. $cb receives an array of the
# keys of the first argument, followed by the second ("added") and fourth
# ("deleted") arguments; the third ("changed") one is dropped.
# Returns whatever db_mon returns.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   my $adapter = sub {
      my ($ports, $added, undef, $deleted) = @_;

      $cb->([keys %$ports], $added, $deleted);
   };

   db_mon($grp, $adapter)
}
 
=head1 SEE ALSO

MP/Kernel.pm  view on Meta::CPAN

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
);
 
our @EXPORT = qw(
   snd_to_func snd_on eval_on
   port_is_local
   up_nodes mon_nodes node_is_up
);
 
our @CARP_NOT = (AnyEvent::MP::);
 
sub load_func($) {
   my $func = $_[0];
 
   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };
 
         local $@;
         unless (eval "require $pkg; 1") {

MP/Kernel.pm  view on Meta::CPAN

88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
               or return sub { die $error };
         }
      } until defined &$func;
   }
 
   \&$func
}
 
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
 
# Returns a string of $_[0] characters, each with a random code point in the
# range 0..255.
# NOTE(review): the randomness comes from Perl's built-in rand, which perlfunc
# documents as not cryptographically secure - confirm that this is acceptable
# for every caller.
sub nonce($) {
   my $octets = "";

   $octets .= chr rand 256
      for 1 .. $_[0];

   $octets
}
 
# Returns a string of $_[0] characters, each picked at random from the first
# 62 entries of @alnum.
sub nonce62($) {
   my $chars = "";

   $chars .= $alnum[rand 62]
      for 1 .. $_[0];

   $chars
}
 
our $CONFIG; # this node's configuration
our $SECURE;

our $RUNIQ; # remote uniq value
# the statement terminator is required here: without it the trailing comment
# swallows the rest of the line and the declaration runs into the next one
our $UNIQ;  # per-process/node unique cookie
our $NODE;  # value returned by NODE()
our $ID = "a"; # counter, pre-incremented (string increment) to build new IDs

MP/Kernel.pm  view on Meta::CPAN

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
      ) . nonce62 4
   ;
 
   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ =~ s/(.)$/\U$1/;
 
   $NODE = "";
}
 
# Accessor returning the current value of the package variable $NODE.
sub NODE() {
   return $NODE;
}
 
# Returns the part of the given ID that precedes the first "#" (the whole
# string when it contains no "#"), or undef for an empty string.
sub node_of($) {
   my $node = (split /#/, $_[0], 2)[0];

   return $node;
}
 
# Defines TRACE at compile time as a sub with an empty prototype returning a
# fixed 1 or 0, depending on whether the environment variable
# PERL_ANYEVENT_MP_TRACE holds a true value.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
 
our $DELAY_TIMER;
our @DELAY_QUEUE;

# Runs the queued callbacks in FIFO order until @DELAY_QUEUE yields a false
# value (normally: is empty), then clears $DELAY_TIMER and returns undef.
# Callbacks pushed while the loop is running are executed in the same pass.
our $delay_run = sub {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
};
 
# Appends the given callback to @DELAY_QUEUE and, unless $DELAY_TIMER already
# holds a true value, arms a zero-delay AE::timer that invokes $delay_run.
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;

   $DELAY_TIMER ||= AE::timer(0, 0, $delay_run);
}
 
=item $AnyEvent::MP::Kernel::SRCNODE
 
During execution of a message callback, this variable contains the node ID
of the origin node.
 
The main use of this variable is for debugging output - there are probably

MP/Kernel.pm  view on Meta::CPAN

197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
      my ($node) = @_;
 
      length $node
         or Carp::croak "'undef' or the empty string are not valid node/port IDs";
 
      # registers itself in %NODE
      new AnyEvent::MP::Node::Remote $node
   }
}
 
# Sends a message to a port. The first argument is the destination port ID,
# which is split at its first "#" into node ID and port ID; the remaining
# arguments form the message. The node object is looked up in %NODE (or
# created through add_node) and its {send} code reference is called with an
# array holding the stringified port ID followed by the message.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   # with tracing enabled, dump every non-empty message as JSON
   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";
   }

   my $node = $NODE{$nodeid} || add_node($nodeid);

   $node->{send}->(["$portid", @_]);
}
 
# Returns true when the node part of the given port ID (everything before
# the first "#") is string-equal to $NODE.
sub port_is_local($) {
   my ($owner) = split /#/, $_[0], 2;

   return $owner eq $NODE;
}
 
=item snd_to_func $node, $func, @args
 
Expects a node ID and a name of a function. Asynchronously tries to call
this function with the given arguments on that node.
 
This function can be used to implement C<spawn>-like interfaces.
 
=cut
 
# Sends the remaining arguments (function name and its arguments) to the
# given node, addressed to the empty port ID "".
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment above talks about $NODE, yet the flag is set
   # when the target is NOT $NODE - confirm which of the two is intended.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node($nodeid);

   $node->{send}->(["", @_]);
}
 
=item snd_on $node, @msg
 
Executes C<snd> with the given C<@msg> (which must include the destination
port) on the given node.
 
=cut
 
# Passes a message to snd with $node as destination, consisting of the
# string "snd" followed by the remaining arguments.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd($node, snd => @msg);
}
 
=item eval_on $node, $string[, @reply]
 
Evaluates the given string as a Perl expression on the given node. When
@reply is specified, then it is used to construct a reply message with
C<"$@"> and any results from the eval appended.
 
=cut
 
# Passes a message to snd with $node as destination, consisting of the
# string "eval" followed by the remaining arguments (the code string and an
# optional reply message).
sub eval_on($$;@) {
   my ($node, @msg) = @_;

   snd($node, eval => @msg);
}
 
# Kills a port. The first argument is the port ID, split at its first "#"
# into node ID and port ID; the remaining arguments are the kill reason.
# Croaks when the port part is empty, otherwise calls the kill method of the
# node object (looked up in %NODE or created through add_node) with the
# stringified port ID and the reason.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node($nodeid);

   $node->kill ("$portid", @_);
}
 
#############################################################################

MP/Kernel.pm  view on Meta::CPAN

282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
Returns true if the given node is "up", that is, the kernel thinks it has
a working connection to it.
 
More precisely, if the node is up, returns C<1>. If the node is currently
connecting or otherwise known but not connected, returns C<0>. If nothing
is known about the node, returns C<undef>.
 
=cut
 
# Returns 1 when the given node ID equals $NODE or its entry in %NODE has a
# true {transport}, 0 when the entry exists without one, and an empty
# return (undef in scalar context) when %NODE has no true entry for it.
sub node_is_up($) {
   my $id = $_[0];

   # the local node always counts as up
   return 1
      if $id eq $NODE;

   my $node = $NODE{$id}
      or return;

   return $node->{transport} ? 1 : 0;
}
 
=item @nodes = up_nodes
 
Return the node IDs of all nodes that are currently connected (excluding
the node itself).
 
=cut
 
# Returns the {id} of every node object in %NODE that has a true
# {transport} (in scalar context: how many there are).
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
 
=item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
 
Registers a callback that is called each time a node goes up (a connection
is established) or down (the connection is lost).
 
Node up messages can only be followed by node down messages for the same
node, and vice versa.

MP/Kernel.pm  view on Meta::CPAN

325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
Example: make sure you call function C<newnode> for all nodes that are up
or go up (and down).
 
   newnode $_, 1 for up_nodes;
   mon_nodes \&newnode;
 
=cut
 
our %MON_NODES;
 
# Registers $cb in %MON_NODES, keyed by the numeric value of the code
# reference. In non-void context a Guard object is returned whose block
# removes that registration again; in void context nothing is created and
# the callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;
   $MON_NODES{$key} = $cb;

   return
      unless defined wantarray;

   Guard::guard(sub { delete $MON_NODES{$key} })
}
 
# Logs the up/down transition of $node at level 7 and then calls every
# callback registered in %MON_NODES with the node's {id}, the $up flag and
# the reason list. An exception thrown by a callback is caught and logged
# at level "die"; the remaining callbacks are still called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up." : "down (@reason).";
   AE::log(7, "$node->{id} is " . $state);

   for my $cb (values %MON_NODES) {
      next
         if eval { $cb->($node->{id}, $up, @reason); 1 };

      AE::log(die => $@);
   }
}

MP/Kernel.pm  view on Meta::CPAN

530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
      undef $SEED_WATCHER;
   }
}
 
# Resets $SEED_RETRY to a random value in the range [0.6, 1.2) and, unless
# $SEED_WATCHER already holds a true value, arms a zero-delay AE::timer that
# calls seed_all.
sub seed_again {
   $SEED_RETRY = 0.6 * (1 + rand());

   $SEED_WATCHER ||= AE::timer(0, 0, \&seed_all);
}
 
# sets new seed list, starts connecting
#
# Empties %SEED_NODE, %NODE_SEED and %SEED_CONNECT, enters every argument as
# a key (with an undef value) into %SEED_NODE and then calls seed_again.
sub set_seeds(@) {
   for my $table (\%SEED_NODE, \%NODE_SEED, \%SEED_CONNECT) {
      %$table = ();
   }

   $SEED_NODE{$_} = undef
      for @_;

   seed_again();
}
 
# normal nodes only record global node connections

MP/Kernel.pm  view on Meta::CPAN

774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
   &{ $NODE_REQ{g_slave} }
};
 
#############################################################################
# local database operations
 
# canonical probably not needed
our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
 
# are the two scalars equal? very very ugly and slow, need better way
#
# When at least one argument is a reference, both are serialised with
# $sv_eq_coder and the resulting strings are compared. Otherwise the two
# values must be string-equal and agree in definedness (so undef and the
# empty string do not count as equal).
sub sv_eq($$) {
   if (ref $_[0] || ref $_[1]) {
      my $left  = JSON::XS::encode($sv_eq_coder, $_[0]);
      my $right = JSON::XS::encode($sv_eq_coder, $_[1]);

      return $left eq $right;
   }

   return $_[0] eq $_[1]
          && defined $_[0] == defined $_[1];
}
 
# local database management
 
# Deletes the given keys from family $family of %LOCAL_DB. Keys that do not
# exist are ignored, and nothing happens at all when none of them exists.
# When $MASTER is defined, a "g_upd" message listing the keys actually
# deleted is sent to it.
sub db_del($@) {
   my $family = shift;

   my @gone = grep { exists $LOCAL_DB{$family}{$_} } @_;

   @gone
      or return;

   delete @{ $LOCAL_DB{$family} }{@gone};

   if (defined $MASTER) {
      snd($MASTER, g_upd => $family => undef, \@gone);
   }
}
 
sub db_set($$;$) {
   my ($family, $subkey) = @_;
 
#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key

MP/Kernel.pm  view on Meta::CPAN

839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
# Issues a "g_db_values" request for $family through global_call, passing
# $cb along as the final argument.
sub db_values {
   my ($family, $cb) = @_;

   global_call(g_db_values => $family, $cb);
}
 
# database monitoring
 
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value
 
sub db_mon($@) {
   my ($family, $cb) = @_;
 
   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };
      AE::postpone {
         return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already

MP/Kernel.pm  view on Meta::CPAN

935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
};
 
#############################################################################
# configure
 
# Returns the second element of the list returned by POSIX::uname (the
# nodename field). POSIX is loaded on first use.
sub nodename {
   require POSIX;

   my (undef, $host) = POSIX::uname ();

   $host
}
 
sub _resolve($) {
   my ($nodeid) = @_;
 
   my $cv = AE::cv;
   my @res;
 
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++

MP/Kernel.pm  view on Meta::CPAN

1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
   }
 
   $cv->end;
 
   $cv
}
 
our @POST_CONFIGURE;
 
# not yet documented
#
# Queues the given code block in @POST_CONFIGURE and then, as long as $NODE
# is true, runs and removes the queued blocks in FIFO order. While $NODE is
# false the blocks simply stay queued. Dies unless called in void context.
sub post_configure(&) {
   defined wantarray
      and die "AnyEvent::MP::Kernel::post_configure must be called in void context";

   push @POST_CONFIGURE, @_;

   while ($NODE && @POST_CONFIGURE) {
      my $hook = shift @POST_CONFIGURE;
      $hook->();
   }
}
 
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;
 
   my $profile = delete $kv{profile};
 
   $profile = nodename
      unless defined $profile;
 
   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

MP/Transport.pm  view on Meta::CPAN

54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
Creates a listener on the given host/port using
C<AnyEvent::Socket::tcp_server>.
 
See C<new>, below, for constructor arguments.
 
Defaults for peerhost, peerport and fh are provided.
 
=cut
 
sub mp_server($$;%) {
   my ($host, $port, %arg) = @_;
 
   AnyEvent::Socket::tcp_server $host, $port, sub {
      my ($fh, $host, $port) = @_;
 
      my $tp = new AnyEvent::MP::Transport
         fh       => $fh,
         peerhost => $host,
         peerport => $port,
         %arg,

MP/Transport.pm  view on Meta::CPAN

113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
      on_connect => sub { successful-connect-callback },
      greeting   => { key => value },
 
      # tls support
      tls_ctx    => AnyEvent::TLS,
      peername   => $peername, # for verification
   ;
 
=cut
 
# Returns the hex-encoded HMAC computed by Digest::HMAC::hmac_hex with
# Digest::SHA3::sha3_512 as hash function and a block size of 72 bytes.
# Note the argument order: the first argument of this function is passed to
# hmac_hex in second position (documented there as the key), the second
# argument in first position (the data).
sub hmac_sha3_512_hex($$) {
   my ($key, $data) = @_;

   return Digest::HMAC::hmac_hex($data, $key, \&Digest::SHA3::sha3_512, 72);
}
 
sub new {
   my ($class, %arg) = @_;
 
   my $self = bless \%arg, $class;
 
   {
      Scalar::Util::weaken (my $self = $self);



( run in 0.274 second using v1.01-cache-2.11-cpan-94b05bcf43c )