Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN

    return {
        uid   => $self->{uid},
        time  => Time::HiRes::time(),
        dump  => \@dump,
    };
}

# Merge every entry of a cache dump received from another cache instance.
#
#   $dump - hashref holding the sender 'uid' and a 'dump' arrayref of entries
#
# Dumps whose uid equals this instance's own uid are ignored. Each remaining
# entry is handed, in order, to $self->_merge.
sub _merge_dump {
    my ($self, $dump) = @_;

    my $sender_uid = $dump->{uid};

    # A dump that originated here carries nothing this instance lacks
    return if ($sender_uid eq $self->{uid});

    for my $item (@{$dump->{dump}}) {
        $self->_merge($item);
    }
}

# Refresh the timestamp of a cached key so that it is not garbage collected.
#
#   $key - cache key to refresh
#
# Does nothing when the key holds no defined value. Croaks when the cache has
# no max_age. The entry is re-set only while its age lies strictly between
# 30% and 130% of max_age; in that case the result of $self->set is returned.
sub touch {
    my ($self, $key) = @_;

    my $value = $self->{data}->{$key};
    return unless defined $value;

    my $max_age = $self->{max_age};
    croak "No max_age specified (gc is disabled)" unless $max_age;

    my $elapsed = time() - $self->{time}->{$key};

    # Too young to be worth refreshing, or already old enough for _gc
    return unless ( $elapsed > $max_age * 0.3 && $elapsed < $max_age * 1.3 );

    # Step the version back first so that re-setting the same value does not
    # leave it with a higher version (presumably set() increments it)
    $self->{vers}->{$key}--;

    $self->set( $key => $value );
}

# Garbage collect expired entries.
#
# Every key whose timestamp is older than 1.3 * max_age and which still holds
# a defined value is removed through $self->delete.
sub _gc {
    my $self = shift;

    my $cutoff = time() - $self->{max_age} * 1.3;

    for my $key (keys %{$self->{data}}) {

        # Only entries that expired and still hold a defined value are deleted
        if ( $self->{time}->{$key} < $cutoff && defined $self->{data}->{$key} ) {
            $self->delete( $key );
        }
    }
}

# Persist the current cache contents to a dump file under /tmp.
#
# Best effort: returns silently when the cache is not synced, when the file
# was already written during the current second, or when the file cannot be
# opened, locked or truncated. On success the JSON encoded result of
# $self->dump replaces the previous file contents.
sub _save_state {
    my $self = shift;

    $self->{synced} or return;

    my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/); # untaint
    my $id = $self->{id};
    my $tmp_file = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";

    # Several workers exiting at once would all write the same file: skip the
    # write when another one did it within this very second
    if (-e $tmp_file) {
        my $mtime = (stat($tmp_file))[9];
        return if ($mtime == time());
    }

    # Take an exclusive non-blocking lock before wiping the previous dump,
    # as other workers may be trying to write the file at the same time
    my $fh;
    my $ready = sysopen($fh, $tmp_file, O_RDWR|O_CREAT)
             && flock($fh, LOCK_EX | LOCK_NB)
             && truncate($fh, 0);
    return unless $ready;

    my $json = encode_json( $self->dump );
    print {$fh} $json;

    close($fh);
}

# Load a previously saved dump file from /tmp and merge its entries.
#
# Returns silently when there is no dump file, when the file is older than
# max_age, when its contents are not valid JSON, or when the decoded data
# does not have the expected shape (a hash holding a 'dump' array).
# Dies when an existing dump file cannot be opened for reading.
#
# Each entry is expected to be an array whose element 3 is its timestamp.
# Entries older than max_age and entries that are not arrays are skipped,
# all others are handed to $self->_merge.
sub _load_state {
    my $self = shift;

    my $id = $self->{id};
    my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/); # untaint
    my $tmp_file = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";

    return unless (-e $tmp_file);

    # Do not load stale dumps
    return if ($self->{max_age} && (stat($tmp_file))[9] < time() - $self->{max_age});

    open(my $fh, '<', $tmp_file) or die "Couldn't read $tmp_file: $!";

    # Slurp inside a do block, so that $/ is undefined only for this single
    # read and not while _merge (and whatever it calls) runs further down
    my $data = do { local $/; <$fh> };
    close($fh);

    local $@;
    my $dump = eval { decode_json($data) };
    return if $@;

    # The file lives in a world writable directory: a dump that parses as
    # JSON but has an unexpected shape is discarded just like a corrupt one
    return unless (ref $dump eq 'HASH' && ref $dump->{dump} eq 'ARRAY');

    my $min_time = $self->{max_age} ? time() - $self->{max_age} : undef;

    foreach my $entry (@{$dump->{dump}}) {
        # Do not merge malformed entries
        next unless (ref $entry eq 'ARRAY');
        # Do not merge stale entries
        next if ($min_time && $entry->[3] < $min_time);
        $self->_merge($entry);
    }
}

# Shut the cache down: save its state when persistence is enabled, then
# disconnect every bus of the group that is still connected.
sub _disconnect {
    my $self = shift;

    if ($self->{persist}) {
        $self->_save_state;
    }

    foreach my $bus (@{$self->{_BUS_GROUP}}) {
        # Buses that are not connected are left alone
        $bus->disconnect if ($bus->{is_connected});
    }
}

# Destructor: runs the regular disconnection sequence of the instance.
sub DESTROY {
    my ($self) = @_;

    $self->_disconnect;
}

1;

__END__

=pod

=encoding utf8

=head1 NAME

Beekeeper::Worker::Extension::SharedCache - Locally mirrored shared cache

=head1 VERSION

Version 0.09

=head1 SYNOPSIS

  use Beekeeper::Worker::Extension::SharedCache;
  
  my $c = $self->shared_cache(
      id      => "mycache",
      max_age => 300,



( run in 1.372 second using v1.01-cache-2.11-cpan-39bf76dae61 )