Async-Trampoline
view release on metacpan or search on metacpan
src/Async_Evaluation.cpp view on Meta::CPAN
#include "Async.h"
#include "Scheduler.h"
#define UNUSED(x) static_cast<void>(x)
#define EVAL_RETURN(next_async, blocked_async) \
(void) (next = next_async, blocked = blocked_async)
void
Async_run_until_completion(
Async* async)
{
ASYNC_LOG_DEBUG("loop for Async %p\n", async);
Async_Trampoline_Scheduler scheduler{};
scheduler.enqueue(async);
while (scheduler.queue_size() > 0)
{
AsyncRef top = scheduler.dequeue();
if (!top)
break;
Async trap;
AsyncRef next = &trap;
AsyncRef blocked = &trap;
Async_eval(top.decay(), next, blocked);
assert(next.decay() != &trap);
assert(blocked.decay() != &trap);
if (blocked)
assert(next);
if (next)
{
scheduler.enqueue(next.decay());
if (blocked)
scheduler.block_on(next.get(), blocked.decay());
}
if (top.decay() != next.decay() && top.decay() != blocked.decay())
{
ASYNC_LOG_DEBUG("completed %p\n", top.decay());
assert(top->has_category(Async_Type::CATEGORY_COMPLETE));
scheduler.complete(*top);
}
}
ASYNC_LOG_DEBUG("loop complete\n");
}
// Type-specific cases
#define ENSURE_DEPENDENCY(self, dependency) do { \
if (!(dependency)->has_category(Async_Type::CATEGORY_COMPLETE)) \
return EVAL_RETURN((dependency), (self)); \
} while (0)
static void Async_Ptr_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_PTR);
Async* dep = self->as_ptr.decay();
ASYNC_LOG_DEBUG("eval Ptr %p dep=%p\n", self, dep);
ENSURE_DEPENDENCY(self, dep);
Async* followed = &self->ptr_follow();
if (dep != followed)
self->as_ptr = followed;
return EVAL_RETURN(nullptr, nullptr);
}
static
void
Async_RawThunk_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_RAWTHUNK);
UNUSED(next);
UNUSED(blocked);
assert(0); // TODO not implemented
}
static
void
Async_Thunk_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_THUNK);
ASYNC_LOG_DEBUG(
"running Thunk %p: callback=??? dependency=%p\n",
self,
self->as_thunk.dependency.decay());
AsyncRef dependency = self->as_thunk.dependency;
DestructibleTuple default_value{};
DestructibleTuple const* values = &default_value;
if (dependency)
{
dependency.fold();
ENSURE_DEPENDENCY(self, dependency);
if (!dependency->has_type(Async_Type::IS_VALUE))
{
*self = dependency.get();
return EVAL_RETURN(NULL, NULL);
}
assert(dependency->type == Async_Type::IS_VALUE);
values = &dependency->as_value;
}
AsyncRef result = self->as_thunk.callback(*values);
assert(result);
*self = result.get();
return EVAL_RETURN(self, nullptr);
}
static
Async*
select_if_either_has_type(AsyncRef& left, AsyncRef& right, Async_Type type)
{
if (left->has_type(type))
return left.decay();
if (right->has_type(type))
return right.decay();
return nullptr;
}
static
void
Async_Concat_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_CONCAT);
auto& left = self->as_binary.left.fold();
auto& right = self->as_binary.right.fold();
for (Async_Type type : { Async_Type::IS_CANCEL, Async_Type::IS_ERROR })
{
if(Async* selected = select_if_either_has_type(left, right, type))
{
*self = *selected;
return EVAL_RETURN(nullptr, nullptr);
}
}
ENSURE_DEPENDENCY(self, left);
ENSURE_DEPENDENCY(self, right);
assert(left->type == Async_Type::IS_VALUE);
assert(right->type == Async_Type::IS_VALUE);
assert(left->as_value.vtable == right->as_value.vtable);
auto vtable = left->as_value.vtable;
size_t size = left->as_value.size + right->as_value.size;
DestructibleTuple tuple {vtable, size};
// move or copy the values,
// depending on left/right refcount
size_t output_i = 0;
for (Async* source : { left.decay(), right.decay() })
{
auto copy_or_move =
(source->refcount == 1)
? [](DestructibleTuple& input, size_t i)
{ return input.move_from(i); }
: [](DestructibleTuple& input, size_t i)
{ return input.copy_from(i); };
DestructibleTuple& input = source->as_value;
for (size_t input_i = 0; input_i < input.size; input_i++, output_i++)
{
Destructible temp = copy_or_move(input, input_i);
tuple.set(output_i, std::move(temp));
}
}
self->clear();
self->set_to_Value(std::move(tuple));
return EVAL_RETURN(NULL, NULL);
}
void Async_Flow_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
assert(self);
assert(self->type == Async_Type::IS_FLOW);
using Direction = Async_Flow::Direction;
Async* left = self->as_flow.left.decay();
Async* right = self->as_flow.right.decay();
Async_Type decision_type = self->as_flow.flow_type;
Direction flow_direction = self->as_flow.direction;
ENSURE_DEPENDENCY(self, left);
bool stay_left;
switch (flow_direction)
{
case Direction::THEN:
stay_left = !Async_has_category(left, decision_type);
break;
case Direction::OR:
stay_left = Async_has_category(left, decision_type);
break;
default:
assert(0);
}
if (stay_left)
{
*self = *left;
return EVAL_RETURN(NULL, NULL);
}
else
{
*self = *right;
return EVAL_RETURN(self, NULL);
}
}
// Polymorphic
void
Async_eval(
Async* self,
AsyncRef& next,
AsyncRef& blocked)
{
ASYNC_LOG_DEBUG(
"running Async %p (%2d %s)\n",
self,
static_cast<int>(self->type),
Async_Type_name(self->type));
switch (self->type) {
case Async_Type::IS_UNINITIALIZED:
assert(0);
break;
case Async_Type::CATEGORY_INITIALIZED:
assert(0);
break;
case Async_Type::IS_PTR:
Async_Ptr_eval(self, next, blocked);
break;
case Async_Type::IS_RAWTHUNK:
Async_RawThunk_eval(
self, next, blocked);
break;
case Async_Type::IS_THUNK:
Async_Thunk_eval(
self, next, blocked);
break;
case Async_Type::IS_CONCAT:
Async_Concat_eval(
self, next, blocked);
break;
case Async_Type::IS_FLOW:
Async_Flow_eval(self, next, blocked);
break;
case Async_Type::CATEGORY_COMPLETE:
assert(0);
break;
case Async_Type::IS_CANCEL: // already complete
EVAL_RETURN(NULL, NULL);
break;
case Async_Type::CATEGORY_RESOLVED:
assert(0);
break;
case Async_Type::IS_ERROR: // already complete
EVAL_RETURN(NULL, NULL);
break;
case Async_Type::IS_VALUE: // already complete
EVAL_RETURN(NULL, NULL);
break;
default:
assert(0);
}
ASYNC_LOG_DEBUG(
"... %p result: next=%p blocked=%p\n",
self,
next.decay(),
blocked.decay());
}
( run in 1.213 second using v1.01-cache-2.11-cpan-13bb782fe5a )