Coro
view release on metacpan or search on metacpan
Coro/Intro.pod view on Meta::CPAN
The newly created thread uses rand to wake up the main thread by
calling its C<ready> method - or not.
use Coro;
my $wakeme = $Coro::current;
async {
$wakeme->ready if 0.5 > rand;
};
schedule;
Now, when you run it, one of two things happen: Either the C<async> thread
wakes up the main thread again, in which case the program silently exits,
or it doesn't, in which case you get something like this:
FATAL: deadlock detected.
PID SC RSS USES Description Where
31976480 -C 19k 0 [main::] [program:9]
32223768 UC 12k 1 [Coro.pm:691]
32225088 -- 2068 1 [coro manager] [Coro.pm:691]
32225184 N- 216 0 [unblock_sub scheduler] -
Why is that? Well, when the C<async> thread runs into the end of its
block, it will be terminated (via a call to C<Coro::terminate>) and the
scheduler is called again. Since the C<async> thread hasn't woken up the
main thread, and there aren't any other threads, there is nothing to wake
up, and the program cannot continue. Since there I<are> threads that
I<could> be running (main) but none are I<ready> to do so, Coro signals a
I<deadlock> - no progress is possible. Usually you also get a listing of
all threads, which might help you track down the problem.
However, there is an important case where progress I<is>, in fact,
possible, despite no threads being ready - namely in an event-based
program. In such a program, some threads could wait for I<external>
events, such as a timeout, or some data to arrive on a socket.
Since a deadlock in such a case would not be very useful, there is a
module named L<Coro::AnyEvent> that integrates threads into an event
loop. It configures Coro in a way that, instead of C<die>ing with an error
message, it instead runs the event loop in the hope of receiving an event
that will wake up some thread.
=head2 Semaphores and other locks
Using only C<ready>, C<cede> and C<schedule> to synchronise threads is
difficult, especially if many threads are ready at the same time. Coro
supports a number of primitives to help synchronising threads in easier
ways. The first such primitives is L<Coro::Semaphore>, which implements
counting semaphores (binary semaphores are available as L<Coro::Signal>,
and there are L<Coro::SemaphoreSet> and L<Coro::RWLock> primitives as
well).
Counting semaphores, in a sense, store a count of resources. You can
remove/allocate/reserve a resource by calling the C<< ->down >> method,
which decrements the counter, and you can add or free a resource by
calling the C<< ->up >> method, which increments the counter. If the
counter is C<0>, then C<< ->down >> cannot decrement the semaphore - it is
locked - and the thread will wait until a count becomes available again.
Here is an example:
use Coro;
my $sem = new Coro::Semaphore 0; # a locked semaphore
async {
print "unlocking semaphore\n";
$sem->up;
};
print "trying to lock semaphore\n";
$sem->down;
print "we got it!\n";
This program creates a I<locked> semaphore (a semaphore with count C<0>)
and tries to lock it (by trying to decrement it's counter in the C<down>
method). Since the semaphore count is already exhausted, this will block
the main thread until the semaphore becomes available.
This yields the CPU to the only other read thread in the process,t he
one created with C<async>, which unlocks the semaphore (and instantly
terminates itself by returning).
Since the semaphore is now available, the main program locks it and
continues: "we got it!".
Counting semaphores are most often used to lock resources, or to exclude
other threads from accessing or using a resource. For example, consider
a very costly function (that temporarily allocates a lot of ram, for
example). You wouldn't want to have many threads calling this function at
the same time, so you use a semaphore:
my $lock = new Coro::Semaphore; # unlocked initially - default is 1
sub costly_function {
$lock->down; # acquire semaphore
# do costly operation that blocks
$lock->up; # unlock it
}
No matter how many threads call C<costly_function>, only one will run
the body of it, all others will wait in the C<down> call. If you want to
limit the number of concurrent executions to five, you could create the
semaphore with an initial count of C<5>.
Why does the comment mention an "operation the blocks"? Again, that's
because coro's threads are cooperative: unless C<costly_function>
willingly gives up the CPU, other threads of control will simply not
run. This makes locking superfluous in cases where the function itself
never gives up the CPU, but when dealing with the outside world, this is
rare.
Now consider what happens when the code C<die>s after executing C<down>,
but before C<up>. This will leave the semaphore in a locked state, which
often isn't what you want - imagine the caller expecting a failure and
wrapping the call into an C<eval {}>.
So normally you would want to free the lock again if execution somehow
leaves the function, whether "normally" or via an exception. Here the
C<guard> method proves useful:
my $lock = new Coro::Semaphore; # unlocked initially
sub costly_function {
my $guard = $lock->guard; # acquire guard
# do costly operation that blocks
}
The C<guard> method C<down>s the semaphore and returns a so-called guard
object. Nothing happens as long as there are references to it (i.e. it is
in scope somehow), but when all references are gone, for example, when
C<costly_function> returns or throws an exception, it will automatically
call C<up> on the semaphore, no way to forget it. Even when the thread
gets C<cancel>ed by another thread will the guard object ensure that the
lock is freed.
This concludes this introduction to semaphores and locks. Apart from
L<Coro::Semaphore> and L<Coro::Signal>, there is also a reader-writer lock
(L<Coro::RWLock>) and a semaphore set (L<Coro::SemaphoreSet>). All of
these come with their own manpage.
=head2 Channels
Semaphores are fine, but usually you want to communicate by exchanging
data as well. Of course, you can just use some locks, and array of sorts
and use that to communicate, but there is a useful abstraction for
communicaiton between threads: L<Coro::Channel>. Channels are the Coro
equivalent of a unix pipe (and very similar to AmigaOS message ports :) -
you can put stuff into it on one side, and read data from it on the other.
Here is a simple example that creates a thread and sends numbers to
it. The thread calculates the square of each number and puts that into
another channel, which the main thread reads the result from:
use Coro;
my $calculate = new Coro::Channel;
my $result = new Coro::Channel;
async {
# endless loop
while () {
my $num = $calculate->get; # read a number
$num **= 2; # square it
$result->put ($num); # put the result into the result queue
}
};
for (1, 2, 5, 10, 77) {
$calculate->put ($_);
print "$_ ** 2 = ", $result->get, "\n";
}
Gives:
1 ** 2 = 1
2 ** 2 = 4
5 ** 2 = 25
10 ** 2 = 100
77 ** 2 = 5929
( run in 1.089 second using v1.01-cache-2.11-cpan-39bf76dae61 )