Zuzu
view release on metacpan or search on metacpan
lib/Zuzu/Runtime/Async/Scheduler.pm view on Meta::CPAN
has 'next_task_id' => ( is => 'rw', default => sub { 1 } );
has 'trace_events' => ( is => 'rw', default => sub { [] } );
sub BUILD {
my ( $self ) = @_;
weaken( $self->{runtime} ) if defined $self->{runtime};
$self->current_group( $self->root_group );
}
sub new_group {
my ( $self, $parent ) = @_;
return Zuzu::Runtime::Async::TaskGroup->new(
parent => $parent // $self->current_group // $self->root_group,
);
}
sub debug_enabled {
no warnings 'once';
return $Zuzu::Runtime::DEBUG_LEVEL > 0 ? 1 : 0;
}
sub trace_task {
my ( $self, $event, $task, $extra ) = @_;
return if !$self->debug_enabled;
my $record = {
event => $event,
task_id => defined $task && $task->can('id') ? $task->id : undef,
parent_task_id => defined $task && $task->can('parent_id')
? $task->parent_id
: undef,
name => defined $task && $task->can('name') ? $task->name : undef,
status => defined $task && $task->can('status') ? $task->status : undef,
file => defined $task && $task->can('file') ? $task->file : undef,
line => defined $task && $task->can('line') ? $task->line : undef,
};
if ( defined $extra ) {
$record->{$_} = $extra->{$_} for keys %$extra;
}
push @{ $self->trace_events }, $record;
return $record;
}
sub clear_trace {
my ( $self ) = @_;
$self->trace_events( [] );
return;
}
sub schedule {
my ( $self, $task, $group ) = @_;
return $task if !defined $task;
$task->scheduler($self) if $task->can('scheduler');
$task->use_coro(1) if $task->can('use_coro');
if ( $task->can('id') and !defined $task->id ) {
$task->id( $self->next_task_id );
$self->next_task_id( $self->next_task_id + 1 );
}
if (
$task->can('parent_id')
and !defined $task->parent_id
and defined $self->current_task
and $self->current_task->can('id')
) {
$task->parent_id( $self->current_task->id );
}
$group //= $self->current_group // $self->root_group;
$group->add($task) if $group;
$self->tasks->{ refaddr($task) } = $task;
$self->trace_task( schedule => $task );
return $task;
}
sub ensure_scheduled {
my ( $self, $task ) = @_;
return $task if !defined $task;
return $self->schedule($task)
if !exists $self->tasks->{ refaddr($task) };
$task->scheduler($self) if $task->can('scheduler') and !$task->scheduler;
$task->use_coro(1) if $task->can('use_coro');
return $task;
}
sub _task_done {
my ( $task ) = @_;
return 1
if $task->status eq 'fulfilled'
or $task->status eq 'rejected'
or $task->status eq 'cancelled';
return 0;
}
sub cleanup_task {
my ( $self, $task ) = @_;
return if !defined $task;
return if !_task_done($task);
delete $self->tasks->{ refaddr($task) };
if ( $task->can('group') and defined $task->group ) {
$task->group->remove($task);
}
$self->trace_task( cleanup => $task );
return;
}
sub cleanup_done {
my ( $self ) = @_;
for my $task ( values %{ $self->tasks } ) {
$self->cleanup_task($task);
}
return;
}
sub active_count {
my ( $self ) = @_;
$self->cleanup_done;
return scalar keys %{ $self->tasks };
}
sub shutdown {
my ( $self, $reason ) = @_;
for my $task ( values %{ $self->tasks } ) {
next if !defined $task;
$task->cancel($reason)
if $task->can('cancel') and !_task_done($task);
}
$self->cleanup_done;
$self->current_task(undef);
$self->current_group( $self->root_group );
$self->trace_task( shutdown => undef );
return;
}
sub run_until {
my ( $self, $target ) = @_;
$self->ensure_scheduled($target) if $target->can('scheduler');
while ( !_task_done($target) ) {
my $progress = $self->progress_once($target);
cede;
sleep(0.001) if !$progress and !_task_done($target);
}
$self->cleanup_task($target);
return $target;
}
sub progress_once {
my ( $self, $target ) = @_;
my @tasks = values %{ $self->tasks };
push @tasks, $target
if defined $target
and !exists $self->tasks->{ refaddr($target) };
my $progress = 0;
for my $task ( @tasks ) {
next if !defined $task;
my $before = $task->status;
if ( $task->status eq 'pending' ) {
if ( $task->can('use_coro') and $task->use_coro ) {
$self->start_coro_task($task);
}
else {
$self->run_pending_task($task);
}
$progress = 1;
}
else {
$progress = 1 if $task->poll;
}
$progress = 1 if $task->status ne $before;
$self->cleanup_task($task);
}
return $progress;
}
sub _save_task_runtime_stack {
my ( $self, $task ) = @_;
my $runtime = $self->runtime;
return if !defined $runtime or !defined $task;
$task->runtime_stack( [ @{ $runtime->{_stack} // [] } ] )
if $task->can('runtime_stack');
return;
}
sub _restore_task_runtime_stack {
my ( $self, $task ) = @_;
my $runtime = $self->runtime;
return if !defined $runtime or !defined $task;
return if !$task->can('runtime_stack') or !defined $task->runtime_stack;
$runtime->{_stack} = [ @{ $task->runtime_stack } ];
return;
}
sub _throw_if_task_cancelled {
my ( $self, $task ) = @_;
return if !defined $task;
return if !$task->can('status') or $task->status ne 'cancelled';
die $task->error;
}
sub run_pending_task {
my ( $self, $task ) = @_;
my $prior_task = $self->current_task;
my $prior_group = $self->current_group;
my $runtime = $self->runtime;
my $prior_stack = defined $runtime
? [ @{ $runtime->{_stack} // [] } ]
: undef;
$self->current_task($task);
$self->current_group( $task->group // $prior_group // $self->root_group );
$self->_restore_task_runtime_stack($task);
$self->trace_task( start => $task );
my $ok = eval {
$task->_run_pending;
1;
};
my $err = $@ if !$ok;
$self->_save_task_runtime_stack($task);
$runtime->{_stack} = $prior_stack if defined $runtime and defined $prior_stack;
$self->current_task($prior_task);
$self->current_group($prior_group);
die $err if !$ok;
$self->trace_task( $task->status eq 'fulfilled' ? 'fulfill' : 'reject', $task )
if _task_done($task);
$self->cleanup_task($task);
return $task;
}
sub start_coro_task {
my ( $self, $task ) = @_;
return $task if !defined $task;
return $task if $task->is_done;
return $task if defined $task->coro;
my $coro = async {
my $prior_task = $self->current_task;
my $prior_group = $self->current_group;
my $runtime = $self->runtime;
my $prior_stack = defined $runtime
? [ @{ $runtime->{_stack} // [] } ]
: undef;
$self->current_task($task);
$self->current_group(
$task->group // $prior_group // $self->root_group
);
$self->_restore_task_runtime_stack($task);
$task->status('running') if $task->status eq 'pending';
$self->trace_task( start => $task );
my $ok = eval {
$task->_run_body;
1;
};
my $err = $@ if !$ok;
$self->_save_task_runtime_stack($task);
$runtime->{_stack} = $prior_stack
if defined $runtime and defined $prior_stack;
$self->current_task($prior_task);
$self->current_group($prior_group);
if ( !$ok ) {
$task->error($err);
$task->status('rejected');
}
$self->trace_task(
$task->status eq 'fulfilled' ? 'fulfill' : 'reject',
$task,
) if _task_done($task);
$self->cleanup_task($task);
};
$task->coro($coro);
return $task;
}
sub yield_current_task {
my ( $self ) = @_;
my $current = $self->current_task;
my $group = $self->current_group;
$self->_save_task_runtime_stack($current);
cede;
$self->current_task($current);
$self->current_group($group);
$self->_restore_task_runtime_stack($current);
$self->_throw_if_task_cancelled($current);
return;
}
sub await_from_current_task {
my ( $self, $target ) = @_;
$self->ensure_scheduled($target) if $target->can('scheduler');
my $current = $self->current_task;
while ( !_task_done($target) ) {
if ( $target->status eq 'pending' ) {
if ( $target->can('use_coro') and $target->use_coro ) {
$self->start_coro_task($target);
}
else {
$self->run_pending_task($target);
}
}
else {
$target->poll;
}
if ( !_task_done($target) ) {
$self->yield_current_task;
}
}
$self->cleanup_task($target);
return $target;
}
1;
=pod
=head1 COPYRIGHT AND LICENCE
B<< Zuzu::Runtime::Async::Scheduler >> is copyright Toby Inkster.
It is free software; you may redistribute it and/or modify it under
the terms of either the Artistic License 1.0 or the GNU General Public
License version 2.
=cut
( run in 1.602 second using v1.01-cache-2.11-cpan-df04353d9ac )