view release on metacpan or search on metacpan
lib/Async/Trampoline/Scheduler.pm view on Meta::CPAN
=head2 new
$scheduler = Async::Trampoline::Scheduler->new
Create a new scheduler.
=head2 enqueue
$scheduler->enqueue($task)
$scheduler->enqueue($task => @blocked)
Add a I<$task> to the runnable queue.
B<$task>:
A task to be scheduled at some point in the future.
B<@blocked>:
Any number of tasks that depend on the I<$task>.
The blocked tasks will be added to the runnable queue
once the I<$task> is completed.
=head2 dequeue
($task) = $scheduler->dequeue
() = $scheduler->dequeue
Get the next scheduled I<$task>.
B<returns>:
src/Async.cpp view on Meta::CPAN
return *this;
// flatten the pointer until we reach something concrete
AsyncRef& ptr = as_ptr;
while (ptr->type == Async_Type::IS_PTR)
ptr = ptr->as_ptr;
return ptr.get();
}
auto Async::add_blocked(AsyncRef b) -> void
{
ptr_follow().blocked.emplace_back(std::move(b));
}
src/Async.h view on Meta::CPAN
fprintf(stderr, "#DEBUG Async: " __VA_ARGS__); \
fflush(stderr); \
} while (0)
#endif /* ifndef ASYNC_TRAMPOLINE_DEBUG */
#define ASYNC_FORMAT "<Async %p %s ref=%zu blocks=%zu>"
#define ASYNC_FORMAT_ARGS(aptr) \
(aptr), \
Async_maybe_type_name(aptr), \
Async_maybe_refcount(aptr), \
Async_maybe_blocked_size(aptr)
#ifdef __cpp_ref_qualifiers
#define MAYBE_MOVEREF &&
#else
#define MAYBE_MOVEREF
#endif
enum class Async_Type
{
IS_UNINITIALIZED,
src/Async.h view on Meta::CPAN
size_t refcount;
union {
Async_Uninitialized as_uninitialized;
AsyncRef as_ptr;
Async_Thunk as_thunk;
Async_Pair as_binary;
Async_Flow as_flow;
Destructible as_error;
DestructibleTuple as_value;
};
std::vector<AsyncRef> blocked;
Async() :
type{Async_Type::IS_UNINITIALIZED},
refcount{1},
as_ptr{nullptr},
blocked{}
{ }
Async(Async&& other) : Async{} { set_from(std::move(other)); }
~Async() {
assert(blocked.size() == 0);
clear();
assert(type == Async_Type::IS_UNINITIALIZED);
}
auto ref() noexcept -> Async& { refcount++; return *this; }
auto unref() -> void;
auto operator=(Async& other) -> Async&;
auto clear() -> void;
auto set_from(Async&& other) -> void;
void set_to_Ptr (AsyncRef target);
void set_to_RawThunk (Async_RawThunk::Callback callback, AsyncRef dep);
void set_to_Thunk (Async_Thunk::Callback callback, AsyncRef dep);
void set_to_Concat (AsyncRef left, AsyncRef right);
void set_to_Flow (Async_Flow);
void set_to_Cancel ();
void set_to_Error (Destructible error);
void set_to_Value (DestructibleTuple values);
auto add_blocked(AsyncRef blocked) -> void;
auto ptr_follow() -> Async&;
auto has_category(Async_Type type) -> bool
{ return ptr_follow().type >= type; }
auto has_type(Async_Type type) -> bool
{ return ptr_follow().type == type; }
static auto alloc() -> AsyncRef;
src/Async.h view on Meta::CPAN
ptr = nullptr;
}
// functions used with debugging output
static inline auto Async_maybe_type_name(Async const* ptr) noexcept -> char const*
{ return ptr ? Async_Type_name(ptr->type) : "(NULL)"; }
static inline auto Async_maybe_refcount(Async const* ptr) noexcept -> size_t
{ return ptr ? ptr->refcount : 0; }
static inline auto Async_maybe_blocked_size(Async const* ptr) noexcept -> size_t
{ return ptr ? ptr->blocked.size() : 0; }
// Evaluation: Async_X_evaluate()
// Incomplete -> Complete
void
Async_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked);
inline auto AsyncRef::fold() -> AsyncRef&
{
Async* target = &ptr->ptr_follow();
if (target != ptr)
*this = target;
return *this;
}
inline static
src/Async_Evaluation.cpp view on Meta::CPAN
#include "Async.h"
#include "Scheduler.h"
#define UNUSED(x) static_cast<void>(x)
#define EVAL_RETURN(next_async, blocked_async) \
(void) (next = next_async, blocked = blocked_async)
void
Async_run_until_completion(
Async* async)
{
ASYNC_LOG_DEBUG("loop for Async %p\n", async);
Async_Trampoline_Scheduler scheduler{};
scheduler.enqueue(async);
while (scheduler.queue_size() > 0)
{
AsyncRef top = scheduler.dequeue();
if (!top)
break;
Async trap;
AsyncRef next = &trap;
AsyncRef blocked = &trap;
Async_eval(top.decay(), next, blocked);
assert(next.decay() != &trap);
assert(blocked.decay() != &trap);
if (blocked)
assert(next);
if (next)
{
scheduler.enqueue(next.decay());
if (blocked)
scheduler.block_on(next.get(), blocked.decay());
}
if (top.decay() != next.decay() && top.decay() != blocked.decay())
{
ASYNC_LOG_DEBUG("completed %p\n", top.decay());
assert(top->has_category(Async_Type::CATEGORY_COMPLETE));
scheduler.complete(*top);
}
}
ASYNC_LOG_DEBUG("loop complete\n");
}
// Type-specific cases
#define ENSURE_DEPENDENCY(self, dependency) do { \
if (!(dependency)->has_category(Async_Type::CATEGORY_COMPLETE)) \
return EVAL_RETURN((dependency), (self)); \
} while (0)
static void Async_Ptr_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_PTR);
Async* dep = self->as_ptr.decay();
ASYNC_LOG_DEBUG("eval Ptr %p dep=%p\n", self, dep);
ENSURE_DEPENDENCY(self, dep);
src/Async_Evaluation.cpp view on Meta::CPAN
self->as_ptr = followed;
return EVAL_RETURN(nullptr, nullptr);
}
static
void
Async_RawThunk_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_RAWTHUNK);
UNUSED(next);
UNUSED(blocked);
assert(0); // TODO not implemented
}
static
void
Async_Thunk_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_THUNK);
ASYNC_LOG_DEBUG(
"running Thunk %p: callback=??? dependency=%p\n",
self,
self->as_thunk.dependency.decay());
AsyncRef dependency = self->as_thunk.dependency;
src/Async_Evaluation.cpp view on Meta::CPAN
if (right->has_type(type))
return right.decay();
return nullptr;
}
static
void
Async_Concat_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_CONCAT);
auto& left = self->as_binary.left.fold();
auto& right = self->as_binary.right.fold();
for (Async_Type type : { Async_Type::IS_CANCEL, Async_Type::IS_ERROR })
{
if(Async* selected = select_if_either_has_type(left, right, type))
src/Async_Evaluation.cpp view on Meta::CPAN
}
self->clear();
self->set_to_Value(std::move(tuple));
return EVAL_RETURN(NULL, NULL);
}
void Async_Flow_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_FLOW);
using Direction = Async_Flow::Direction;
Async* left = self->as_flow.left.decay();
Async* right = self->as_flow.right.decay();
Async_Type decision_type = self->as_flow.flow_type;
Direction flow_direction = self->as_flow.direction;
src/Async_Evaluation.cpp view on Meta::CPAN
return EVAL_RETURN(self, NULL);
}
}
// Polymorphic
void
Async_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
ASYNC_LOG_DEBUG(
"running Async %p (%2d %s)\n",
self,
static_cast<int>(self->type),
Async_Type_name(self->type));
switch (self->type) {
case Async_Type::IS_UNINITIALIZED:
assert(0);
break;
case Async_Type::CATEGORY_INITIALIZED:
assert(0);
break;
case Async_Type::IS_PTR:
Async_Ptr_eval(self, next, blocked);
break;
case Async_Type::IS_RAWTHUNK:
Async_RawThunk_eval(
self, next, blocked);
break;
case Async_Type::IS_THUNK:
Async_Thunk_eval(
self, next, blocked);
break;
case Async_Type::IS_CONCAT:
Async_Concat_eval(
self, next, blocked);
break;
case Async_Type::IS_FLOW:
Async_Flow_eval(self, next, blocked);
break;
case Async_Type::CATEGORY_COMPLETE:
assert(0);
break;
case Async_Type::IS_CANCEL: // already complete
EVAL_RETURN(NULL, NULL);
break;
case Async_Type::CATEGORY_RESOLVED:
src/Async_Evaluation.cpp view on Meta::CPAN
break;
case Async_Type::IS_VALUE: // already complete
EVAL_RETURN(NULL, NULL);
break;
default:
assert(0);
}
ASYNC_LOG_DEBUG(
"... %p result: next=%p blocked=%p\n",
self,
next.decay(),
blocked.decay());
}
src/Async_Initialization.cpp view on Meta::CPAN
default:
assert(0);
}
}
auto Async::set_from(Async&& other) -> void
{
assert(type == Async_Type::IS_UNINITIALIZED);
assert(other.blocked.size() == 0);
switch (other.type)
{
case Async_Type::IS_UNINITIALIZED:
break;
case Async_Type::CATEGORY_INITIALIZED:
assert(0);
break;
case Async_Type::IS_PTR:
src/Async_Initialization.cpp view on Meta::CPAN
ASYNC_LOG_DEBUG("init %p to Ptr: target=" ASYNC_FORMAT "\n",
this,
ASYNC_FORMAT_ARGS(target.decay()));
type = Async_Type::IS_PTR;
new (&as_ptr) AsyncRef { std::move(target) };
// // transfer all dependencies to target
// Async& target_ref = ptr_follow();
// for (auto& depref : blocked)
// target_ref.add_blocked(std::move(depref));
// blocked.clear();
}
void
Async_Ptr_clear(
Async* self)
{
assert(self);
assert(self->type == Async_Type::IS_PTR);
ASYNC_LOG_DEBUG("clear %p of Ptr: target=" ASYNC_FORMAT "\n",
src/Scheduler.cpp view on Meta::CPAN
fprintf(stderr, "#DEBUG " __VA_ARGS__); \
fflush(stderr); \
} while (0)
#endif
// == The Impl Declaration ==
class Async_Trampoline_Scheduler::Impl {
CircularBuffer<AsyncRef> runnable_queue{};
std::unordered_set<Async const*> runnable_enqueued{};
// std::unordered_multimap<Async const*, AsyncRef> blocked{};
public:
Impl(size_t initial_capacity);
~Impl();
auto queue_size() const -> size_t { return runnable_queue.size(); }
void enqueue(AsyncRef async);
AsyncRef dequeue();
void block_on(Async& dependency_async, AsyncRef blocked_async);
void complete(Async& async);
};
// == The Public C++ Interface ==
Async_Trampoline_Scheduler::Async_Trampoline_Scheduler(size_t initial_capacity)
: m_impl{new Async_Trampoline_Scheduler::Impl{initial_capacity}}
{}
Async_Trampoline_Scheduler::~Async_Trampoline_Scheduler() = default;
src/Scheduler.cpp view on Meta::CPAN
{
m_impl->enqueue(std::move(async));
}
auto Async_Trampoline_Scheduler::dequeue() -> AsyncRef
{
return m_impl->dequeue();
}
auto Async_Trampoline_Scheduler::block_on(
Async& dependency_async, AsyncRef blocked_async) -> void
{
m_impl->block_on(dependency_async, std::move(blocked_async));
}
auto Async_Trampoline_Scheduler::complete(Async& async) -> void
{
m_impl->complete(async);
}
// == The Impl implementation ==
#define SCHEDULER_RUNNABLE_QUEUE_FORMAT \
src/Scheduler.cpp view on Meta::CPAN
Async* key = async.decay();
runnable_queue.enq(std::move(async));
runnable_enqueued.insert(key);
LOG_DEBUG(
" '-> " SCHEDULER_RUNNABLE_QUEUE_FORMAT "\n",
SCHEDULER_RUNNABLE_QUEUE_FORMAT_ARGS(*this));
}
void Async_Trampoline_Scheduler::Impl::block_on(
Async& dependency_async, AsyncRef blocked_async)
{
LOG_DEBUG(
"dependency of " ASYNC_FORMAT " on " ASYNC_FORMAT "\n",
ASYNC_FORMAT_ARGS(blocked_async.decay()),
ASYNC_FORMAT_ARGS(&dependency_async));
dependency_async.add_blocked(std::move(blocked_async));
}
AsyncRef Async_Trampoline_Scheduler::Impl::dequeue()
{
assert(runnable_queue.size());
AsyncRef async = runnable_queue.deq();
LOG_DEBUG(
"dequeue %p from " SCHEDULER_RUNNABLE_QUEUE_FORMAT "\n",
src/Scheduler.cpp view on Meta::CPAN
}
runnable_enqueued.erase(entry);
return async;
}
void Async_Trampoline_Scheduler::Impl::complete(Async& async)
{
LOG_DEBUG("completing %p\n", &async);
LOG_DEBUG(" '-> %zu dependencies\n", async.blocked.size());
if (async.blocked.size() == 0)
return;
for (auto& ref : async.blocked)
enqueue(std::move(ref));
async.blocked.clear();
if (async.type == Async_Type::IS_PTR
&& async.has_category(Async_Type::CATEGORY_COMPLETE))
{
complete(async.as_ptr.get());
}
}
src/Scheduler.h view on Meta::CPAN
*
* Returns: AsyncRef
* The next item.
*/
auto dequeue() -> AsyncRef;
/** Register a dependency relationship.
*
* This is similar to a Makefile rule:
*
* blocked_async: dependency_async
*
* dependency_async: Async&
* must be completed first.
* blocked_async: AsyncRef
* is blocked until the "dependency_async" is completed-
*/
auto block_on(Async& dependency_async, AsyncRef blocked_async) -> void;
/** Mark an item as completed.
*
* Any items that block on the completed items will be enqueued in an
* unspecified order.
*
* async: Async&
* a completed item.
*/
auto complete(Async& async) -> void;
t/scheduler.t view on Meta::CPAN
is $starter, $starter_again, q(got starter async back);
is scalar $scheduler->dequeue, undef, q(queue has no further elems);
$scheduler->complete($starter_again);
my @results;
while (my ($async) = $scheduler->dequeue) {
push @results, $async->run_until_completion;
}
@results = sort @results;
is "@results", "1 2", q(got blocked tasks back);
};
done_testing;