Coro-MP

       spawn $NODE, "::initfunc";
       sub ::initfunc {
          rcv_async $SELF, sub {
             ...
          };
       }

       # simple "tag" receives:
       my ($pid) = get "pid", 30
          or die "no pid message received after 30s";

       # conditional receive
       my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };
       my @next_msg = get_cond { 1 } 30; # 30s timeout

       # run thread in port context
       peval_async $port, sub {
          die "kill the port\n";
       };

       # synchronous "cal"
       my @retval = syncal 30, $port, tag => $data;

DESCRIPTION
    This module (-family) implements a simple message passing framework.

    Despite its simplicity, you can securely message other processes running
    on the same or other hosts, and you can supervise entities remotely.

    This module depends heavily on AnyEvent::MP; in fact, many functions
    exported by this module are identical to AnyEvent::MP functions. This
    module family is simply the Coro API to AnyEvent::MP.

    Care has been taken to stay compatible with AnyEvent::MP, even if
    sometimes this required a less natural API ("spawn" should indeed spawn
    a thread, not just call an initfunc for example).

    For an introduction to AnyEvent::MP, see the AnyEvent::MP::Intro manual
    page.

VARIABLES/FUNCTIONS
    NODE, $NODE, node_of, configure
    $SELF, *SELF, SELF, %SELF, @SELF...
    snd, mon, kil, psub
        These variables and functions work exactly as in AnyEvent::MP. In
        fact, they are exactly the same functions, and are used in much the
        same way.

    rcv This function works exactly as "AnyEvent::MP::rcv", and is in fact
        compatible with Coro::MP ports. However, the canonical way to
        receive messages with Coro::MP is to use "get" or "get_cond".

    port
        This function is exactly the same as "AnyEvent::MP::port" and
        creates new ports. You can attach a thread to them by calling
        "rcv_async" or you can do a create and attach in one operation using
        "port_async".

    peval
        This function works exactly as "AnyEvent::MP::peval" - you could use
        it to run callbacks within a port context (good for monitoring), but
        you cannot "get" messages unless the callback executes within the
        thread attached to the port.

        Since creating a thread with port context requires somewhat annoying
        syntax, there is a "peval_async" function that handles that for you
        - note that within such a thread, you still cannot "get" messages.

    spawn
        This function is identical to "AnyEvent::MP::spawn". This means that
        it doesn't spawn a new thread as one would expect, but simply calls
        an init function. The init function, however, can attach a new
        thread easily:

           sub initfun {
              my (@args) = @_;

              rcv_async $SELF, sub {
                 # thread-code
              };
           }

    cal This function is identical to "AnyEvent::MP::cal". The easiest way
        to make a synchronous call is to use Coro's rouse functionality:

           # send 1, 2, 3 to $port and wait up to 30s for reply
           cal $port, 1, 2, 3, rouse_cb, 30;
           my @reply = rouse_wait;

        You can also use "syncal" if you want, and are ok with learning yet
        another function with a weird name:

           my @reply = syncal 30, $port, 1, 2, 3;

    $local_port = port_async { ... }
        Creates a new local port, and returns its ID. A new thread is
        created and attached to the port (see "rcv_async", below, for
        details).

    rcv_async $port, $threadcb
        This function creates and attaches a thread on a port. The thread is
        set to execute $threadcb and is put into the ready queue. The thread
        will receive all messages not filtered away by tagged receive
        callbacks (as set by "AnyEvent::MP::rcv") - it simply replaces the
        default callback of an AnyEvent::MP port.

        The special variable $SELF will be set to $port during thread
        execution.

        When $threadcb returns or the thread is canceled, the return/cancel
        values become the "kil" reason.

        It is not allowed to call "rcv_async" more than once on a given
        port.
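
        A minimal sketch of such a thread, assuming a hypothetical "job"
        tag and a worker that should give up after a minute of
        inactivity. The loop ends when "get" returns the empty list, and
        the values returned from the callback then become the "kil"
        reason as described above:

           rcv_async $port, sub {
              # $SELF is $port while this thread runs
              while (my ($job) = get "job", 60) {
                 print "working on $job\n";
              }

              # no "job" message arrived within 60s: end the thread,
              # passing these (made-up) values on as the kil reason
              return idle => "no job within 60s";
           };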

    @msg = get $tag
    @msg = get $tag, $timeout
        Find, dequeue and return the next message with the specified $tag.
        If no matching message is currently queued, wait up to $timeout
        seconds (or forever if no $timeout has been specified or it is
        "undef") for one to arrive.

        Returns the message with the initial tag removed. In case of a
        timeout, the empty list is returned. The function *must* be called
        in list context.

        Note that a message consisting of nothing but its tag cannot be
        distinguished from a timeout when using "get", as both yield the
        empty list.

        Example: send a "log" message to $SELF and then get and print it.

           snd $SELF, log => "text";
           my ($text) = get "log";
           print "log message: $text\n";

        Example: receive "p1" and "p2" messages, regardless of the order
        they arrive in on the port.

           my @p1 = get "p1";
           my @p2 = get "p2";

        Example: assume a message with tag "now" is already in the queue and
        fetch it. If no message was there, do not wait, but die.

           my @msg = get "now", 0
              or die "expected now message to be there, but it wasn't";
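
        Because "get" strips the tag, a message that carries nothing but
        its tag comes back as the empty list, just like a timeout. One
        possible workaround, sketched here with a hypothetical "done" tag
        and assuming (as the conditional receive example at the top of
        this document suggests) that "get_cond", described below, returns
        the message with its tag still attached:

           # one element ("done") on success, no elements on timeout
           my @msg = get_cond { $_[0] eq "done" } 30
              or die "no done message received after 30s";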

    @msg = get_cond { condition... } [$timeout]
        Similarly to "get", looks for a matching message. Unlike "get",
        "matching" is not defined by a tag alone, but by a predicate, a
        piece of code that is executed on each candidate message in turn,
        with @_ set to the message contents.

        The predicate code is supposed to return the empty list if the
        message didn't match. If it returns anything else, then the message
        is removed from the queue and returned to the caller.

        In addition, if the predicate returns a code reference, then it is
        immediately invoked on the removed message.

        If a $timeout is specified and is not "undef", then, after this many
        seconds have passed without a matching message arriving, the empty
        list will be returned.
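
        The code reference form lends itself to dispatching on whichever
        of several messages arrives first. A minimal sketch, using
        hypothetical "result" and "error" tags and assuming the second
        element of each message is a printable string:

           # the first queued message with a matching tag is removed and
           # handed to the handler chosen for it; anything else is skipped
           get_cond {
                $_[0] eq "result" ? sub { print "result: $_[1]\n" }
              : $_[0] eq "error"  ? sub { warn "remote error: $_[1]\n" }
              :                     ()
           } 10; # stop waiting after 10s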


