Zuzu
view release on metacpan or search on metacpan
lib/Zuzu/Runtime/Async/Scheduler.pm view on Meta::CPAN
$self->trace_events( [] );
return;
}
sub schedule {
my ( $self, $task, $group ) = @_;
return $task if !defined $task;
$task->scheduler($self) if $task->can('scheduler');
$task->use_coro(1) if $task->can('use_coro');
if ( $task->can('id') and !defined $task->id ) {
$task->id( $self->next_task_id );
$self->next_task_id( $self->next_task_id + 1 );
}
if (
$task->can('parent_id')
and !defined $task->parent_id
and defined $self->current_task
and $self->current_task->can('id')
) {
lib/Zuzu/Runtime/Async/Scheduler.pm view on Meta::CPAN
return $task;
}
sub ensure_scheduled {
my ( $self, $task ) = @_;
return $task if !defined $task;
return $self->schedule($task)
if !exists $self->tasks->{ refaddr($task) };
$task->scheduler($self) if $task->can('scheduler') and !$task->scheduler;
$task->use_coro(1) if $task->can('use_coro');
return $task;
}
sub _task_done {
my ( $task ) = @_;
return 1
if $task->status eq 'fulfilled'
or $task->status eq 'rejected'
lib/Zuzu/Runtime/Async/Scheduler.pm view on Meta::CPAN
my @tasks = values %{ $self->tasks };
push @tasks, $target
if defined $target
and !exists $self->tasks->{ refaddr($target) };
my $progress = 0;
for my $task ( @tasks ) {
next if !defined $task;
my $before = $task->status;
if ( $task->status eq 'pending' ) {
if ( $task->can('use_coro') and $task->use_coro ) {
$self->start_coro_task($task);
}
else {
$self->run_pending_task($task);
}
$progress = 1;
}
else {
$progress = 1 if $task->poll;
}
$progress = 1 if $task->status ne $before;
lib/Zuzu/Runtime/Async/Scheduler.pm view on Meta::CPAN
$self->current_task($prior_task);
$self->current_group($prior_group);
die $err if !$ok;
$self->trace_task( $task->status eq 'fulfilled' ? 'fulfill' : 'reject', $task )
if _task_done($task);
$self->cleanup_task($task);
return $task;
}
sub start_coro_task {
my ( $self, $task ) = @_;
return $task if !defined $task;
return $task if $task->is_done;
return $task if defined $task->coro;
my $coro = async {
my $prior_task = $self->current_task;
my $prior_group = $self->current_group;
my $runtime = $self->runtime;
my $prior_stack = defined $runtime
? [ @{ $runtime->{_stack} // [] } ]
: undef;
$self->current_task($task);
$self->current_group(
$task->group // $prior_group // $self->root_group
);
lib/Zuzu/Runtime/Async/Scheduler.pm view on Meta::CPAN
if ( !$ok ) {
$task->error($err);
$task->status('rejected');
}
$self->trace_task(
$task->status eq 'fulfilled' ? 'fulfill' : 'reject',
$task,
) if _task_done($task);
$self->cleanup_task($task);
};
$task->coro($coro);
return $task;
}
sub yield_current_task {
my ( $self ) = @_;
my $current = $self->current_task;
my $group = $self->current_group;
$self->_save_task_runtime_stack($current);
lib/Zuzu/Runtime/Async/Scheduler.pm view on Meta::CPAN
return;
}
sub await_from_current_task {
my ( $self, $target ) = @_;
$self->ensure_scheduled($target) if $target->can('scheduler');
my $current = $self->current_task;
while ( !_task_done($target) ) {
if ( $target->status eq 'pending' ) {
if ( $target->can('use_coro') and $target->use_coro ) {
$self->start_coro_task($target);
}
else {
$self->run_pending_task($target);
}
}
else {
$target->poll;
}
if ( !_task_done($target) ) {
$self->yield_current_task;
lib/Zuzu/Value/Task.pm view on Meta::CPAN
has 'ready_at' => ( is => 'rw' );
has 'poll_cb' => ( is => 'rw' );
has 'on_cancel' => ( is => 'rw' );
has 'scheduler' => ( is => 'rw' );
has 'group' => ( is => 'rw' );
has 'id' => ( is => 'rw' );
has 'parent_id' => ( is => 'rw' );
has 'file' => ( is => 'rw' );
has 'line' => ( is => 'rw' );
has 'cancel_reason' => ( is => 'rw' );
has 'coro' => ( is => 'rw' );
has 'use_coro' => ( is => 'rw', default => sub { 0 } );
has 'runtime_stack' => ( is => 'rw' );
has 'cleanup_ran' => ( is => 'rw', default => sub { 0 } );
sub is_truthy { 1 }
sub to_String {
my ( $self ) = @_;
return '[Task ' . $self->status . ']';
}
lib/Zuzu/Value/Task.pm view on Meta::CPAN
}
sub poll {
my ( $self ) = @_;
return 1
if $self->_cleanup_if_done->is_done;
if (
$self->status eq 'pending'
and defined $self->scheduler
and $self->use_coro
) {
$self->scheduler->start_coro_task($self);
return $self->is_done ? 1 : 0;
}
if ( $self->status eq 'pending' and defined $self->scheduler ) {
$self->scheduler->run_pending_task($self);
return $self->is_done ? 1 : 0;
}
if ( $self->status eq 'sleeping' ) {
return 0 if time < ( $self->ready_at // 0 );
$self->result(undef);
$self->status('fulfilled');
lib/Zuzu/Value/Task.pm view on Meta::CPAN
return;
}
sub cancel {
my ( $self, $reason ) = @_;
return $self
if $self->is_done;
my $needs_coro_unwind = (
$self->status eq 'running'
and defined $self->coro
) ? 1 : 0;
if ( $self->status eq 'running' ) {
if ( defined $self->pid ) {
kill 'TERM', -$self->pid;
kill 'TERM', $self->pid;
}
waitpid( $self->pid, 0 ) if defined $self->pid;
close $self->reader if $self->reader;
if ( defined $self->coro and $self->coro->can('throw') ) {
$self->coro->throw( defined $reason ? $reason : 'Task cancelled' );
}
}
$self->cancel_reason($reason) if defined $reason;
$self->error( defined $reason ? $reason : 'Task cancelled' );
my $on_cancel = $self->on_cancel;
$on_cancel->($self) if defined $on_cancel;
$self->status('cancelled');
$self->_run_cancel_cleanup if $needs_coro_unwind;
$self->scheduler->trace_task( cancel => $self )
if defined $self->scheduler
and $self->scheduler->can('trace_task');
$self->_cleanup_if_done if !$needs_coro_unwind;
return $self;
}
sub await {
my ( $self ) = @_;
if (
defined $self->scheduler
and !$self->is_done
( run in 1.373 second using v1.01-cache-2.11-cpan-df04353d9ac )