Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Worker/Extension/SharedCache.pm view on Meta::CPAN
my $cache_id = $self->{id};
my $uid = $self->{uid};
my $local_bus = $bus->{bus_role};
my $client_id = $bus->{client_id};
my $topic = "msg/$local_bus/_sync/$cache_id/set";
$bus->subscribe(
topic => $topic,
on_publish => sub {
# my ($payload_ref, $mqtt_properties) = @_;
my $entry = decode_json( ${$_[0]} );
$self->_merge($entry);
},
on_suback => sub {
my ($success) = @_;
log_error "Could not subscribe to topic '$topic'" unless $success;
}
);
my $reply_topic = "priv/$client_id/sync-$cache_id";
$bus->subscribe(
topic => $reply_topic,
on_publish => sub {
my ($payload_ref, $mqtt_properties) = @_;
my $dump = decode_json($$payload_ref);
$self->_merge_dump($dump);
$self->_sync_completed(1);
},
on_suback => sub {
my ($success) = @_;
log_error "Could not subscribe to reply topic '$reply_topic'" unless $success;
}
);
}
# Ask another worker for a full dump of this cache through the given bus.
#
# Publishes a request on the "_sync/<cache id>/dump" request topic of the bus,
# asking for the answer on this client's private "sync-<cache id>" topic, and
# arms a timer which calls _sync_completed(0) after SYNC_REQUEST_TIMEOUT.
#
# Does nothing while a previous request is still waiting for its timeout.
sub _send_sync_request {
    my ($self, $bus) = @_;
    weaken $self;

    # Only one sync request may be in flight at any given time
    return if $self->{_sync_timeout};

    my $cache_id      = $self->{id};
    my $bus_role      = $bus->{bus_role};
    my $request_topic = "req/$bus_role/_sync/$cache_id/dump";
    my $reply_topic   = "priv/$bus->{client_id}/sync-$cache_id";

    $bus->publish(
        topic          => $request_topic,
        response_topic => $reply_topic,
    );

    # The event loop may have been blocked: refresh its notion of "now"
    # so that the timeout is measured from the actual current time
    AnyEvent->now_update;

    # A freshly started pool has no master to answer sync requests,
    # so stop waiting for a reply once the timeout expires
    my $on_timeout = sub { $self->_sync_completed(0) };

    $self->{_sync_timeout} = AnyEvent->timer(
        after => SYNC_REQUEST_TIMEOUT,
        cb    => $on_timeout,
    );
}
# Mark the initial synchronization as finished and start answering the sync
# requests of other workers.
#
# $success is true when a dump was received and merged, and false when the
# sync request timed out; it only changes the message being logged.
sub _sync_completed {
    my ($self, $success) = @_;

    # Disarm the pending sync request timeout, if any
    delete $self->{_sync_timeout};

    # Nothing else to do when the cache was already flagged as synced
    return if $self->{synced};

    # BUG: When a fresh pool is started there is no master to reply sync requests.
    # When two fresh pools are started at t0 and t1 time, and (t1 - t0) < SYNC_REQUEST_TIMEOUT,
    # cache updates in the t0-t1 range are not properly synced in the pool which was started later

    my $outcome = $success ? "Sync completed" : "Acting as master";
    log_debug( "Shared cache '$self->{id}': " . $outcome );

    $self->{synced} = 1;

    foreach my $backend ( @{$self->{_BUS_GROUP}} ) {
        # Connections to other buses could have failed or be in progress
        $self->_accept_sync_requests($backend) if $backend->{is_connected};
    }
}
# Start answering the sync requests received through the given bus.
#
# Subscribes to the "_sync/<cache id>/dump" request topic of the bus using an
# MQTT shared subscription (share name "BKPR"), and answers every request by
# publishing a JSON encoded dump of this cache to the response topic carried
# by the request.
#
# Requests without a response topic are logged and ignored, as there is nowhere
# to send the dump to.
sub _accept_sync_requests {
    my ($self, $bus) = @_;

    # The callbacks below are held by the bus: do not keep alive the cache
    # (or the bus itself) through them
    weaken $self;
    weaken $bus;

    my $cache_id  = $self->{id};
    my $local_bus = $bus->{bus_role};

    log_debug "Shared cache '$cache_id': Accepting sync requests from $local_bus";

    my $topic = "\$share/BKPR/req/$local_bus/_sync/$cache_id/dump";

    $bus->subscribe(
        topic      => $topic,
        on_publish => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            # Both references are weak, the cache or the bus may be already gone
            return unless $self && $bus;

            my $reply_topic = $mqtt_properties->{'response_topic'};

            unless (defined $reply_topic && length $reply_topic) {
                log_error "Shared cache '$cache_id': Ignored sync request without response topic";
                return;
            }

            my $dump = encode_json( $self->dump );

            $bus->publish(
                topic   => $reply_topic,
                payload => \$dump,
            );
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to topic '$topic'" unless $success;
        }
    );
}
# Call pingreq on every bus of the bus group which is flagged as connected.
sub _ping_backend_brokers {
    my ($self) = @_;

    for my $backend ( @{ $self->{_BUS_GROUP} } ) {
        # Buses without an established connection are skipped
        $backend->pingreq if $backend->{is_connected};
    }
}
# Value of time() when set() or _merge() last called AnyEvent->now_update,
# so the event loop clock is refreshed only when time() has changed since then
my $_now = 0;
# Store $value under $key and broadcast the change.
#
# The entry version is incremented, its modification time is set to the current
# high resolution time, the on_update callback (if any) is invoked with
# ($key, $value, $old_value), and the change is published as JSON to the "set"
# topic of the local bus and of every connected bus of the bus group.
#
# Setting an undefined value deletes the entry: the deletion is broadcast as
# any other change, but the entry is kept for 60 seconds before being removed,
# as its version is needed until the change is propagated to all workers.
# Setting a defined value cancels any such pending removal of the key,
# otherwise the stale timer would wipe the freshly stored value.
#
# Croaks if $key is undefined. Returns 1.
sub set {
    my ($self, $key, $value) = @_;
    weaken $self;

    croak "Key value is undefined" unless (defined $key);

    my $old = $self->{data}->{$key};

    $self->{data}->{$key} = $value;
    $self->{vers}->{$key}++;
    $self->{time}->{$key} = Time::HiRes::time();

    my $json = encode_json([
        $key,
        $value,
        $self->{vers}->{$key},
        $self->{time}->{$key},
        $self->{uid},
    ]);

    $self->{on_update}->($key, $value, $old) if $self->{on_update};

    # Notify all workers in every cluster about the change
    my @bus_group = grep { $_->{is_connected} } @{$self->{_BUS_GROUP}};

    unshift @bus_group, $self->{_BUS};

    my $cache_id = $self->{id};

    foreach my $bus (@bus_group) {
        my $local_bus = $bus->{bus_role};

        $bus->publish(
            topic   => "msg/$local_bus/_sync/$cache_id/set",
            payload => \$json,
        );
    }

    if (defined $value) {
        # The key is alive (again): cancel the removal scheduled by a previous
        # deletion, destroying the timer watcher disarms it
        delete $self->{_destroy}->{$key};
    }
    else {
        # Postpone delete because it is necessary to keep the versioning
        # of this modification until it is propagated to all workers

        # Ensure that timer is set properly when the event loop was blocked
        if ($_now != time) { $_now = time; AnyEvent->now_update }

        $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
            delete $self->{_destroy}->{$key};
            delete $self->{data}->{$key};
            delete $self->{vers}->{$key};
            delete $self->{time}->{$key};
        });
    }

    return 1;
}
# Return the value locally stored under $key (undef when there is none).
sub get {
    my $self = shift;
    my $key  = shift;

    return $self->{data}->{$key};
}
# Delete the entry stored under $key, which is done by setting it to undef.
# Returns whatever set() returns.
sub delete {
    my $self = shift;
    my ($key) = @_;

    return $self->set( $key, undef );
}
# Return the internal hash holding the cached values.
# This is the hash itself, not a copy of it.
sub raw_data {
    my ($self) = @_;

    return $self->{data};
}
# Merge into the local cache an entry received from another worker.
#
# $entry is an array reference of the form [ $key, $value, $version, $time, $uid ],
# where $uid identifies the sender (entries taken from dumps do not carry it).
#
#  - Entries sent by this very cache are discarded.
#  - Entries with a version lower than the local one are discarded.
#  - Entries with a higher version replace the local entry.
#  - Entries with the same version are a conflict, which is settled by the
#    'resolver' callback, or by keeping the entry with the newest time when
#    there is none. The resolver receives two hashes (mine, theirs) with the
#    keys 'data', 'vers' and 'time', and must return the one to keep.
#
# The on_update callback (if any) is invoked with ($key, $value, $old_value)
# for every entry which was not discarded, even when a conflict was settled
# in favor of the local value.
#
# When the resulting value is undefined (a deletion) the removal of the entry
# is postponed for 60 seconds; when it is defined any pending removal of the
# key is cancelled, otherwise the stale timer would wipe the merged value.
sub _merge {
    my ($self, $entry) = @_;

    my ($key, $value, $version, $time, $uid) = @$entry;

    # Discard updates sent by myself
    return if (defined $uid && $uid eq $self->{uid});

    # Unknown keys have no version yet, treat them as version zero
    my $my_version = $self->{vers}->{$key} || 0;

    # Received a stale value (we have a newest version)
    return if ($version < $my_version);

    my $old = $self->{data}->{$key};

    if ($version > $my_version) {
        # Received a fresher value for the entry
        $self->{data}->{$key} = $value;
        $self->{vers}->{$key} = $version;
        $self->{time}->{$key} = $time;
    }
    else {
        # Version conflict, default resolution is to keep newest value
        my $resolver = $self->{resolver} || sub {
            return $_[0]->{time} > $_[1]->{time} ? $_[0] : $_[1];
        };

        my $keep = $resolver->(
            {   # Mine
                data => $self->{data}->{$key},
                vers => $self->{vers}->{$key},
                time => $self->{time}->{$key},
            },
            {   # Theirs
                data => $value,
                vers => $version,
                time => $time,
            },
        );

        $self->{data}->{$key} = $keep->{data};
        $self->{vers}->{$key} = $keep->{vers};
        $self->{time}->{$key} = $keep->{time};
    }

    $self->{on_update}->($key, $self->{data}->{$key}, $old) if $self->{on_update};

    if (defined $self->{data}->{$key}) {
        # The key is alive (again): cancel the removal scheduled by a previous
        # deletion, destroying the timer watcher disarms it
        delete $self->{_destroy}->{$key};
        return;
    }

    # Postpone delete because it is necessary to keep the versioning
    # of this modification until it is propagated to all workers

    # Ensure that timer is set properly when the event loop was blocked
    if ($_now != time) { $_now = time; AnyEvent->now_update }

    # The timer is stored into the cache itself: its callback must hold only
    # a weak reference to the cache in order to not create a reference cycle
    my $weak_self = $self;
    weaken $weak_self;

    $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
        return unless $weak_self;
        delete $weak_self->{_destroy}->{$key};
        delete $weak_self->{data}->{$key};
        delete $weak_self->{vers}->{$key};
        delete $weak_self->{time}->{$key};
    });

    return;
}
# Build a snapshot of the whole cache.
#
# Returns a hash reference with the keys:
#   uid  - the uid of this cache
#   time - the high resolution time at which the snapshot was taken
#   dump - array reference of [ $key, $value, $version, $time ] entries,
#          one for each key present in the data hash
sub dump {
    my ($self) = @_;

    my @entries = map {
        [ $_, $self->{data}->{$_}, $self->{vers}->{$_}, $self->{time}->{$_} ]
    } keys %{ $self->{data} };

    return {
        uid  => $self->{uid},
        time => Time::HiRes::time(),
        dump => \@entries,
    };
}
# Merge into the local cache every entry of a dump, which is a hash reference
# with the keys 'uid' (of the cache which made it) and 'dump' (array reference
# of entries). Each entry is handed to _merge in order.
sub _merge_dump {
    my ($self, $dump) = @_;

    # Discard dumps sent by myself
    my $sender = $dump->{uid};
    return if ($sender eq $self->{uid});

    foreach my $item ( @{ $dump->{dump} } ) {
        $self->_merge($item);
    }
}
# Refresh the modification time of the entry stored under $key.
#
# Nothing is done (and nothing is returned) when the key holds no defined
# value, or when the age of the entry is not strictly between 0.3 and 1.3
# times max_age. Otherwise the current value is set again, which broadcasts
# it with a new time but the same version; the result of set() is returned.
#
# Croaks if the key is defined but no max_age was specified.
sub touch {
    my ($self, $key) = @_;

    return unless defined $self->{data}->{$key};

    croak "No max_age specified (gc is disabled)" unless $self->{max_age};

    my $max_age = $self->{max_age};
    my $age     = time() - $self->{time}->{$key};

    return unless ( $age > $max_age * 0.3 && $age < $max_age * 1.3 );

    # Set to current value but without increasing version:
    # the decrement offsets the increment done by set
    $self->{vers}->{$key}--;

    return $self->set( $key => $self->{data}->{$key} );
}
( run in 1.223 second using v1.01-cache-2.11-cpan-ceb78f64989 )