Zuzu

 view release on metacpan or  search on metacpan

lib/Zuzu/Value/Task.pm  view on Meta::CPAN

package Zuzu::Value::Task;

use utf8;

our $VERSION = '0.006000';

use Moo;
use Scalar::Util qw( blessed refaddr );
use Storable qw( fd_retrieve nstore_fd );
use POSIX qw( WNOHANG );
use Time::HiRes qw( sleep time );

# Display label for the task.
has name          => ( is => 'rw', default => sub { '<task>' } );
# Code reference invoked to produce the task's value.
has thunk         => ( is => 'rw' );
# Lifecycle state. Values used in this file: 'pending', 'running',
# 'sleeping', 'waiting', 'fulfilled', 'rejected', 'cancelled'.
has status        => ( is => 'rw', default => sub { 'pending' } );
# Value stored when the task is fulfilled.
has result        => ( is => 'rw' );
# Error stored when the task is rejected or cancelled.
has error         => ( is => 'rw' );
# Child process id and read end of the result pipe (set by start).
has pid           => ( is => 'rw' );
has reader        => ( is => 'rw' );
# Timestamp compared against time() by poll for 'sleeping' tasks.
has ready_at      => ( is => 'rw' );
# Callback for 'waiting' tasks; poll expects ( $done, $ok, $value ).
has poll_cb       => ( is => 'rw' );
# Callback invoked with the task itself from cancel.
has on_cancel     => ( is => 'rw' );
# Optional scheduler object that drives and cleans up the task.
has scheduler     => ( is => 'rw' );
# NOTE(review): group, id, parent_id, file and line are not read in the
# visible part of this file; presumably bookkeeping for the scheduler
# and for diagnostics -- confirm against callers.
has group         => ( is => 'rw' );
has id            => ( is => 'rw' );
has parent_id     => ( is => 'rw' );
has file          => ( is => 'rw' );
has line          => ( is => 'rw' );
# Reason passed to cancel, when one was given.
has cancel_reason => ( is => 'rw' );
# Coroutine handle; cancel calls ->throw on it when available.
has coro          => ( is => 'rw' );
# When true, poll starts a pending task via scheduler->start_coro_task.
has use_coro      => ( is => 'rw', default => sub { 0 } );
# Array reference of environments scanned by _run_cancel_cleanup.
has runtime_stack => ( is => 'rw' );
# Guard so the cancel cleanup pass runs at most once.
has cleanup_ran   => ( is => 'rw', default => sub { 0 } );

# A task always counts as true, whatever state it is in.
sub is_truthy {
	return 1;
}

# Render the task as "[Task <status>]".
sub to_String {
	my ( $self ) = @_;

	my $state = $self->status;

	return sprintf '[Task %s]', $state;
}

# Return 1 when the task has reached a terminal state ('fulfilled',
# 'rejected' or 'cancelled'), otherwise 0.
sub is_done {
	my ( $self ) = @_;

	my $state = $self->status;
	for my $terminal (qw( fulfilled rejected cancelled )) {
		return 1 if $state eq $terminal;
	}

	return 0;
}

# Hand a finished task to its scheduler's cleanup_task method, if there
# is a scheduler and it offers one. Always returns the task so calls can
# be chained.
sub _cleanup_if_done {
	my ( $self ) = @_;

	return $self if !$self->is_done;

	my $scheduler = $self->scheduler;
	return $self if !defined $scheduler;

	$scheduler->cleanup_task($self)
		if $scheduler->can('cleanup_task');

	return $self;
}

sub start {
	my ( $self ) = @_;

	return $self if $self->status ne 'pending';

	pipe( my $reader, my $writer )
		or die "Could not create task pipe: $!";
	my $pid = fork();
	die "Could not fork task: $!" if !defined $pid;

	if ( $pid == 0 ) {
		close $reader;
		eval { setpgrp( 0, 0 ); 1 };
		my $payload;
		my $ok = eval {
			my $thunk = $self->thunk;
			my $value = defined $thunk ? $thunk->() : undef;
			while (
				blessed($value)
				and $value->isa('Zuzu::Value::Task')
				and $value != $self
			) {
				$value = $value->await;
			}
			$payload = {
				ok => 1,
				value => $value,
			};

lib/Zuzu/Value/Task.pm  view on Meta::CPAN

			1;
		};
		if ( !$stored ) {
			my $fallback = {
				ok => 0,
				error_string => "Task result could not be serialized: $@",
			};
			eval { nstore_fd( $fallback, $writer ); 1 };
		}
		close $writer;
		require POSIX;
		POSIX::_exit(0);
	}

	close $writer;
	$self->pid($pid);
	$self->reader($reader);
	$self->status('running');

	return $self;
}

# Collect the outcome of a forked task.
#
# Reads the Storable payload from the reader handle, closes the handle,
# reaps the child unless the caller says it was already reaped, and then
# records either a result ('fulfilled') or an error ('rejected').
#
#   $already_waited - true when waitpid has already been done for the pid
#
# Returns the task.
sub _finish_running {
	my ( $self, $already_waited ) = @_;

	my $handle = $self->reader;

	# A missing or broken handle makes fd_retrieve die; treat that the
	# same as "no payload".
	my $payload = eval { fd_retrieve($handle) };
	my $retrieve_error = $@;
	close $handle if $handle;

	my $child = $self->pid;
	waitpid( $child, 0 ) if defined $child and !$already_waited;

	unless ( $payload ) {
		$self->error(
			"Task failed before returning a result: $retrieve_error"
		);
		$self->status('rejected');
		return $self;
	}

	if ( $payload->{ok} ) {
		$self->result( $payload->{value} );
		$self->status('fulfilled');
		return $self;
	}

	# Prefer the structured error; fall back to its string form.
	my $failure = exists $payload->{error}
		? $payload->{error}
		: $payload->{error_string};
	$self->error($failure);
	$self->status('rejected');

	return $self;
}

# Advance the task without blocking on it and report whether it is done.
#
# Behaviour depends on the current status:
#   * terminal     - runs scheduler cleanup and returns 1;
#   * 'pending'    - with a scheduler, asks it to start the task (as a
#                    coroutine when use_coro is set);
#   * 'sleeping'   - fulfils with undef once ready_at has passed;
#   * 'waiting'    - calls poll_cb, which returns ( $done, $ok, $value );
#   * 'running'    - for a forked task, collects the child's result once
#                    the child has exited or has begun writing it.
#
# Returns 1 when the task is done, 0 otherwise.
sub poll {
	my ( $self ) = @_;

	return 1
		if $self->_cleanup_if_done->is_done;
	if (
		$self->status eq 'pending'
		and defined $self->scheduler
		and $self->use_coro
	) {
		$self->scheduler->start_coro_task($self);
		return $self->is_done ? 1 : 0;
	}
	if ( $self->status eq 'pending' and defined $self->scheduler ) {
		$self->scheduler->run_pending_task($self);
		return $self->is_done ? 1 : 0;
	}
	if ( $self->status eq 'sleeping' ) {
		return 0 if time < ( $self->ready_at // 0 );
		$self->result(undef);
		$self->status('fulfilled');
		$self->_cleanup_if_done;
		return 1;
	}
	if ( $self->status eq 'waiting' ) {
		my $poll = $self->poll_cb;
		return 0 if !defined $poll;
		my ( $done, $ok, $value ) = $poll->();
		return 0 if !$done;
		if ( $ok ) {
			$self->result($value);
			$self->status('fulfilled');
		}
		else {
			$self->error($value);
			$self->status('rejected');
		}
		$self->_cleanup_if_done;
		return 1;
	}
	return 0 if $self->status ne 'running';
	return 0 if !defined $self->pid;

	my $kid = waitpid( $self->pid, WNOHANG );
	if ( $kid == 0 ) {
		# The child is still alive. It may be blocked writing a result
		# larger than the pipe can buffer, in which case it will never
		# exit until we read. So check, without blocking, whether the
		# pipe has become readable, and drain it if so.
		my $reader = $self->reader;
		my $fd = defined $reader ? fileno($reader) : undef;
		return 0 if !defined $fd;

		my $rin = '';
		vec( $rin, $fd, 1 ) = 1;
		my $nfound = select( my $rout = $rin, undef, undef, 0 );
		return 0 if !defined $nfound or $nfound <= 0;

		# Presumably the child writes its payload only after the thunk
		# has finished (the writing code is outside this view), so the
		# blocking read and reap below should be brief.
		$self->_finish_running(0);
	}
	else {
		$self->_finish_running(1);
	}
	$self->_cleanup_if_done;

	return 1;
}

sub _run_pending {
	my ( $self ) = @_;

	return $self if $self->status ne 'pending';

	my $value;
	my $ok = eval {
		$self->status('running');
		my $thunk = $self->thunk;
		$value = defined $thunk ? $thunk->() : undef;
		while (
			blessed($value)
			and $value->isa('Zuzu::Value::Task')
			and $value != $self
		) {
			$value = $value->await;
		}
		1;
	};
	if ( $ok ) {

lib/Zuzu/Value/Task.pm  view on Meta::CPAN

		$value = defined $thunk ? $thunk->() : undef;
		while (
			blessed($value)
			and $value->isa('Zuzu::Value::Task')
			and $value != $self
		) {
			$value = $value->await;
		}
		1;
	};
	if ( $ok ) {
		$self->result($value);
		$self->status('fulfilled');
	}
	else {
		if ( $self->status ne 'cancelled' ) {
			$self->error($@);
			$self->status('rejected');
		}
	}
	$self->_cleanup_if_done;

	return $self;
}

# Run pending demolish hooks for objects still reachable from the task's
# runtime stack. Called from cancel when a running coroutine task is
# torn down.
#
# Environments are visited from the end of runtime_stack to the start.
# For each one, every slot that is a reference to a scalar holding a
# Zuzu::Value::Object with a CODE demolish_hook has that hook cleared
# and then invoked once with the object. Errors raised by a hook are
# swallowed so that one failing hook cannot stop the others.
#
# The pass runs at most once per task (guarded by cleanup_ran).
# Returns nothing.
sub _run_cancel_cleanup {
	my ( $self ) = @_;

	return if $self->cleanup_ran;
	$self->cleanup_ran(1);
	my %seen;
	for my $env ( reverse @{ $self->runtime_stack // [] } ) {
		next if !blessed($env) or !$env->can('slots');
		for my $ref ( values %{ $env->slots // {} } ) {
			# ref() reports 'REF', not 'SCALAR', for a reference to a
			# scalar that itself holds a reference -- which is exactly
			# the case for a slot containing an object. Accept both,
			# otherwise no object slot would ever be examined.
			my $kind = ref($ref);
			next if $kind ne 'SCALAR' and $kind ne 'REF';
			my $value = $$ref;
			next
				if !blessed($value)
				or !$value->isa('Zuzu::Value::Object')
				or !$value->can('demolish_hook');
			my $addr = refaddr($value);
			next if !$addr;
			# The same object may be bound in several slots.
			next if $seen{$addr}++;
			my $hook = $value->demolish_hook;
			next if ref($hook) ne 'CODE';
			# Clear before calling so the hook cannot run twice.
			$value->demolish_hook(undef);
			local $@;
			eval { $hook->($value); 1 };
		}
	}

	return;
}

# Cancel the task.
#
#   $reason - optional value stored as cancel_reason and used as the
#             task's error; 'Task cancelled' is used when omitted
#
# A task already in a terminal state is returned untouched. For a
# running task the child process (and its process group) is sent TERM
# and reaped, the reader is closed, and a coroutine, if any, has the
# reason thrown into it. The on_cancel callback runs before the status
# flips to 'cancelled'.
#
# Returns the task.
sub cancel {
	my ( $self, $reason ) = @_;

	return $self if $self->is_done;

	my $message     = defined $reason ? $reason : 'Task cancelled';
	my $was_running = $self->status eq 'running';
	my $needs_coro_unwind
		= ( $was_running and defined $self->coro ) ? 1 : 0;

	if ( $was_running ) {
		my $child = $self->pid;
		if ( defined $child ) {
			# Negative pid targets the child's whole process group.
			kill 'TERM', -$child;
			kill 'TERM', $child;
			waitpid( $child, 0 );
		}

		my $handle = $self->reader;
		close $handle if $handle;

		my $coro = $self->coro;
		$coro->throw($message)
			if defined $coro and $coro->can('throw');
	}

	$self->cancel_reason($reason) if defined $reason;
	$self->error($message);

	my $callback = $self->on_cancel;
	if ( defined $callback ) {
		$callback->($self);
	}
	$self->status('cancelled');

	# Coroutine tasks get their demolish hooks run here; their scheduler
	# cleanup is deliberately not triggered from this method.
	if ( $needs_coro_unwind ) {
		$self->_run_cancel_cleanup;
	}

	my $scheduler = $self->scheduler;
	$scheduler->trace_task( cancel => $self )
		if defined $scheduler
		and $scheduler->can('trace_task');

	$self->_cleanup_if_done if !$needs_coro_unwind;

	return $self;
}

# Wait for the task to finish and return its result.
#
# How the wait happens depends on context:
#   * with a scheduler and a current task: a non-running task is awaited
#     through the scheduler; a running forked task is polled, yielding
#     to the scheduler between polls when it supports that;
#   * with a scheduler but no current task: a non-running task is driven
#     by scheduler->run_until; a running forked task is collected
#     directly;
#   * without a scheduler: forked tasks are collected directly, sleeping
#     and waiting tasks are polled with a short sleep, and pending tasks
#     are run inline.
#
# Dies with the task's error when it ends 'rejected' or 'cancelled'.
sub await {
	my ( $self ) = @_;

	my $scheduler = $self->scheduler;
	my $state     = $self->status;

	if ( defined $scheduler and !$self->is_done ) {
		my $inside_task = defined $scheduler->current_task;

		if ( $inside_task and $state ne 'running' ) {
			$scheduler->await_from_current_task($self);
		}
		elsif ( $inside_task and defined $self->pid ) {
			# Running forked task awaited from inside another task.
			until ( $self->is_done ) {
				$self->poll;
				last if $self->is_done;
				$scheduler->yield_current_task
					if $scheduler->can('yield_current_task');
			}
		}
		elsif ( $state ne 'running' ) {
			$scheduler->run_until($self);
		}
		elsif ( defined $self->pid ) {
			$self->_finish_running(0);
		}
	}
	elsif ( $state eq 'running' ) {
		$self->_finish_running(0) if defined $self->pid;
	}
	elsif ( $state eq 'sleeping' or $state eq 'waiting' ) {
		sleep(0.001) until $self->poll;
	}
	elsif ( $state eq 'pending' ) {
		$self->_run_pending;
	}

	my $outcome = $self->status;
	die $self->error
		if $outcome eq 'rejected'
		or $outcome eq 'cancelled';

	return $self->result;
}

1;

=pod

=head1 COPYRIGHT AND LICENCE



( run in 0.692 second using v1.01-cache-2.11-cpan-df04353d9ac )