Zuzu

 view release on metacpan or  search on metacpan

lib/Zuzu/Runtime/Async/Scheduler.pm  view on Meta::CPAN

	$self->trace_events( [] );

	return;
}

sub schedule {
	my ( $self, $task, $group ) = @_;

	return $task if !defined $task;
	$task->scheduler($self) if $task->can('scheduler');
	$task->use_coro(1) if $task->can('use_coro');
	if ( $task->can('id') and !defined $task->id ) {
		$task->id( $self->next_task_id );
		$self->next_task_id( $self->next_task_id + 1 );
	}
	if (
		$task->can('parent_id')
		and !defined $task->parent_id
		and defined $self->current_task
		and $self->current_task->can('id')
	) {

lib/Zuzu/Runtime/Async/Scheduler.pm  view on Meta::CPAN

	return $task;
}

sub ensure_scheduled {
	my ( $self, $task ) = @_;

	return $task if !defined $task;
	return $self->schedule($task)
		if !exists $self->tasks->{ refaddr($task) };
	$task->scheduler($self) if $task->can('scheduler') and !$task->scheduler;
	$task->use_coro(1) if $task->can('use_coro');

	return $task;
}

sub _task_done {
	my ( $task ) = @_;

	return 1
		if $task->status eq 'fulfilled'
		or $task->status eq 'rejected'

lib/Zuzu/Runtime/Async/Scheduler.pm  view on Meta::CPAN

	my @tasks = values %{ $self->tasks };
	push @tasks, $target
		if defined $target
		and !exists $self->tasks->{ refaddr($target) };

	my $progress = 0;
	for my $task ( @tasks ) {
		next if !defined $task;
		my $before = $task->status;
		if ( $task->status eq 'pending' ) {
			if ( $task->can('use_coro') and $task->use_coro ) {
				$self->start_coro_task($task);
			}
			else {
				$self->run_pending_task($task);
			}
			$progress = 1;
		}
		else {
			$progress = 1 if $task->poll;
		}
		$progress = 1 if $task->status ne $before;

lib/Zuzu/Runtime/Async/Scheduler.pm  view on Meta::CPAN

	$self->current_task($prior_task);
	$self->current_group($prior_group);
	die $err if !$ok;
	$self->trace_task( $task->status eq 'fulfilled' ? 'fulfill' : 'reject', $task )
		if _task_done($task);
	$self->cleanup_task($task);

	return $task;
}

sub start_coro_task {
	my ( $self, $task ) = @_;

	return $task if !defined $task;
	return $task if $task->is_done;
	return $task if defined $task->coro;
	my $coro = async {
		my $prior_task = $self->current_task;
		my $prior_group = $self->current_group;
		my $runtime = $self->runtime;
		my $prior_stack = defined $runtime
			? [ @{ $runtime->{_stack} // [] } ]
			: undef;
		$self->current_task($task);
		$self->current_group(
			$task->group // $prior_group // $self->root_group
		);

lib/Zuzu/Runtime/Async/Scheduler.pm  view on Meta::CPAN

		if ( !$ok ) {
			$task->error($err);
			$task->status('rejected');
		}
		$self->trace_task(
			$task->status eq 'fulfilled' ? 'fulfill' : 'reject',
			$task,
		) if _task_done($task);
		$self->cleanup_task($task);
	};
	$task->coro($coro);

	return $task;
}

sub yield_current_task {
	my ( $self ) = @_;

	my $current = $self->current_task;
	my $group = $self->current_group;
	$self->_save_task_runtime_stack($current);

lib/Zuzu/Runtime/Async/Scheduler.pm  view on Meta::CPAN

	return;
}

sub await_from_current_task {
	my ( $self, $target ) = @_;

	$self->ensure_scheduled($target) if $target->can('scheduler');
	my $current = $self->current_task;
	while ( !_task_done($target) ) {
		if ( $target->status eq 'pending' ) {
			if ( $target->can('use_coro') and $target->use_coro ) {
				$self->start_coro_task($target);
			}
			else {
				$self->run_pending_task($target);
			}
		}
		else {
			$target->poll;
		}
		if ( !_task_done($target) ) {
			$self->yield_current_task;

lib/Zuzu/Value/Task.pm  view on Meta::CPAN

has 'ready_at' => ( is => 'rw' );
has 'poll_cb' => ( is => 'rw' );
has 'on_cancel' => ( is => 'rw' );
has 'scheduler' => ( is => 'rw' );
has 'group' => ( is => 'rw' );
has 'id' => ( is => 'rw' );
has 'parent_id' => ( is => 'rw' );
has 'file' => ( is => 'rw' );
has 'line' => ( is => 'rw' );
has 'cancel_reason' => ( is => 'rw' );
has 'coro' => ( is => 'rw' );
has 'use_coro' => ( is => 'rw', default => sub { 0 } );
has 'runtime_stack' => ( is => 'rw' );
has 'cleanup_ran' => ( is => 'rw', default => sub { 0 } );

sub is_truthy { 1 }

sub to_String {
	my ( $self ) = @_;

	return '[Task ' . $self->status . ']';
}

lib/Zuzu/Value/Task.pm  view on Meta::CPAN

}

sub poll {
	my ( $self ) = @_;

	return 1
		if $self->_cleanup_if_done->is_done;
	if (
		$self->status eq 'pending'
		and defined $self->scheduler
		and $self->use_coro
	) {
		$self->scheduler->start_coro_task($self);
		return $self->is_done ? 1 : 0;
	}
	if ( $self->status eq 'pending' and defined $self->scheduler ) {
		$self->scheduler->run_pending_task($self);
		return $self->is_done ? 1 : 0;
	}
	if ( $self->status eq 'sleeping' ) {
		return 0 if time < ( $self->ready_at // 0 );
		$self->result(undef);
		$self->status('fulfilled');

lib/Zuzu/Value/Task.pm  view on Meta::CPAN


	return;
}

sub cancel {
	my ( $self, $reason ) = @_;

	return $self
		if $self->is_done;

	my $needs_coro_unwind = (
		$self->status eq 'running'
		and defined $self->coro
	) ? 1 : 0;
	if ( $self->status eq 'running' ) {
		if ( defined $self->pid ) {
			kill 'TERM', -$self->pid;
			kill 'TERM', $self->pid;
		}
		waitpid( $self->pid, 0 ) if defined $self->pid;
		close $self->reader if $self->reader;
		if ( defined $self->coro and $self->coro->can('throw') ) {
			$self->coro->throw( defined $reason ? $reason : 'Task cancelled' );
		}
	}
	$self->cancel_reason($reason) if defined $reason;
	$self->error( defined $reason ? $reason : 'Task cancelled' );
	my $on_cancel = $self->on_cancel;
	$on_cancel->($self) if defined $on_cancel;
	$self->status('cancelled');
	$self->_run_cancel_cleanup if $needs_coro_unwind;
	$self->scheduler->trace_task( cancel => $self )
		if defined $self->scheduler
		and $self->scheduler->can('trace_task');
	$self->_cleanup_if_done if !$needs_coro_unwind;

	return $self;
}

sub await {
	my ( $self ) = @_;

	if (
		defined $self->scheduler
		and !$self->is_done



( run in 1.373 second using v1.01-cache-2.11-cpan-df04353d9ac )