Thread-Queue-Multiplex
for the root thread).
=item *
adding a shared hash to hold the list of message IDs,
each mapped to a threads::shared array containing
C<[message ID, flags, refcount, @params]>, where
C<flags> indicates the urgent and/or simplex status
of the request, and C<refcount> indicates the number
of subscribers assigned to the request. A special C<refcount>
value of -1 indicates that only the first subscriber to
retrieve/process the request should respond (to mimic the
behavior of L<Thread::Queue::Duplex>), which is
specified by the publisher using any of the C<enqueue>
methods with a subscriber ID of -1.
=item *
adding a shared hash to hold the list of message IDs
mapped to a L<threads::shared> hash containing
a reference count of subscribers for the message,
and a map of subscriber IDs to their responses.
This "pending response" hash is used to accumulate
all subscriber responses; when the reference count of
a message reaches zero, the hash of responses is posted to
the final response message mapping hash.
=item *
adding a shared hash to hold the map of thread IDs to
subscriber IDs. B<Note:> Each thread can have only a single
subscriber.
=item *
changing the message mapping hash to map a unique message ID
to a hash of unique subscriber IDs, each mapped to its response (if any),
i.e.,
    $msg_map = {
        $msgid => {
            $subID1 => $subID1_response,
            $subID2 => $subID2_response,
            # etc.
        }
    }
=item *
when the publisher dequeues the response to a message, it receives
a copy of the subscriber mapping hash, and is responsible for iterating
over the hash to read each subscriber's results.
=back
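The pending-response bookkeeping described in the list above can be sketched in plain Perl. This is an illustration only, not the module's actual code: it uses ordinary (unshared) hashes with no locking, and the names C<%pending>, C<%msg_map>, C<post_request>, and C<record_response> are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for the module's shared hashes. The real module
# would need threads::shared data and locking; both are omitted here.
my %pending;    # msgID => { refcount => N, responses => { subID => response } }
my %msg_map;    # msgID => { subID => response }, the final response mapping

# Register a request that was sent to the given subscribers.
sub post_request {
    my ($msgid, @sub_ids) = @_;
    $pending{$msgid} = { refcount => scalar(@sub_ids), responses => {} };
    return $msgid;
}

# Record one subscriber's response. When the reference count reaches zero,
# move the accumulated responses to the final mapping hash.
# Returns true once the message is complete.
sub record_response {
    my ($msgid, $sub_id, $response) = @_;
    my $entry = $pending{$msgid} or return;
    $entry->{responses}{$sub_id} = $response;
    if (--$entry->{refcount} == 0) {
        my $done = delete $pending{$msgid};
        $msg_map{$msgid} = $done->{responses};
    }
    return exists $msg_map{$msgid};
}

post_request(1, 2, 3);              # message 1 goes to subscribers 2 and 3
record_response(1, 2, 'from 2');    # one response still outstanding
record_response(1, 3, 'from 3');    # last response completes the message

# The publisher iterates over the per-subscriber results.
for my $sub_id (sort { $a <=> $b } keys %{ $msg_map{1} }) {
    print "subscriber $sub_id: $msg_map{1}{$sub_id}\n";
}
```

Keeping the reference count next to the accumulated responses means the final mapping hash only ever holds complete response sets, so a publisher never sees a partially filled hash.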
A normal processing sequence for Thread::Queue::Multiplex might be:
    #
    # Thread A (the client):
    #
    # ...marshal parameters for a coroutine...
    my $id = $tqm->publish('function_name', \@paramlist);
    my $results = $tqm->dequeue_response($id);
    while (my ($subID, $subresult) = each %$results) {
        # ...process $subresult...
    }
    #
    # Thread B (a subscriber):
    #
    while (1) {
        my $call = $tqm->dequeue;
        my ($id, $func, @params) = @$call;
        $tqm->respond($id, $self->$func(@params));
    }
=head1 FUNCTIONS AND METHODS
=over 4
=item $tqm = B<Thread::Queue::Multiplex-E<gt>new>([MaxPending => $limit])
Constructor. Creates a new empty queue.
If C<MaxPending> is non-zero, the number
of pending requests is limited to C<$limit>, and any further
attempt to queue a request blocks until the pending count
drops below C<$limit>. This limit may be applied or modified later
via the C<set_max_pending()> method (see below).
=item B<subscribe( >I<[ $subID ]>B< )> I<aka> B<listen()>
Subscribe to the queue. The C<listen()> alias is provided for
compatibility with L<Thread::Queue::Duplex> (TQD) apps. If C<$subID> is not provided,
the current thread's TID plus 1 is used as the subscriber ID.
Only a single subscriber per thread is permitted; C<undef> is
returned if the current thread already has a subscriber.
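The subscriber ID rules above can be sketched as follows. This is a hedged illustration rather than the module's implementation: C<subscribe_sketch> and C<%tid_to_sub> are hypothetical names, the thread ID is passed in explicitly instead of being read from the current thread, and returning the subscriber ID on success is an assumption not stated in the text.

```perl
use strict;
use warnings;

# Hypothetical map of thread ID => subscriber ID (unshared, no locking).
my %tid_to_sub;

# Sketch of the documented rules: the default subscriber ID is TID + 1,
# and a thread that already has a subscriber gets undef.
# Assumption: the subscriber ID is returned on success.
sub subscribe_sketch {
    my ($tid, $sub_id) = @_;
    return undef if exists $tid_to_sub{$tid};
    $sub_id = $tid + 1 unless defined $sub_id;
    $tid_to_sub{$tid} = $sub_id;
    return $sub_id;
}

my $first  = subscribe_sketch(0);        # TID 0, default subscriber ID
my $second = subscribe_sketch(0, 42);    # same thread again: refused
my $third  = subscribe_sketch(3, 42);    # another thread, explicit ID

print "first:  $first\n";
print "second: ", (defined $second ? $second : 'undef'), "\n";
print "third:  $third\n";
```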
=item B<unsubscribe()> I<aka> B<ignore()>
Unsubscribe from the queue. The C<ignore()> alias is provided for
compatibility with TQD apps. Only the subscriber for the current
thread is unsubscribed. I<Unsubscribing another thread is not
currently supported.>
=item @subIDs = $tqm->B<get_subscribers()>
Returns the current list of subscriber IDs.
=item $msgID = $tqm->B<publish(@request)>
Enqueues C<@request> to all subscribers (see L<enqueue>) and returns
immediately with the request ID.
=item $results = $tqm->B<publish_and_wait(@request)>
Same as L<publish>, except that it waits for and returns
the response hash, rather than returning
immediately with the request ID.
=item $results = $tqm->B<publish_and_wait_until($timeout, @request)>
Same as L<publish>, except that it waits up to C<$timeout>
seconds for all subscribers to respond, and returns the