IPC-MPS
view release on metacpan or search on metacpan
IPC/MPS/Event.pm view on Meta::CPAN
@rcv = ();
%r_bufs = ();
%w_bufs = ();
%pack = ();
%unpack = ();
%closed = ();
%fh2ww = ();
($waited_vpid, $waited_msg, @waited_rv) = ();
$self_parent_fh = $parent;
$self_parent_vpid = $self_vpid;
$self_vpid = $vpid;
$fh2vpid{$self_parent_fh} = $self_parent_vpid;
$vpid2fh{$self_parent_vpid} = $self_parent_fh;
$fh2fh{$self_parent_fh} = $self_parent_fh;
Event->io(fd => $self_parent_fh, poll => "r", cb => \&r_event_cb);
$spawn->();
exit;
}
else {
$vpid2pid{$vpid} = $kid_pid;
}
}
foreach (@spawn) {
my ($vpid, $child, $parent, $spawn, $receive) = @$_;
close $parent;
$fh2vpid{$child} = $vpid;
$vpid2fh{$vpid} = $child;
$fh2fh{$child} = $child;
Event->io(fd => $child, poll => "r", cb => \&r_event_cb);
}
@spawn = ();
$receive->();
unless ($ipc_loop) {
$ipc_loop = 1;
w_event_cb_reg();
Event::loop();
$ipc_loop = 0;
}
}
# wt($vpid, $msg) - wait for message $msg from process $vpid and return
# its arguments.
#
# Parameters:
#   $vpid - virtual pid of the expected sender; 0 is replaced by
#           $self_parent_vpid.
#   $msg  - name of the expected message.
#
# Returns the message arguments: the whole list in list context, only the
# first argument in scalar context. If either parameter is undefined it
# carps and returns nothing.
#
# A matching message that is already queued in @rcv is removed from the
# queue and returned immediately. Otherwise the wanted sender/message pair
# is published in $waited_vpid / $waited_msg, pending writes are registered
# and a nested Event::loop() is run; when it returns, the result is taken
# from @waited_rv (presumably filled in by the read callback, which is not
# part of this block - confirm there).
sub wt($$) {
    # Validate and search on private copies. The shared wait state is only
    # written once we really have to block, so neither a rejected call nor
    # an answer served from the queue can leave a stale wait pattern behind
    # or overwrite the pattern of a wait that is already in progress.
    my ($vpid, $msg) = @_;
    defined $vpid or carp("Argument vpid required"), return;
    defined $msg  or carp("Argument msg required"),  return;
    $vpid = $self_parent_vpid if $vpid == 0;

    # Fast path: the message arrived earlier and is waiting in @rcv.
    foreach my $i (0 .. $#rcv) {
        my ($from, $queued_msg, $args) = @{$rcv[$i]};
        if ($from eq $vpid and $queued_msg eq $msg) {
            splice @rcv, $i, 1;
            return wantarray ? @$args : $$args[0];
        }
    }

    # Slow path: publish what we are waiting for and run the event loop.
    ($waited_vpid, $waited_msg) = ($vpid, $msg);
    $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
    w_event_cb_reg();
    Event::loop();

    # Copy the result out before clearing the shared wait state.
    my @rv = @waited_rv;
    ($waited_vpid, $waited_msg, @waited_rv) = ();
    return wantarray ? @rv : $rv[0];
}
# w_event_cb_reg([$to_vpid]) - start sending the next queued message.
#
# Parameters:
#   $to_vpid - optional; restrict the work to this addressee. Without it
#              every addressee that has an entry in %snd is processed.
#
# For each addressee with a non-empty queue the destination handle is
# looked up in %vpid2fh. If the handle has no write buffer in %w_bufs yet,
# one message is taken from the queue, serialised (with the per-handle
# packer from %pack if there is one, otherwise with freeze), prefixed with
# its length packed as "N" (32-bit big-endian), stored in %w_bufs and a
# write watcher with callback w_event_cb is created for the handle.
# Only one message per handle is started per call; the rest stays queued.
#
# Returns nothing meaningful.
sub w_event_cb_reg {
    my ($to_vpid) = @_;
    foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
        # An explicitly requested addressee may have no queue at all;
        # skip it instead of dereferencing an undefined value.
        my $queue = $snd{$to};
        next unless $queue and @$queue;

        my $fh = $vpid2fh{$to};
        unless ($fh) {
            if (@spawn) {
                # Spawned children are still pending in @spawn, so their
                # handles have not been registered in %vpid2fh yet.
                carp "Probably have forgotten to call receive." if not defined $to_vpid;
                next;
            }
            unless ($self_parent_fh) {
                carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
                next;
            }
            next if $self_parent_closed;
            # Unknown addressee in a child process: hand the message to
            # the parent's handle.
            $fh = $self_parent_fh;
        }

        # A buffer for this handle is still being written; leave the
        # remaining messages in the queue.
        next if exists $w_bufs{$fh};

        my $data   = shift @$queue;
        my $packer = $pack{$fh};
        my $packet = $packer ? $packer->($data) : freeze($data);
        $w_bufs{$fh} = pack("N", length $packet) . $packet;

        # NOTE(review): drained queues are removed from %snd only when
        # $DEBUG is set - kept exactly as in the original; confirm intent.
        $DEBUG and (@$queue or delete $snd{$to});

        $fh2ww{$fh} = Event->io(fd => $fh, poll => "w", cb => \&w_event_cb);
    }
}
( run in 0.938 second using v1.01-cache-2.11-cpan-bbb979687b5 )