Thread-Queue-Multiplex

lib/Thread/Queue/Multiplex.pm

for the root thread).

=item *

adding a shared hash to hold the list of message IDs,
each mapped to a L<threads::shared> array containing
C<[message ID, flags, refcount, @params]>, where
C<flags> indicates the urgent and/or simplex status
of the request, and C<refcount> indicates the number
of subscribers assigned to the request. A special C<refcount>
value of -1 indicates that only the first subscriber to
retrieve/process the request should respond (to mimic the
behavior of L<Thread::Queue::Duplex>); the publisher
requests this by calling any of the C<enqueue>
methods with a subscriber ID of -1.

=item *

adding a shared hash to hold the list of message IDs,
each mapped to a L<threads::shared> hash containing
a reference count of subscribers for the message,
and a map of subscriber IDs to their responses.
This "pending response" hash is used to accumulate
all subscriber responses; when the reference count of
a message reaches zero, the hash of responses is posted to
the final response message mapping hash.

=item *

adding a shared hash to hold the map of thread IDs to
subscriber IDs. B<Note:> Each thread can have only a single
subscriber.

=item *

changing the message mapping hash to map a unique message ID
to a hash of unique subscriber IDs, each mapped to its response (if any),
i.e.,

	$msg_map = {
		$msgid => {
			$subID1 => $subID1_response,
			$subID2 => $subID2_response,
			etc.
		}
	}

=item *

when the publisher dequeues the response to a message, it receives
a copy of the subscriber mapping hash, and is responsible for iterating
over the hash to read each subscriber's results.

=back
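
The "pending response" bookkeeping described above can be sketched as
follows. This is only an illustration under stated assumptions: it uses
plain (unshared) hashes and no locking, and the names C<%pending>,
C<%msg_map>, C<track_message> and C<record_response> are hypothetical,
not part of the module's API.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for the shared hashes described above; a threaded
# implementation would use threads::shared structures and locking instead.
my %pending;    # msgid => { refcount => N, responses => { subID => response } }
my %msg_map;    # msgid => { subID => response }, read by the publisher

# Start tracking a published message for the given subscriber IDs.
sub track_message {
    my ($msgid, @subids) = @_;
    $pending{$msgid} = { refcount => scalar(@subids), responses => {} };
    return;
}

# Record one subscriber's response. Returns true once every subscriber
# has responded and the response hash has been posted to %msg_map.
sub record_response {
    my ($msgid, $subid, $response) = @_;
    my $entry = $pending{$msgid} or return 0;
    return 0 if exists $entry->{responses}{$subid};    # ignore duplicates
    $entry->{responses}{$subid} = $response;
    return 0 if --$entry->{refcount} > 0;

    # Reference count reached zero: post the accumulated responses.
    my $done = delete $pending{$msgid};
    $msg_map{$msgid} = $done->{responses};
    return 1;
}

track_message(1, 2, 3);                  # message 1, subscribers 2 and 3
record_response(1, 2, 'from sub 2');
record_response(1, 3, 'from sub 3');

# The publisher iterates over the posted hash, one entry per subscriber.
my $results = $msg_map{1};
for my $subid (sort { $a <=> $b } keys %$results) {
    print "subscriber $subid: $results->{$subid}\n";
}
```

Nothing is visible in C<%msg_map> until the last subscriber responds, so
the publisher always sees a complete set of responses.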

A normal processing sequence for Thread::Queue::Multiplex might be:

	#
	#	Thread A (the client):
	#
		...marshal parameters for a coroutine...
		my $id = $tqm->publish('function_name', \@paramlist);
		my $results = $tqm->dequeue_response($id);
		while (my ($subID, $subresult) = each %$results) {
			...process $subresult...
		}
	#
	#	Thread B (a subscriber):
	#
		while (1) {
			my $call = $tqm->dequeue;
			my ($id, $func, @params) = @$call;
			$tqm->respond($id, $self->$func(@params));
		}
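
The subscriber's unpack-and-dispatch step can be tried on its own, without
threads, as in the sketch below. It assumes the request layout
C<[$id, $func, @params]> shown in the sequence above; the C<My::Worker>
class and the C<handle_call> helper are hypothetical and exist only for
illustration.

```perl
use strict;
use warnings;

# Hypothetical worker class; the method name is illustrative only.
package My::Worker;

sub new { return bless {}, shift }

sub add {
    my ($self, @nums) = @_;
    my $sum = 0;
    $sum += $_ for @nums;
    return $sum;
}

package main;

# Unpack one dequeued request and dispatch it by method name.
# Returns the request ID and the handler's result (undef if the
# worker has no method with that name).
sub handle_call {
    my ($worker, $call) = @_;
    my ($id, $func, @params) = @$call;
    my $method = $worker->can($func)
        or return ($id, undef);
    return ($id, scalar $worker->$method(@params));
}

my $worker = My::Worker->new;
my ($id, $result) = handle_call($worker, [ 42, 'add', 1, 2, 3 ]);
print "request $id => $result\n";
```

Checking C<can()> before dispatching keeps a request for an unknown
function from killing the subscriber loop; in a real subscriber the
returned pair would be passed to C<respond()>.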

=head1 FUNCTIONS AND METHODS

=over 4

=item $tqm = B<Thread::Queue::Multiplex-E<gt>new>([MaxPending => $limit])

Constructor. Creates a new empty queue.
If the C<MaxPending> value is a non-zero value, the number
of pending requests will be limited to C<$limit>, and any further
attempt to queue a request will block until the pending count
drops below C<$limit>. This limit may be applied or modified later
via the C<set_max_pending()> method (see below).

=item B<subscribe( >I<[ $subID ]>B< )> I<aka> B<listen()>

Subscribe to the queue. The C<listen()> alias is provided for
compatibility with L<Thread::Queue::Duplex> (TQD) apps. If C<$subID>
is not provided, 1 plus the current thread's TID is used as the
subscriber ID. Only a single subscriber per thread is permitted;
C<undef> is returned if the current thread already has a subscriber.

=item B<unsubscribe()> I<aka> B<ignore()>

Unsubscribe from the queue. The C<ignore()> alias is provided for
compatibility with TQD apps. Note that only the subscriber for the
current thread is unsubscribed. I<Unsubscribing another thread is not
currently supported.>

=item @subIDs = $tqm->B<get_subscribers()>

Returns the current list of subscriber IDs.

=item $msgID = $tqm->B<publish(@request)>

L<enqueue>()s the C<@request> to all subscribers.

=item $results = $tqm->B<publish_and_wait(@request)>

Same as L<publish>, except that it waits for and returns
the response hash, rather than returning
immediately with the request ID.

=item $results = $tqm->B<publish_and_wait_until($timeout, @request)>

Same as L<publish>, except that it waits up to C<$timeout>
seconds for all subscribers to respond, and returns the


