IPC-MPS

 view release on metacpan or  search on metacpan

IPC/MPS/Event.pm  view on Meta::CPAN


			@rcv    = ();
			%r_bufs = ();
			%w_bufs = ();

			%pack   = ();
			%unpack = ();

			%closed = ();

			%fh2ww = ();

			($waited_vpid, $waited_msg, @waited_rv) = ();

			$self_parent_fh   = $parent;
			$self_parent_vpid = $self_vpid;

			$self_vpid        = $vpid;

			$fh2vpid{$self_parent_fh}   = $self_parent_vpid;
			$vpid2fh{$self_parent_vpid} = $self_parent_fh;
			$fh2fh{$self_parent_fh}     = $self_parent_fh;

			Event->io(fd => $self_parent_fh, poll => "r", cb => \&r_event_cb);

			$spawn->();

			exit;
		}
		else {
			$vpid2pid{$vpid} = $kid_pid;
		}
	}


	foreach (@spawn) {
		my ($vpid, $child, $parent, $spawn, $receive) = @$_;
		close $parent;
		$fh2vpid{$child} = $vpid;
		$vpid2fh{$vpid}  = $child;
		$fh2fh{$child}   = $child;
		Event->io(fd => $child, poll => "r", cb => \&r_event_cb);
	}
	@spawn = ();



	$receive->();



	unless ($ipc_loop) {
		$ipc_loop = 1;
		w_event_cb_reg();
		Event::loop();
		$ipc_loop = 0;
	}
}


# wt($vpid, $msg) - wait for message $msg from $vpid and return its arguments.
#
# A $vpid numerically equal to 0 is replaced by $self_parent_vpid.
#
# If a matching entry is already queued in @rcv, it is removed from the queue
# and its arguments are returned immediately; the shared wait state is not
# touched in that case.  Otherwise the wait is recorded in $waited_vpid and
# $waited_msg, pending writes are registered with w_event_cb_reg(), and
# Event::loop() is entered.  Once the loop returns, the result is taken from
# @waited_rv (presumably filled in by the read callback before it stops the
# loop -- that code is outside this section) and the wait state is cleared.
#
# Returns the argument list in list context and its first element in scalar
# context.  Carps and returns nothing if either argument is undefined.
sub wt($$) {
	my ($vpid, $msg) = @_;

	# Validate on lexicals so that a rejected call cannot leave a half-set
	# wait state behind or overwrite a wait that is already in progress.
	defined $vpid or carp("Argument vpid required"), return;
	defined $msg  or carp("Argument msg required"),  return;
	$vpid = $self_parent_vpid if $vpid == 0;

	# The message may have arrived before it was asked for.
	foreach my $i (0 .. $#rcv) {
		my ($from, $queued_msg, $args) = @{$rcv[$i]};
		if ($from eq $vpid and $queued_msg eq $msg) {
			splice @rcv, $i, 1;
			return wantarray ? @$args : $$args[0];
		}
	}

	# Arm the shared wait state only now that we really are going to block.
	($waited_vpid, $waited_msg) = ($vpid, $msg);
	$DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
	w_event_cb_reg();
	Event::loop();

	# Copy the result out before resetting the shared state.
	my @rv = @waited_rv;
	($waited_vpid, $waited_msg, @waited_rv) = ();
	return wantarray ? @rv : $rv[0];
}


# w_event_cb_reg([$to_vpid]) - start transmitting the next queued message.
#
# Works on $to_vpid alone when it is given, otherwise on every key of %snd.
# For each addressee with a non-empty queue:
#   * the file handle is looked up in %vpid2fh; when there is none, the
#     message is routed to $self_parent_fh if that is set and
#     $self_parent_closed is false, otherwise the addressee is skipped
#     (with a carp where the original code carped);
#   * if %w_bufs already has an entry for the handle, nothing is done
#     (NOTE(review): presumably w_event_cb removes the entry once the
#     buffer has been written -- confirm against that callback);
#   * otherwise one message is shifted off the queue, serialized with the
#     per-handle $pack{$fh} callback if one is set or with freeze, prefixed
#     with its length packed as "N" (32-bit big-endian), stored in
#     $w_bufs{$fh}, and an Event io watcher polling for "w" with w_event_cb
#     as callback is created and kept in $fh2ww{$fh}.
#
# Returns nothing meaningful.
sub w_event_cb_reg {
	my ($to_vpid) = @_;

	foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
		# An addressee without a queue has nothing to send.  Checking the
		# reference first avoids dereferencing undef, which is fatal under
		# "strict refs" (queues are deleted below when $DEBUG is set).
		my $queue = $snd{$to} or next;
		@$queue or next;

		my $fh = $vpid2fh{$to};
		unless ($fh) {
			if (@spawn) {
				# Spawned children are still pending, so the handle table
				# is not populated yet.
				carp "Probably have forgotten to call receive." if not defined $to_vpid;
				next;
			}
			unless ($self_parent_fh) {
				carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
				next;
			}
			next if $self_parent_closed;
			# Unknown addressee: hand the message to the parent.
			$fh = $self_parent_fh;
		}

		# A write is already pending on this handle.
		next if exists $w_bufs{$fh};

		my $item   = shift @$queue;
		my $packer = $pack{$fh};
		my $packet = $packer ? $packer->($item) : freeze($item);

		# Frame: 4-byte length prefix followed by the payload.
		$w_bufs{$fh} = join "", pack("N", length $packet), $packet;

		# NOTE(review): emptied queues are dropped from %snd only when
		# $DEBUG is set; kept exactly as in the original.
		$DEBUG and (@$queue or delete $snd{$to});

		$fh2ww{$fh} = Event->io(fd => $fh, poll => "w", cb => \&w_event_cb);
	}
}




( run in 0.938 second using v1.01-cache-2.11-cpan-bbb979687b5 )