IPC-ConcurrencyLimit

 view release on metacpan or  search on metacpan

lib/IPC/ConcurrencyLimit/WithLatestStandby.pm  view on Meta::CPAN

            if $self->{process_name_change};
        return;
    }

    # Each worker tries to acquire the lock to its left. If it does
    # then it abandons its old lock. If that means the worker ends up
    # on locker_id 0 then they are done, and can do work.
    # The first standby worker also looks to its right to see if there
    # is a replacement process for it, if there is it exits, leaving
    # a gap and letting the replacements shuffle left.
    my $tries= 0;
    my $lock_tries= 0;
    my $standby_start= time();
    my $lock_start= time();
    my $poll_time= $self->{poll_time};

    while ( $locker_id > 0 ) {
        $0 = "$old_oh - $names->[$locker_id]"
            if $self->{process_name_change};

        # can we shuffle our lock left?
        if ( $locker->[$locker_id - 1]->get_lock() ) {
            $self->_diag( "Got a $names->[$locker_id -1] lock, dropping old $names->[$locker_id] lock")
                if $self->{debug};
            # yep, we got the lock to the left, so drop our old lock,
            # and move the pointer left at the same time.
            $locker->[ $locker_id-- ]->release_lock();
            $lock_tries= 0;
            $lock_start= time();
            next;
        }

        unless ($self->{retry_sub}->(++$tries, ++$lock_tries, time - $standby_start, time - $lock_start)) {
            $0 = "$old_oh - no-lock-timeout"
                if $self->{process_name_change};
            return;
        }

        # check if we are the first standby worker.
        if ( $locker_id == 1 ) {
            # yep - we are the first standby worker,
            # so check if the lock to our right is being held:
            if ( $locker->[$locker_id + 1]->get_lock() ) {
                # we got the lock, which means nothing else
                # holds it. so we release the lock and move on.
                $locker->[$locker_id + 1]->release_lock();
            } else {
                $self->_diag(
                    "A newer worker is holding the $names->[$locker_id+1] lock, will exit to let it take over"
                ) if $self->{debug};
                # we failed to get the lock, which means there is a newer
                # process that can replace us so return/exit - this frees up
                # our lock and lets the newer process move into our position.
                $0 = "$old_oh - no-lock-retired"
                    if $self->{process_name_change};
                return;
            }
        }

        # nope - the lock to our left is being held so sleep a while before
        # we try again. We use the rand and the formula so that items to the
        # right poll faster than items to the left, and to reduce the chance
        # that lock holder 1 and lock holder 3 poll lock 2 at the same time
        # forever. Dividing by $locker_id shortens the maximum sleep for items
        # further to the right, so they poll faster, and the rand ensures
        # there is jitter.
        sleep rand(($poll_time / $locker_id)*2);
    }

    # assert that $locker_id is 0 at this point.
    die "panic: We should not reach this point with \$locker_id larger than 0, got $locker_id"
        if $locker_id;

    $self->_diag("Got $names->[$locker_id] lock, we are allowed to do work.")
        if $self->{debug};

    # at this point we should be $locker_id == 0 and we can do work.
    if ($self->{process_name_change}) {
        if ($self->{process_name_change} > 1) {
            $0 = $old_oh;
        } else {
            $0 = "$old_oh - $names->[$locker_id]"
        }
    }
    return 1;
}


# Report the lock state of the locker at index 0 of $self->{locker},
# i.e. the lock that permits a worker to do work.
#
# Any arguments after the invocant are forwarded untouched, and whatever
# the underlying locker's is_locked() returns is handed straight back.
sub is_locked {
    my $primary = shift->{locker}->[0];
    return $primary->is_locked(@_);
}

# Release the lock held through the locker at index 0 of $self->{locker}.
#
# Only that one locker is touched here. Arguments after the invocant are
# forwarded untouched and the underlying release_lock() result is returned.
sub release_lock {
    my $primary = shift->{locker}->[0];
    return $primary->release_lock(@_);
}

# Return the lock id reported by the locker at index 0 of $self->{locker}.
#
# Arguments after the invocant are forwarded untouched; the value comes
# directly from the underlying locker's lock_id().
sub lock_id {
    my $primary = shift->{locker}->[0];
    return $primary->lock_id(@_);
}

# Run the heartbeat of the locker at index 0 of $self->{locker} and return
# its result.
#
# Unlike the sibling delegators in this file, nothing beyond the invocant
# is passed along: the underlying heartbeat() is always called without
# arguments.
sub heartbeat {
    my $primary = shift->{locker}->[0];
    return $primary->heartbeat();
}


1;

__END__


=head1 NAME

IPC::ConcurrencyLimit::WithLatestStandby - IPC::ConcurrencyLimit with latest started worker as standby

=head1 SYNOPSIS

  use IPC::ConcurrencyLimit::WithLatestStandby;

  sub run {
    my $limit = IPC::ConcurrencyLimit::WithLatestStandby->new(
      type              => 'Flock', # default, and currently only supported type



( run in 0.911 second using v1.01-cache-2.11-cpan-ceb78f64989 )