Forks-Super

 view release on metacpan or  search on metacpan

lib/Forks/Super/Sync/IPCSemaphore.pm  view on Meta::CPAN

            # but we need to set $self->{acquired};
            $self->{acquired}[$i] = 1;
	}
    }
    return;
}

# Give up resource number $n if this object currently holds it.
#
# Returns 1 when the resource was held and has now been released, and
# nothing (undef / empty list) when $n is out of range or the resource
# was not held by this object.
sub release {
    my ($self, $n) = @_;

    # valid resource numbers are 0 .. count-1
    return if $n < 0 || $n >= $self->{count};

    # nothing to do unless we are the holder
    return unless $self->{acquired}[$n];

    # the semaphore set may be absent; in that case only the local
    # bookkeeping is updated
    my $sems = $self->{sems};
    $sems->op($n, -1, 0) if $sems;

    $self->{acquired}[$n] = 0;
    return 1;
}

# more robust version of  $self->{sems}->op($n,0,FLAGS, $n,1,FLAGS)
# detects when partner process has died without removing the semaphore
# return true if successfully waited on lock and incremented the semaphore
#
# Arguments:
#   $n      - semaphore number passed to $self->{sems}->op
#   $expire - Time::HiRes::time() value after which to give up; the
#             deadline is only honoured when $expire > 0
#
# Return values:
#   1  - the semaphore operation succeeded, or there is no $self->{sems}
#        object at all
#   0  - the deadline $expire passed before the operation succeeded
#   2  - the operation failed with EINVAL (semaphore was removed)
#   4  - $self->{sems} was found to be missing inside the loop
#   $Forks::Super::Sync::SYNC_PARTNER_GONE (+ 0.1 when noticed by the
#        probe at the top of the loop) - the partner process could not
#        be signalled; in that case $self->{skip_wait_on} is set and
#        $self->{sems} is deleted
sub _wait_on {
    my ($self, $n, $expire) = @_;
    # no semaphore object: nothing to wait on, report success
    if (!$self->{sems}) {
        return 1;
    }

    # the partner is $self->{childPid} when the current process is
    # $self->{ppid}, and $self->{ppid} otherwise
    my $partner = $$ == $self->{ppid} ? $self->{childPid} : $self->{ppid};

    # poll until the operation succeeds, the deadline passes, or the
    # semaphore or the partner process is found to be gone
    while (1) {
	# start each pass with a clean errno so the %! test below only
	# reflects failures raised during this pass
	local $! = 0;

	# signal 0 delivers nothing; it only probes whether $partner can
	# be signalled. An unset $partner is treated like a dead one.
	my $nk = $partner && CORE::kill 0, $partner;
	if (!$nk) {
	    carp "sync::_wait_on $$ thinks that $partner is gone ...return 3.1";
	    $self->{skip_wait_on} = 1;
	    delete $self->{sems};
	    return $Forks::Super::Sync::SYNC_PARTNER_GONE + 0.1;
	}
        if (!$self->{sems}) {
            carp "sync::_wait_on: semaphore resource not available";
            return 4;
        }

	# one request with two operations on semaphore $n: operation 0
	# with IPC_NOWAIT (so the call does not block), then +1
	my $z = $self->{sems}->op($n, 0, &IPC_NOWAIT,
                                  $n, 1, 0);
	if ($z) {
	    return 1;
	} elsif ($!{EINVAL}) {  # semaphore was removed

	    carp "sync::_wait_on: \$!=Invalid resource ... return 2";
	    return 2;
	}

	# the attempt failed for another reason; give up once the
	# deadline has passed ($expire <= 0 means no deadline)
	if ($expire > 0 && Time::HiRes::time() >= $expire) {
	    return 0;
	}

	# sem value not zero. Is the partner process still alive?
	if (!CORE::kill(0, $partner)) {
	    carp "sync::_wait_on thinks that $partner is gone ...return 3";
	    $self->{skip_wait_on} = 1;
	    delete $self->{sems};
	    return $Forks::Super::Sync::SYNC_PARTNER_GONE;
	}
	# yield briefly before the next attempt
	Time::HiRes::sleep( $NOWAIT_YIELD_DURATION );
	# non-blocking reap of any exited child; $z5 is never read.
	# NOTE(review): presumably this lets an exited child partner fail
	# the kill(0) probes on a later pass -- confirm
	my $z5 = waitpid -1, &WNOHANG;
    }
    return; # unreachable
}

# Try to obtain resource number $n, optionally giving up after
# $timeout seconds.
#
# Returns:
#   nothing        - $n is outside 0 .. count-1
#   -1             - this object already holds resource $n
#   "0 but true"   - _wait_on reported a status greater than 1; the
#                    resource is still marked as held by this object
#   otherwise      - the status from _wait_on unchanged; any status
#                    greater than zero marks the resource as held
sub acquire {
    my ($self, $n, $timeout) = @_;

    return if $n < 0 || $n >= $self->{count};
    return -1 if $n >= 0 && $self->{acquired}[$n];    # already acquired

    # -1 tells _wait_on that there is no deadline
    my $expire = defined $timeout
        ? Time::HiRes::time() + $timeout
        : -1;

    my $status = $self->_wait_on($n, $expire);
    $self->{acquired}[$n] = 1 if $status > 0;

    return $status > 1 ? "0 but true" : $status;
}

# At interpreter exit, walk every object in @RELEASE_ON_EXIT: release
# each of its resource numbers 0 .. count-1, then call remove on its
# {sems} object if it still has one.
END {
    for my $sync (@RELEASE_ON_EXIT) {
        my $last_slot = $sync->{count} - 1;
        for my $slot (0 .. $last_slot) {
            $sync->release($slot);
        }
        if (my $sems = $sync->{sems}) {
            $sems->remove;
        }
    }
}

1;

=head1 NAME

Forks::Super::Sync::IPCSemaphore
- Forks::Super sync object using SysV semaphores

=head1 SYNOPSIS

    $lock = Forks::Super::Sync->new(implementation => 'IPCSemaphore', ...);

    $pid=fork();
    $lock->releaseAfterFork();

    if ($pid == 0) { # child code
       $lock->acquire(...);



( run in 0.693 second using v1.01-cache-2.11-cpan-39bf76dae61 )