Async-Trampoline
view release on metacpan or search on metacpan
src/Scheduler.cpp view on Meta::CPAN
#include "Scheduler.h"
#include "CircularBuffer.h"
#include <unordered_set>
// Debug logging switch.
//
// If ASYNC_TRAMPOLINE_SCHEDULER_DEBUG is defined when this file is compiled
// (to any value), it is normalised to 1 and LOG_DEBUG(fmt, ...) prints the
// message to stderr with a "#DEBUG " prefix and flushes immediately.
// Otherwise the flag is set to 0 and LOG_DEBUG expands to an empty statement,
// so its arguments are never evaluated.
//
// The format argument must be a string literal, because it is pasted directly
// after the "#DEBUG " prefix.
#ifdef ASYNC_TRAMPOLINE_SCHEDULER_DEBUG
#define ASYNC_TRAMPOLINE_SCHEDULER_DEBUG 1
#define LOG_DEBUG(...) do { \
    fprintf(stderr, "#DEBUG " __VA_ARGS__); \
    fflush(stderr); \
} while (0)
#else
#define ASYNC_TRAMPOLINE_SCHEDULER_DEBUG 0
#define LOG_DEBUG(...) do { } while (0)
#endif
// == The Impl Declaration ==
// Private implementation behind Async_Trampoline_Scheduler.
//
// Keeps a queue of runnable asyncs together with a set of the addresses that
// are currently queued, so that the same async is never queued twice.
class Async_Trampoline_Scheduler::Impl {
    // Asyncs waiting to be handed out by dequeue(), in enq()/deq() order.
    CircularBuffer<AsyncRef> runnable_queue{};

    // Addresses of every async currently held in runnable_queue.
    // enqueue() consults this to skip duplicates; dequeue() removes the entry.
    std::unordered_set<Async const*> runnable_enqueued{};

    // std::unordered_multimap<Async const*, AsyncRef> blocked{};

public:
    // Creates a scheduler whose queue is grown by `initial_capacity` up front.
    // Explicit so that a bare size_t never converts to an Impl implicitly.
    explicit Impl(size_t initial_capacity);

    ~Impl();

    // Number of asyncs currently waiting in the runnable queue.
    auto queue_size() const -> size_t { return runnable_queue.size(); }

    // Appends `async` to the runnable queue unless it is already queued.
    void enqueue(AsyncRef async);

    // Removes and returns the next runnable async. The queue must not be empty.
    AsyncRef dequeue();

    // Registers `blocked_async` as waiting on `dependency_async`.
    void block_on(Async& dependency_async, AsyncRef blocked_async);

    // Enqueues every async that was blocked on `async`.
    void complete(Async& async);
};
// == The Public C++ Interface ==
// Builds the scheduler by allocating its private Impl, passing
// `initial_capacity` through to the Impl constructor.
Async_Trampoline_Scheduler::Async_Trampoline_Scheduler(size_t initial_capacity)
    : m_impl{new Impl{initial_capacity}}
{
}
// Defaulted out of line, after Impl has been fully declared in this file.
// NOTE(review): presumably m_impl is an owning smart pointer whose deleter
// needs the complete Impl type here -- confirm against Scheduler.h.
Async_Trampoline_Scheduler::~Async_Trampoline_Scheduler() = default;
auto Async_Trampoline_Scheduler::queue_size() const -> size_t
{
return m_impl->queue_size();
}
auto Async_Trampoline_Scheduler::enqueue(AsyncRef async) -> void
{
m_impl->enqueue(std::move(async));
}
// Removes and returns the next runnable async from the implementation.
AsyncRef Async_Trampoline_Scheduler::dequeue()
{
    Impl& impl = *m_impl;
    return impl.dequeue();
}
auto Async_Trampoline_Scheduler::block_on(
Async& dependency_async, AsyncRef blocked_async) -> void
{
m_impl->block_on(dependency_async, std::move(blocked_async));
}
auto Async_Trampoline_Scheduler::complete(Async& async) -> void
{
m_impl->complete(async);
}
// == The Impl implementation ==
// printf-style format fragment describing the scheduler's queue state.
// It consumes exactly the four arguments that
// SCHEDULER_RUNNABLE_QUEUE_FORMAT_ARGS(self) produces, in this order:
// queue start index, queue size, queue capacity, size of the enqueued set.
// Every value is printed with %zu, so every argument must be a size_t.
#define SCHEDULER_RUNNABLE_QUEUE_FORMAT \
    "Scheduler { " \
    "queue={ start=%zu size=%zu storage.size=%zu } " \
    "runnable_enqueued=%zu " \
    "}"
// Argument list matching SCHEDULER_RUNNABLE_QUEUE_FORMAT. `self` is an object
// with `runnable_queue` and `runnable_enqueued` members. Each value is cast
// to size_t explicitly so the varargs always agree with the %zu specifiers,
// whatever integer type the underlying accessor returns.
#define SCHEDULER_RUNNABLE_QUEUE_FORMAT_ARGS(self) \
    static_cast<size_t>((self).runnable_queue._internal_start()), \
    static_cast<size_t>((self).runnable_queue.size()), \
    static_cast<size_t>((self).runnable_queue.capacity()), \
    static_cast<size_t>((self).runnable_enqueued.size())
// Constructs the implementation and grows the (default-initialised) runnable
// queue by `initial_capacity`.
Async_Trampoline_Scheduler::Impl::Impl(size_t initial_capacity)
{
    this->runnable_queue.grow(initial_capacity);
}
// Does no explicit cleanup; the members release their contents themselves.
// When debug logging is enabled, the final queue state is reported first.
Async_Trampoline_Scheduler::Impl::~Impl()
{
    LOG_DEBUG("clearing queue: " SCHEDULER_RUNNABLE_QUEUE_FORMAT "\n",
            SCHEDULER_RUNNABLE_QUEUE_FORMAT_ARGS(*this));
}
// Appends `async` to the runnable queue unless the same Async object is
// already queued.
//
// Duplicates are detected by address: `runnable_enqueued` holds the address
// of every async that is currently in `runnable_queue`.
void Async_Trampoline_Scheduler::Impl::enqueue(AsyncRef async)
{
    // Take the address once, up front: it is needed after `async` has been
    // moved into the queue, when the reference itself is no longer usable.
    Async* const key = async.decay();

    LOG_DEBUG("enqueueing %p into " SCHEDULER_RUNNABLE_QUEUE_FORMAT ": " ASYNC_FORMAT "\n",
            static_cast<const void*>(key),  // %p requires a void pointer
            SCHEDULER_RUNNABLE_QUEUE_FORMAT_ARGS(*this),
            ASYNC_FORMAT_ARGS(key));

    if (runnable_enqueued.find(key) != runnable_enqueued.end())
    {
        LOG_DEBUG("enqueuing skipped because already in queue\n");
        return;
    }

    runnable_queue.enq(std::move(async));
    runnable_enqueued.insert(key);

    LOG_DEBUG(
            " '-> " SCHEDULER_RUNNABLE_QUEUE_FORMAT "\n",
            SCHEDULER_RUNNABLE_QUEUE_FORMAT_ARGS(*this));
}
// Registers `blocked_async` as waiting on `dependency_async`.
//
// The scheduler keeps no bookkeeping of its own for this: the blocked
// reference is moved into the dependency via add_blocked().
void Async_Trampoline_Scheduler::Impl::block_on(
        Async& dependency_async, AsyncRef blocked_async)
{
    LOG_DEBUG("dependency of " ASYNC_FORMAT " on " ASYNC_FORMAT "\n",
            ASYNC_FORMAT_ARGS(blocked_async.decay()),
            ASYNC_FORMAT_ARGS(&dependency_async));

    dependency_async.add_blocked(std::move(blocked_async));
}
// Removes the next async from the runnable queue and returns it.
//
// Precondition: the queue is not empty (checked by assert only).
// The async's address is also removed from `runnable_enqueued`, so the same
// async may be enqueued again afterwards.
AsyncRef Async_Trampoline_Scheduler::Impl::dequeue()
{
    assert(runnable_queue.size());

    AsyncRef async = runnable_queue.deq();

    LOG_DEBUG(
            "dequeue %p from " SCHEDULER_RUNNABLE_QUEUE_FORMAT "\n",
            static_cast<const void*>(async.decay()),  // %p requires a void pointer
            SCHEDULER_RUNNABLE_QUEUE_FORMAT_ARGS(*this));

    // Erase by key rather than by iterator: if the entry is unexpectedly
    // missing, this is a harmless no-op in release builds, whereas erasing the
    // end() iterator would be undefined behaviour once assert() is compiled out.
    const auto erased = runnable_enqueued.erase(async.decay());
    assert(erased == 1 /* dequeued an entry that was not registered in the enqueued set! */);
    static_cast<void>(erased);  // only read by the assert above

    return async;
}
// Makes every async that was blocked on `async` runnable.
//
// Each entry of `async.blocked` is moved into the runnable queue and the list
// is then cleared. If `async` is a completed IS_PTR async, the same procedure
// is then applied to the async that `as_ptr` refers to.
// If nothing is blocked on `async`, the function returns immediately,
// before the IS_PTR check.
void Async_Trampoline_Scheduler::Impl::complete(Async& async)
{
    LOG_DEBUG("completing %p\n", &async);
    LOG_DEBUG(" '-> %zu dependencies\n", async.blocked.size());

    if (async.blocked.size() == 0)
    {
        return;
    }

    for (auto& waiter : async.blocked)
    {
        enqueue(std::move(waiter));
    }
    // The entries were moved from above; drop the leftover shells.
    async.blocked.clear();

    const bool is_completed_pointer =
        async.type == Async_Type::IS_PTR
        && async.has_category(Async_Type::CATEGORY_COMPLETE);
    if (!is_completed_pointer)
    {
        return;
    }

    complete(async.as_ptr.get());
}
( run in 0.943 second using v1.01-cache-2.11-cpan-13bb782fe5a )