Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
return {
uid => $self->{uid},
time => Time::HiRes::time(),
dump => \@dump,
};
}
sub _merge_dump {
my ($self, $dump) = @_;
# Discard dumps sent by myself
return if ($dump->{uid} eq $self->{uid});
foreach my $entry (@{$dump->{dump}}) {
$self->_merge($entry);
}
}
sub touch {
my ($self, $key) = @_;
return unless defined $self->{data}->{$key};
croak "No max_age specified (gc is disabled)" unless $self->{max_age};
my $age = time() - $self->{time}->{$key};
return unless ( $age > $self->{max_age} * 0.3);
return unless ( $age < $self->{max_age} * 1.3);
# Set to current value but without increasing version
$self->{vers}->{$key}--;
$self->set( $key => $self->{data}->{$key} );
}
sub _gc {
my $self = shift;
my $min_time = time() - $self->{max_age} * 1.3;
foreach my $key (keys %{$self->{data}}) {
next unless ( $self->{time}->{$key} < $min_time );
next unless ( defined $self->{data}->{$key} );
$self->delete( $key );
}
}
sub _save_state {
my $self = shift;
return unless ($self->{synced});
my $id = $self->{id};
my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/); # untaint
my $tmp_file = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";
# Avoid stampede when several workers are exiting simultaneously
return if (-e $tmp_file && (stat($tmp_file))[9] == time());
# Lock file because several workers may try to write simultaneously to it
sysopen(my $fh, $tmp_file, O_RDWR|O_CREAT) or return;
flock($fh, LOCK_EX | LOCK_NB) or return;
truncate($fh, 0) or return;
print $fh encode_json( $self->dump );
close($fh);
}
sub _load_state {
my $self = shift;
my $id = $self->{id};
my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/); # untaint
my $tmp_file = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";
return unless (-e $tmp_file);
# Do not load stale dumps
return if ($self->{max_age} && (stat($tmp_file))[9] < time() - $self->{max_age});
local($/);
open(my $fh, '<', $tmp_file) or die "Couldn't read $tmp_file: $!";
my $data = <$fh>;
close($fh);
local $@;
my $dump = eval { decode_json($data) };
return if $@;
my $min_time = $self->{max_age} ? time() - $self->{max_age} : undef;
foreach my $entry (@{$dump->{dump}}) {
# Do not merge stale entries
next if ($min_time && $entry->[3] < $min_time);
$self->_merge($entry);
}
}
sub _disconnect {
my $self = shift;
$self->_save_state if $self->{persist};
foreach my $bus (@{$self->{_BUS_GROUP}}) {
next unless ($bus->{is_connected});
$bus->disconnect;
}
}
sub DESTROY {
my $self = shift;
$self->_disconnect;
}
1;
__END__
=pod
=encoding utf8
=head1 NAME
Beekeeper::Worker::Extension::SharedCache - Locally mirrored shared cache
=head1 VERSION
Version 0.09
=head1 SYNOPSIS
use Beekeeper::Worker::Extension::SharedCache;
my $c = $self->shared_cache(
id => "mycache",
max_age => 300,
( run in 1.372 second using v1.01-cache-2.11-cpan-39bf76dae61 )