Async-Trampoline

 view release on metacpan or  search on metacpan

lib/Async/Trampoline/Scheduler.pm  view on Meta::CPAN

=head2 new

    $scheduler = Async::Trampoline::Scheduler->new

Create a new scheduler.

=head2 enqueue

    $scheduler->enqueue($task)

    $scheduler->enqueue($task => @blocked)

Add a I<$task> to the runnable queue.

B<$task>:
A task to be scheduled at some point in the future.

B<@blocked>:
Any number of tasks that depend on the I<$task>.
The blocked tasks will be added to the runnable queue
once the I<$task> is completed.

=head2 dequeue

    ($task) = $scheduler->dequeue
    ()      = $scheduler->dequeue

Get the next scheduled I<$task>.

B<returns>:

src/Async.cpp  view on Meta::CPAN

        return *this;

    // flatten the pointer until we reach something concrete
    AsyncRef& ptr = as_ptr;
    while (ptr->type == Async_Type::IS_PTR)
        ptr = ptr->as_ptr;

    return ptr.get();
}

auto Async::add_blocked(AsyncRef b) -> void
{
    ptr_follow().blocked.emplace_back(std::move(b));
}

src/Async.h  view on Meta::CPAN

    fprintf(stderr, "#DEBUG Async: " __VA_ARGS__);                          \
    fflush(stderr);                                                         \
} while (0)
#endif /* ifndef ASYNC_TRAMPOLINE_DEBUG */

#define ASYNC_FORMAT "<Async %p %s ref=%zu blocks=%zu>"
#define ASYNC_FORMAT_ARGS(aptr)                                             \
    (aptr),                                                                 \
    Async_maybe_type_name(aptr),                                            \
    Async_maybe_refcount(aptr),                                             \
    Async_maybe_blocked_size(aptr)

#ifdef __cpp_ref_qualifiers
#define MAYBE_MOVEREF &&
#else
#define MAYBE_MOVEREF
#endif

enum class Async_Type
{
    IS_UNINITIALIZED,

src/Async.h  view on Meta::CPAN

    size_t refcount;
    union {
        Async_Uninitialized as_uninitialized;
        AsyncRef            as_ptr;
        Async_Thunk         as_thunk;
        Async_Pair          as_binary;
        Async_Flow          as_flow;
        Destructible        as_error;
        DestructibleTuple   as_value;
    };
    std::vector<AsyncRef> blocked;

    Async() :
        type{Async_Type::IS_UNINITIALIZED},
        refcount{1},
        as_ptr{nullptr},
        blocked{}
    { }
    Async(Async&& other) : Async{} { set_from(std::move(other)); }
    ~Async() {
        assert(blocked.size() == 0);
        clear();
        assert(type == Async_Type::IS_UNINITIALIZED);
    }

    auto ref() noexcept -> Async& { refcount++; return *this; }
    auto unref() -> void;

    auto operator=(Async& other) -> Async&;
    auto clear() -> void;
    auto set_from(Async&& other) -> void;

    void set_to_Ptr         (AsyncRef target);
    void set_to_RawThunk    (Async_RawThunk::Callback callback, AsyncRef dep);
    void set_to_Thunk       (Async_Thunk::Callback    callback, AsyncRef dep);
    void set_to_Concat      (AsyncRef left, AsyncRef right);
    void set_to_Flow        (Async_Flow);
    void set_to_Cancel      ();
    void set_to_Error       (Destructible error);
    void set_to_Value       (DestructibleTuple values);

    auto add_blocked(AsyncRef blocked) -> void;

    auto ptr_follow() -> Async&;

    auto has_category(Async_Type type) -> bool
    { return ptr_follow().type >= type; }

    auto has_type(Async_Type type) -> bool
    { return ptr_follow().type == type; }

    static auto alloc() -> AsyncRef;

src/Async.h  view on Meta::CPAN

    ptr = nullptr;
}

// functions used with debugging output
static inline auto Async_maybe_type_name(Async const* ptr) noexcept -> char const*
{ return ptr ? Async_Type_name(ptr->type) : "(NULL)"; }

static inline auto Async_maybe_refcount(Async const* ptr) noexcept -> size_t
{ return ptr ? ptr->refcount : 0; }

static inline auto Async_maybe_blocked_size(Async const* ptr) noexcept -> size_t
{ return ptr ? ptr->blocked.size() : 0; }


// Evaluation: Async_X_evaluate()
// Incomplete -> Complete
void
Async_eval(
        Async*  self,
        AsyncRef& next,
        AsyncRef& blocked);

inline auto AsyncRef::fold() -> AsyncRef&
{
    Async* target = &ptr->ptr_follow();
    if (target != ptr)
        *this = target;
    return *this;
}

inline static

src/Async_Evaluation.cpp  view on Meta::CPAN

#include "Async.h"
#include "Scheduler.h"

#define UNUSED(x) static_cast<void>(x)

#define EVAL_RETURN(next_async, blocked_async) \
    (void)  (next = next_async, blocked = blocked_async)

void
Async_run_until_completion(
        Async* async)
{
    ASYNC_LOG_DEBUG("loop for Async %p\n", async);

    Async_Trampoline_Scheduler scheduler{};

    scheduler.enqueue(async);

    while (scheduler.queue_size() > 0)
    {
        AsyncRef top = scheduler.dequeue();

        if (!top)
            break;

        Async trap;
        AsyncRef next = &trap;
        AsyncRef blocked = &trap;
        Async_eval(top.decay(), next, blocked);

        assert(next.decay() != &trap);
        assert(blocked.decay() != &trap);

        if (blocked)
            assert(next);

        if (next)
        {
            scheduler.enqueue(next.decay());

            if (blocked)
                scheduler.block_on(next.get(), blocked.decay());
        }

        if (top.decay() != next.decay() && top.decay() != blocked.decay())
        {
            ASYNC_LOG_DEBUG("completed %p\n", top.decay());
            assert(top->has_category(Async_Type::CATEGORY_COMPLETE));
            scheduler.complete(*top);
        }
    }

    ASYNC_LOG_DEBUG("loop complete\n");
}

// Type-specific cases

#define ENSURE_DEPENDENCY(self, dependency) do {                            \
    if (!(dependency)->has_category(Async_Type::CATEGORY_COMPLETE))         \
        return EVAL_RETURN((dependency), (self));                           \
} while (0)

static void Async_Ptr_eval(
        Async*      self,
        AsyncRef&   next,
        AsyncRef&   blocked)
{
    assert(self);
    assert(self->type == Async_Type::IS_PTR);

    Async* dep = self->as_ptr.decay();

    ASYNC_LOG_DEBUG("eval Ptr %p dep=%p\n", self, dep);

    ENSURE_DEPENDENCY(self, dep);

src/Async_Evaluation.cpp  view on Meta::CPAN

        self->as_ptr = followed;

    return EVAL_RETURN(nullptr, nullptr);
}

static
void
Async_RawThunk_eval(
        Async*  self,
        AsyncRef& next,
        AsyncRef& blocked)
{
    assert(self);
    assert(self->type == Async_Type::IS_RAWTHUNK);

    UNUSED(next);
    UNUSED(blocked);
    assert(0);  // TODO not implemented
}

static
void
Async_Thunk_eval(
        Async*  self,
        AsyncRef& next,
        AsyncRef& blocked)
{
    assert(self);
    assert(self->type == Async_Type::IS_THUNK);

    ASYNC_LOG_DEBUG(
            "running Thunk %p: callback=??? dependency=%p\n",
            self,
            self->as_thunk.dependency.decay());

    AsyncRef dependency = self->as_thunk.dependency;

src/Async_Evaluation.cpp  view on Meta::CPAN

    if (right->has_type(type))
        return right.decay();
    return nullptr;
}

static
void
Async_Concat_eval(
        Async*  self,
        AsyncRef& next,
        AsyncRef& blocked)
{
    assert(self);
    assert(self->type == Async_Type::IS_CONCAT);

    auto& left  = self->as_binary.left.fold();
    auto& right = self->as_binary.right.fold();

    for (Async_Type type : { Async_Type::IS_CANCEL, Async_Type::IS_ERROR })
    {
        if(Async* selected  = select_if_either_has_type(left, right, type))

src/Async_Evaluation.cpp  view on Meta::CPAN

    }

    self->clear();
    self->set_to_Value(std::move(tuple));
    return EVAL_RETURN(NULL, NULL);
}

void Async_Flow_eval(
        Async*      self,
        AsyncRef&   next,
        AsyncRef&   blocked)
{
    assert(self);
    assert(self->type == Async_Type::IS_FLOW);

    using Direction = Async_Flow::Direction;
    Async* left = self->as_flow.left.decay();
    Async* right = self->as_flow.right.decay();
    Async_Type decision_type = self->as_flow.flow_type;
    Direction flow_direction = self->as_flow.direction;

src/Async_Evaluation.cpp  view on Meta::CPAN

        return EVAL_RETURN(self, NULL);
    }
}

// Polymorphic

void
Async_eval(
        Async*  self,
        AsyncRef& next,
        AsyncRef& blocked)
{
    ASYNC_LOG_DEBUG(
            "running Async %p (%2d %s)\n",
            self,
            static_cast<int>(self->type),
            Async_Type_name(self->type));

    switch (self->type) {
        case Async_Type::IS_UNINITIALIZED:
            assert(0);
            break;

        case Async_Type::CATEGORY_INITIALIZED:
            assert(0);
            break;
        case Async_Type::IS_PTR:
            Async_Ptr_eval(self, next, blocked);
            break;
        case Async_Type::IS_RAWTHUNK:
            Async_RawThunk_eval(
                    self, next, blocked);
            break;
        case Async_Type::IS_THUNK:
            Async_Thunk_eval(
                    self, next, blocked);
            break;
        case Async_Type::IS_CONCAT:
            Async_Concat_eval(
                    self, next, blocked);
            break;
        case Async_Type::IS_FLOW:
            Async_Flow_eval(self, next, blocked);
            break;

        case Async_Type::CATEGORY_COMPLETE:
            assert(0);
            break;
        case Async_Type::IS_CANCEL:  // already complete
            EVAL_RETURN(NULL, NULL);
            break;

        case Async_Type::CATEGORY_RESOLVED:

src/Async_Evaluation.cpp  view on Meta::CPAN

            break;
        case Async_Type::IS_VALUE:  // already complete
            EVAL_RETURN(NULL, NULL);
            break;

        default:
            assert(0);
    }

    ASYNC_LOG_DEBUG(
            "... %p result: next=%p blocked=%p\n",
            self,
            next.decay(),
            blocked.decay());
}

src/Async_Initialization.cpp  view on Meta::CPAN


        default:
            assert(0);
    }
}

auto Async::set_from(Async&& other) -> void
{
    assert(type == Async_Type::IS_UNINITIALIZED);

    assert(other.blocked.size() == 0);

    switch (other.type)
    {
        case Async_Type::IS_UNINITIALIZED:
            break;

        case Async_Type::CATEGORY_INITIALIZED:
            assert(0);
            break;
        case Async_Type::IS_PTR:

src/Async_Initialization.cpp  view on Meta::CPAN


    ASYNC_LOG_DEBUG("init %p to Ptr: target=" ASYNC_FORMAT "\n",
            this,
            ASYNC_FORMAT_ARGS(target.decay()));

    type = Async_Type::IS_PTR;
    new (&as_ptr) AsyncRef { std::move(target) };

    //  // transfer all dependencies to target
    //  Async& target_ref = ptr_follow();
    //  for (auto& depref : blocked)
    //      target_ref.add_blocked(std::move(depref));
    //  blocked.clear();
}

void
Async_Ptr_clear(
        Async*  self)
{
    assert(self);
    assert(self->type == Async_Type::IS_PTR);

    ASYNC_LOG_DEBUG("clear %p of Ptr: target=" ASYNC_FORMAT "\n",

src/Scheduler.cpp  view on Meta::CPAN

    fprintf(stderr, "#DEBUG " __VA_ARGS__);                                 \
    fflush(stderr);                                                         \
} while (0)
#endif

// == The Impl Declaration ==

class Async_Trampoline_Scheduler::Impl {
    CircularBuffer<AsyncRef> runnable_queue{};
    std::unordered_set<Async const*> runnable_enqueued{};
    // std::unordered_multimap<Async const*, AsyncRef> blocked{};

public:

    Impl(size_t initial_capacity);
    ~Impl();

    auto queue_size() const -> size_t { return runnable_queue.size(); }
    void enqueue(AsyncRef async);
    AsyncRef dequeue();
    void block_on(Async& dependency_async, AsyncRef blocked_async);
    void complete(Async& async);
};

// == The Public C++ Interface ==

Async_Trampoline_Scheduler::Async_Trampoline_Scheduler(size_t initial_capacity)
    : m_impl{new Async_Trampoline_Scheduler::Impl{initial_capacity}}
{}

Async_Trampoline_Scheduler::~Async_Trampoline_Scheduler() = default;

src/Scheduler.cpp  view on Meta::CPAN

{
    m_impl->enqueue(std::move(async));
}

auto Async_Trampoline_Scheduler::dequeue() -> AsyncRef
{
    return m_impl->dequeue();
}

auto Async_Trampoline_Scheduler::block_on(
        Async& dependency_async, AsyncRef blocked_async) -> void
{
    m_impl->block_on(dependency_async, std::move(blocked_async));
}

auto Async_Trampoline_Scheduler::complete(Async& async) -> void
{
    m_impl->complete(async);
}

// == The Impl implementation ==

#define SCHEDULER_RUNNABLE_QUEUE_FORMAT                                     \

src/Scheduler.cpp  view on Meta::CPAN

    Async* key = async.decay();
    runnable_queue.enq(std::move(async));
    runnable_enqueued.insert(key);

    LOG_DEBUG(
            "    '-> " SCHEDULER_RUNNABLE_QUEUE_FORMAT "\n",
            SCHEDULER_RUNNABLE_QUEUE_FORMAT_ARGS(*this));
}

void Async_Trampoline_Scheduler::Impl::block_on(
        Async& dependency_async, AsyncRef blocked_async)
{
    LOG_DEBUG(
        "dependency of " ASYNC_FORMAT " on " ASYNC_FORMAT "\n",
        ASYNC_FORMAT_ARGS(blocked_async.decay()),
        ASYNC_FORMAT_ARGS(&dependency_async));

    dependency_async.add_blocked(std::move(blocked_async));
}

AsyncRef Async_Trampoline_Scheduler::Impl::dequeue()
{
    assert(runnable_queue.size());

    AsyncRef async = runnable_queue.deq();

    LOG_DEBUG(
            "dequeue %p from " SCHEDULER_RUNNABLE_QUEUE_FORMAT "\n",

src/Scheduler.cpp  view on Meta::CPAN

    }
    runnable_enqueued.erase(entry);

    return async;
}

void Async_Trampoline_Scheduler::Impl::complete(Async& async)
{
    LOG_DEBUG("completing %p\n", &async);

    LOG_DEBUG("    '-> %zu dependencies\n", async.blocked.size());

    if (async.blocked.size() == 0)
        return;

    for (auto& ref : async.blocked)
        enqueue(std::move(ref));
    async.blocked.clear();

    if (async.type == Async_Type::IS_PTR
            && async.has_category(Async_Type::CATEGORY_COMPLETE))
    {
        complete(async.as_ptr.get());
    }
}

src/Scheduler.h  view on Meta::CPAN

     *
     *  Returns: AsyncRef
     *      The next item.
     */
    auto dequeue() -> AsyncRef;

    /** Register a dependency relationship.
     *
     *  This is similar to a Makefile rule:
     *
     *      blocked_async: dependency_async
     *
     *  dependency_async: Async&
     *      must be completed first.
     *  blocked_async: AsyncRef
     *      is blocked until the "dependency_async" is completed-
     */
    auto block_on(Async& dependency_async, AsyncRef blocked_async) -> void;

    /** Mark an item as completed.
     *
     *  Any items that block on the completed items will be enqueued in an
     *  unspecified order.
     *
     *  async: Async&
     *      a completed item.
     */
    auto complete(Async& async) -> void;

t/scheduler.t  view on Meta::CPAN

    is $starter, $starter_again, q(got starter async back);
    is scalar $scheduler->dequeue, undef, q(queue has no further elems);
    $scheduler->complete($starter_again);

    my @results;
    while (my ($async) = $scheduler->dequeue) {
        push @results, $async->run_until_completion;
    }
    @results = sort @results;

    is "@results", "1 2", q(got blocked tasks back);
};

done_testing;



( run in 0.693 second using v1.01-cache-2.11-cpan-49f99fa48dc )