IPC-ConcurrencyLimit
view release on metacpan or search on metacpan
lib/IPC/ConcurrencyLimit/WithLatestStandby.pm view on Meta::CPAN
if $self->{process_name_change};
return;
}
# Each worker tries to acquire the lock to its left. If it does
# then it abandons its old lock. If that means the worker ends up
# on locker_id 0 then they are done, and can do work.
# The first standby worker also looks to its right to see if there
# is a replacement process for it, if there is it exits, leaving
# a gap and letting the replacements shuffle left.
my $tries= 0;
my $lock_tries= 0;
my $standby_start= time();
my $lock_start= time();
my $poll_time= $self->{poll_time};
while ( $locker_id > 0 ) {
$0 = "$old_oh - $names->[$locker_id]"
if $self->{process_name_change};
# can we shuffle our lock left?
if ( $locker->[$locker_id - 1]->get_lock() ) {
$self->_diag( "Got a $names->[$locker_id -1] lock, dropping old $names->[$locker_id] lock")
if $self->{debug};
# yep, we got the lock to the left, so drop our old lock,
# and move the pointer left at the same time.
$locker->[ $locker_id-- ]->release_lock();
$lock_tries= 0;
$lock_start= time();
next;
}
unless ($self->{retry_sub}->(++$tries, ++$lock_tries, time - $standby_start, time - $lock_start)) {
$0 = "$old_oh - no-lock-timeout"
if $self->{process_name_change};
return;
}
# check if we are the first standby worker.
if ( $locker_id == 1 ) {
# yep - we are the first standby worker,
# so check if the lock to our right is being held:
if ( $locker->[$locker_id + 1]->get_lock() ) {
# we got the lock, which means nothing else
# holds it. so we release the lock and move on.
$locker->[$locker_id + 1]->release_lock();
} else {
$self->_diag(
"A newer worker is holding the $names->[$locker_id+1] lock, will exit to let it take over"
) if $self->{debug};
# we failed to get the lock, which means there is a newer
# process that can replace us so return/exit - this frees up
# our lock and lets the newer process to move into our position.
$0 = "$old_oh - no-lock-retired"
if $self->{process_name_change};
return;
}
}
# nope - the lock to our left is being held so sleep a while before
# we try again. We use the rand and the formula so that items to the
# right poll faster than items to the left, and to reduce the chance
# that lock holder 1 and lock holder 3 poll lock 2 at the same time
# forever. The formula guarantees that items to the left poll faster,
# and the rand ensures there is jitter.
sleep rand(($poll_time / $locker_id)*2);
}
# assert that $locker_id is 0 at this point.
die "panic: We should not reach this point with \$locker_id larger than 0, got $locker_id"
if $locker_id;
$self->_diag("Got $names->[$locker_id] lock, we are allowed to do work.")
if $self->{debug};
# at this point we should be $locker_id == 0 and we can do work.
if ($self->{process_name_change}) {
if ($self->{process_name_change} > 1) {
$0 = $old_oh;
} else {
$0 = "$old_oh - $names->[$locker_id]"
}
}
return 1;
}
sub is_locked {
my $self = shift;
return $self->{locker}[0]->is_locked(@_);
}
sub release_lock {
my $self = shift;
return $self->{locker}[0]->release_lock(@_);
}
sub lock_id {
my $self = shift;
return $self->{locker}[0]->lock_id(@_);
}
sub heartbeat {
my $self = shift;
return $self->{locker}[0]->heartbeat;
}
1;
__END__
=head1 NAME
IPC::ConcurrencyLimit::WithLatestStandby - IPC::ConcurrencyLimit with latest started working as standby
=head1 SYNOPSIS
use IPC::ConcurrencyLimit::WithLatestStandby;
sub run {
my $limit = IPC::ConcurrencyLimit::WithLatestStandby->new(
type => 'Flock', # default, and currently only supported type
( run in 0.911 second using v1.01-cache-2.11-cpan-ceb78f64989 )