Boost-Geometry-Utils
view release on metacpan or search on metacpan
src/boost/graph/distributed/detail/queue.ipp view on Meta::CPAN
{
if (!polling)
outgoing_buffers.reset(
new outgoing_buffers_t(num_processes(process_group)));
setup_triggers();
}
// Builds a distributed queue whose local storage starts as a copy of an
// existing buffer.
//
//   process_group  process group shared by the queue; the queue attaches to
//                  it as a distributed object.
//   owner          property map queried with get() to find the process that
//                  owns a value.
//   buffer         initial contents/state of the local buffer.
//   pred           predicate stored for filtering values delivered by the
//                  message handlers.
//   polling        when false, one outgoing buffer per process is allocated.
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
BOOST_DISTRIBUTED_QUEUE_TYPE::
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
                  const Buffer& buffer, const UnaryPredicate& pred,
                  bool polling)
  : process_group(process_group, attach_distributed_object()),
    owner(owner),
    buffer(buffer),
    pred(pred),
    polling(polling)
{
  // Outgoing buffers exist only in non-polling mode; their presence is what
  // later code checks to decide whether pushes are batched.
  const bool batch_pushes = !polling;
  if (batch_pushes) {
    outgoing_buffers.reset(
      new outgoing_buffers_t(num_processes(process_group)));
  }

  // Register the message handlers for this queue.
  setup_triggers();
}
// Builds a distributed queue with a default-constructed local buffer.
//
//   process_group  process group shared by the queue; the queue attaches to
//                  it as a distributed object.
//   owner          property map queried with get() to find the process that
//                  owns a value.
//   pred           predicate stored for filtering values delivered by the
//                  message handlers.
//   polling        when false, one outgoing buffer per process is allocated.
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
BOOST_DISTRIBUTED_QUEUE_TYPE::
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
                  const UnaryPredicate& pred, bool polling)
  : process_group(process_group, attach_distributed_object()),
    owner(owner),
    pred(pred),
    polling(polling)
{
  // Outgoing buffers exist only in non-polling mode; their presence is what
  // later code checks to decide whether pushes are batched.
  const bool batch_pushes = !polling;
  if (batch_pushes) {
    outgoing_buffers.reset(
      new outgoing_buffers_t(num_processes(process_group)));
  }

  // Register the message handlers for this queue.
  setup_triggers();
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void
BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
{
typename ProcessGroup::process_id_type dest = get(owner, x);
if (outgoing_buffers)
outgoing_buffers->at(dest).push_back(x);
else if (dest == process_id(process_group))
buffer.push(x);
else
send(process_group, get(owner, x), msg_push, x);
}
// Reports whether the local buffer is empty once no more work can arrive.
//
// A process stays in this call until either its local buffer holds at least
// one value, or a synchronization with the other processes reports that every
// local buffer is empty (do_synchronize() returned true).
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
bool
BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
{
  for (;;) {
    // Local work is available: no need to synchronize.
    if (!buffer.empty())
      break;

    // Nothing local; synchronize and stop if the whole queue is drained.
    if (do_synchronize())
      break;
  }

  return buffer.empty();
}
// Returns the number of values in the local buffer.
//
// empty() is invoked first purely for its effect: it only returns once the
// local buffer is non-empty or a synchronization found every buffer empty.
// Its result is intentionally discarded.
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
{
  static_cast<void>(empty());
  return buffer.size();
}
// Registers this queue's message handlers with the process group:
// msg_push is routed to handle_push and msg_multipush to handle_multipush.
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
{
  using boost::graph::parallel::simple_trigger;

  // Single-value pushes.
  simple_trigger(process_group,
                 msg_push,
                 this,
                 &distributed_queue::handle_push);

  // Batched pushes (a vector of values per message).
  simple_trigger(process_group,
                 msg_multipush,
                 this,
                 &distributed_queue::handle_multipush);
}
// Handler for a msg_push message: stores the received value in the local
// buffer if it satisfies the queue's predicate, and drops it otherwise.
// The source, tag and receive-context arguments are unused.
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void
BOOST_DISTRIBUTED_QUEUE_TYPE::
handle_push(int /*source*/, int /*tag*/, const value_type& value,
            trigger_receive_context)
{
  // Rejected by the predicate: nothing to enqueue.
  if (!pred(value))
    return;

  buffer.push(value);
}
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void
BOOST_DISTRIBUTED_QUEUE_TYPE::
handle_multipush(int /*source*/, int /*tag*/,
const std::vector<value_type>& values,
trigger_receive_context)
{
for (std::size_t i = 0; i < values.size(); ++i)
if (pred(values[i])) buffer.push(values[i]);
}
// Flushes pending pushes, synchronizes the process group, and reports whether
// the queue is empty on every process.
//
// Steps, in order:
//   1. If outgoing buffers exist, each non-empty one is delivered: buffers
//      destined for another process are sent as a single msg_multipush
//      message, while the buffer destined for this process is pushed straight
//      onto the local buffer. Each delivered buffer is then cleared.
//   2. synchronize() is called on the process group.
//   3. An all-reduce over the processes determines whether any local buffer
//      still holds values.
//
// Returns true when no process has a non-empty local buffer.
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
bool
BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
{
#ifdef PBGL_ACCOUNTING
  ++num_synchronizations;
#endif

  using boost::parallel::all_reduce;

  typedef typename ProcessGroup::process_id_type process_id_type;

  if (outgoing_buffers) {
    // Transfer all of the push requests
    process_id_type id = process_id(process_group);
    process_id_type np = num_processes(process_group);
    for (process_id_type dest = 0; dest < np; ++dest) {
      outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
      std::size_t size = outgoing.size();
      if (size != 0) {
        if (dest != id) {
          send(process_group, dest, msg_multipush, outgoing);
        } else {
          // Values owned by this process never leave it.
          for (std::size_t i = 0; i < size; ++i)
            buffer.push(outgoing[i]);
        }
        outgoing.clear();
      }
    }
  }

  synchronize(process_group);

  // Reduce a 0/1 "non-empty" flag rather than the buffer sizes themselves:
  // the sum is then bounded by the number of processes, so it can neither be
  // truncated when stored in an unsigned nor wrap around to zero and falsely
  // signal a globally empty queue.
  unsigned local_nonempty = buffer.empty() ? 0u : 1u;
  unsigned global_nonempty =
    all_reduce(process_group, local_nonempty, std::plus<unsigned>());
  return global_nonempty == 0;
}
} } } // end namespace boost::graph::distributed
( run in 0.602 second using v1.01-cache-2.11-cpan-39bf76dae61 )