IPC-LeaderBoard
view release on metacpan or search on metacpan
lib/IPC/LeaderBoard.pm view on Meta::CPAN
# die if we can't lock it, that means, another master-process
# already acquired it
flock($fd, LOCK_EX | LOCK_NB)
|| die("LeaderBoard ($filename) is owned by some other process, cannot lock it exclusively");
# two additional fields are reserved per slot: one for the spin-lock and one for the generation
my $declared_size = $self->slot_shared_size + $self->slot_private_size + 2;
$score_board = IPC::ScoreBoard->named($filename, $self->n_slots, $declared_size, 0);
$self->_fd($fd);
}
$self->_generation_idx($self->slot_shared_size + 1); # [spin_lock | shared_data | generation | private_data ]
$self->_score_board($score_board);
return;
}
# Destructor hook. When the object was built in 'master' mode and holds a
# file handle in _fd, the flock taken on that handle is released here.
# Objects in any other mode do nothing. Always returns an empty list.
sub DEMOLISH {
    my ($self) = @_;

    # only a master instance owns the lock on the board file
    return unless $self->_mode eq 'master';

    # explicit unlock is actually needed only for tests
    my $lock_handle = $self->_fd;
    if ($lock_handle) {
        flock($lock_handle, LOCK_UN);
    }

    return;
}
# Builds an IPC::LeaderBoard object in 'slave' mode.
#
# All arguments are taken verbatim as key/value constructor pairs and are
# placed after the default '_mode' pair, so a pair with the same key given
# by the caller wins. Because @_ is used as is, the sub is meant to be
# called as a plain function (a method-style call would put the invocant
# into the pair list).
sub attach {
    my $constructor_args = {
        _mode => 'slave',
        @_,
    };
    return IPC::LeaderBoard->new($constructor_args);
}
# Builds an IPC::LeaderBoard object in 'master' mode.
#
# All arguments are taken verbatim as key/value constructor pairs and are
# placed after the default '_mode' pair, so a pair with the same key given
# by the caller wins. Because @_ is used as is, the sub is meant to be
# called as a plain function (a method-style call would put the invocant
# into the pair list).
sub create {
    my $constructor_args = {
        _mode => 'master',
        @_,
    };
    return IPC::LeaderBoard->new($constructor_args);
}
# Our use case tolerates reading slightly outdated data: the generation value
# read together with it is outdated as well, so a later update is rejected.
# read_slot($idx)
#
# Fetches the whole row $idx from the score board and splits it according to
# the row layout [ spin_lock | shared_data | generation | private_data ].
#
# The row index and the generation value found in the row are remembered in
# _last_idx / _last_generation, which update() consults later.
#
# Returns two array references: the shared values and the private values.
# Dies with "wrong index" if $idx is negative or not below n_slots.
sub read_slot {
    my ($self, $idx) = @_;

    if (($idx >= $self->n_slots) || $idx < 0) {
        die("wrong index");
    }

    my @row = $self->_score_board->get_all($idx);

    # take the generation out first (its position is counted in the full
    # row), then drop the leading spin-lock field
    my $generation = splice(@row, $self->_generation_idx, 1);
    shift @row;

    # remember what has been read, a following update depends on it
    $self->_last_idx($idx);
    $self->_last_generation($generation);

    # what is left is the shared part immediately followed by the private part
    my $private_offset = $self->slot_shared_size;
    my $private_end    = $private_offset + $self->slot_private_size - 1;
    my @shared_values  = @row[0 .. $private_offset - 1];
    my @private_values = @row[$private_offset .. $private_end];

    return (\@shared_values, \@private_values);
}
# update($idx, \@shared_values, %private_values)
# update($idx, \@shared_values)
# update($idx, %private_values)
#
# Writes new data into slot $idx, which must be the slot most recently read
# via read_slot().
#
# Parameters:
#   $idx            - slot index, 0 <= $idx < n_slots
#   \@shared_values - optional array reference, must hold exactly
#                     slot_shared_size values
#   %private_values - optional pairs "private index => value", every index
#                     must satisfy 0 <= index < slot_private_size
#
# Shared values are written under the per-slot spin-lock and only if the
# generation stored in the slot still equals the one seen by the last
# read_slot(); on success the generation is incremented. Private values are
# written without locking and without the generation check.
#
# Returns 1 if the shared values were written, 0 otherwise (no shared values
# given, generation mismatch, or the spin-lock could not be acquired).
# Note: when the spin-lock cannot be acquired the sub returns immediately,
# so private values are not written in that case either.
#
# Dies on a wrong slot index, on an index that is not the last read one
# (including the case when nothing has been read yet), on a shared values
# size mismatch and on a wrong private index. All arguments are validated
# before the first write, so a failed validation leaves the slot untouched.
sub update {
    my ($self, $idx, @rest) = @_;

    # an optional leading array reference carries the shared values, the
    # remaining arguments are the private index/value pairs
    my $values = (@rest && ref($rest[0]) eq 'ARRAY') ? shift(@rest) : undef;
    my %private_values = @rest;
    my $operation_result = 0;

    die("wrong index") if ($idx >= $self->n_slots) || $idx < 0;

    # an undefined last index means that nothing has been read yet; without
    # the explicit check undef would numify to 0 and let slot 0 through
    my $last_idx = $self->_last_idx;
    die("update for only last read index is allowed")
        if !defined($last_idx) || $idx != $last_idx;

    # validate everything up front: dying in the middle of the writes would
    # leave the slot partially updated
    if ($values) {
        die("values size mismatch slot size") if @$values != $self->slot_shared_size;
    }
    my $private_size = $self->slot_private_size;
    for my $private_idx (keys %private_values) {
        if (($private_idx >= $private_size) || ($private_idx < 0)) {
            die("wrong private index");
        }
    }

    my $sb = $self->_score_board;

    # updating shared values
    if ($values) {
        # obtain the spin-lock: field 0 of the row is incremented, and the
        # lock is ours only when the result is exactly 1; otherwise the
        # increment is rolled back and the attempt is repeated
        my $attempts = 0;
        while ($sb->incr($idx, 0) != 1) {
            $sb->decr($idx, 0);
            if (++$attempts > $max_lock_attempts) {
                warn("failed to acquire spin lock for row $idx after $attempts attempts");
                return 0;
            }
        }
        # release the lock at the end of this scope
        scope_guard { $sb->decr($idx, 0) };

        # Now we hold the record, nobody else can update it.
        # The generation is read by incrementing it by zero; according to
        # the original author's note a plain get() is not used because it
        # does not guarantee atomicity.
        my $actual_generation = $sb->incr($idx, $self->_generation_idx, 0);
        if ($actual_generation == $self->_last_generation) {
            # nobody else updated the record since our last read, so it is
            # safe to write; +1 because the 1st field is the spin-lock
            $sb->set($idx, $_ + 1, $values->[$_]) for (0 .. @$values - 1);
            # increment the generation field
            $sb->incr($idx, $self->_generation_idx);
            # success
            $operation_result = 1;
        }
    }

    # updating private values (indexes have been validated above); the
    # private part starts right after the generation field
    if (%private_values) {
        my $idx_delta = $self->_generation_idx + 1;
        for my $private_idx (keys %private_values) {
            $sb->set($idx, $private_idx + $idx_delta, $private_values{$private_idx});
        }
    }

    return $operation_result;
}
=head1 AUTHOR
( run in 0.878 second using v1.01-cache-2.11-cpan-bbe5e583499 )