Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN


    my $cache_id  = $self->{id};
    my $uid       = $self->{uid};
    my $local_bus = $bus->{bus_role};
    my $client_id = $bus->{client_id};

    my $topic = "msg/$local_bus/_sync/$cache_id/set";

    $bus->subscribe(
        topic      => $topic,
        on_publish => sub {
          # my ($payload_ref, $mqtt_properties) = @_;

            my $entry = decode_json( ${$_[0]} );

            $self->_merge($entry);
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to topic '$topic'" unless $success;
        }
    );

    my $reply_topic = "priv/$client_id/sync-$cache_id";

    $bus->subscribe(
        topic      => $reply_topic,
        on_publish => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            my $dump = decode_json($$payload_ref);

            $self->_merge_dump($dump);

            $self->_sync_completed(1);
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to reply topic '$reply_topic'" unless $success;
        }
    );
}

sub _send_sync_request {
    my ($self, $bus) = @_;
    weaken $self;

    # Do not send more than one sync request at the time
    return if $self->{_sync_timeout};

    my $cache_id  = $self->{id};
    my $uid       = $self->{uid};
    my $local_bus = $bus->{bus_role};
    my $client_id = $bus->{client_id};

    $bus->publish(
        topic          => "req/$local_bus/_sync/$cache_id/dump",
        response_topic => "priv/$client_id/sync-$cache_id",
    );

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # When a fresh pool is started there is no master to reply sync requests
    $self->{_sync_timeout} = AnyEvent->timer(
        after => SYNC_REQUEST_TIMEOUT,
        cb    => sub { $self->_sync_completed(0) },
    );
}

sub _sync_completed {
    my ($self, $success) = @_;

    delete $self->{_sync_timeout};

    return if $self->{synced};

    # BUG: When a fresh pool is started there is no master to reply sync requests.
    # When two fresh pools are started at t0 and t1 time, and (t1 - t0) < SYNC_REQUEST_TIMEOUT,
    # cache updates in the t0-t1 range are not properly synced in the pool wich was started later
    log_debug( "Shared cache '$self->{id}': " . ($success ? "Sync completed" : "Acting as master"));

    $self->{synced} = 1;

    foreach my $bus ( @{$self->{_BUS_GROUP}} ) {

        # Connections to other buses could have failed or be in progress
        next unless $bus->{is_connected};

        $self->_accept_sync_requests($bus);
    }
}

sub _accept_sync_requests {
    my ($self, $bus) = @_;
    weaken $self;
    weaken $bus;

    my $cache_id  = $self->{id};
    my $uid       = $self->{uid};
    my $bus_id    = $bus->{bus_id};
    my $local_bus = $bus->{bus_role};

    log_debug "Shared cache '$self->{id}': Accepting sync requests from $local_bus";

    my $topic = "\$share/BKPR/req/$local_bus/_sync/$cache_id/dump";

    $bus->subscribe(
        topic      => $topic,
        on_publish => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            my $dump = encode_json( $self->dump );

            $bus->publish(
                topic   => $mqtt_properties->{'response_topic'},
                payload => \$dump,
            );
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to topic '$topic'" unless $success;
        }
    );
}

sub _ping_backend_brokers {
    my $self = shift;

    foreach my $bus (@{$self->{_BUS_GROUP}}) {

        next unless $bus->{is_connected};
        $bus->pingreq;
    }
}

my $_now = 0;

sub set {
    my ($self, $key, $value) = @_;
    weaken $self;

    croak "Key value is undefined" unless (defined $key);

    my $old = $self->{data}->{$key};

    $self->{data}->{$key} = $value;
    $self->{vers}->{$key}++;
    $self->{time}->{$key} = Time::HiRes::time();

    my $json = encode_json([
        $key,
        $value,
        $self->{vers}->{$key},
        $self->{time}->{$key},
        $self->{uid},
    ]);

    $self->{on_update}->($key, $value, $old) if $self->{on_update};

    # Notify all workers in every cluster about the change
    my @bus_group = grep { $_->{is_connected} } @{$self->{_BUS_GROUP}};

    unshift @bus_group, $self->{_BUS};

    foreach my $bus (@bus_group) {
        my $local_bus = $bus->{bus_role};
        my $cache_id  = $self->{id};

        $bus->publish(
            topic    => "msg/$local_bus/_sync/$cache_id/set",
            payload  => \$json,
        );
    }

    unless (defined $value) {
        # Postpone delete because it is necessary to keep the versioning 
        # of this modification until it is propagated to all workers

        # Ensure that timer is set properly when the event loop was blocked
        if ($_now != time) { $_now = time; AnyEvent->now_update }

        $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
            delete $self->{_destroy}->{$key};
            delete $self->{data}->{$key};
            delete $self->{vers}->{$key};
            delete $self->{time}->{$key};
        });
    }

    return 1;
}

sub get {
    my ($self, $key) = @_;

    $self->{data}->{$key};
}

sub delete {
    my ($self, $key) = @_;

    $self->set( $key => undef );
}

sub raw_data {
    my $self = shift;

    $self->{data};
}

sub _merge {
    my ($self, $entry) = @_;

    my ($key, $value, $version, $time, $uid) = @$entry;

    # Discard updates sent by myself
    return if (defined $uid && $uid eq $self->{uid});

    if ($version > ($self->{vers}->{$key} || 0)) {

        # Received a fresher value for the entry
        my $old = $self->{data}->{$key};

        $self->{data}->{$key} = $value;
        $self->{vers}->{$key} = $version;
        $self->{time}->{$key} = $time;

        $self->{on_update}->($key, $value, $old) if $self->{on_update};
    }
    elsif ($version < $self->{vers}->{$key}) {

        # Received a stale value (we have a newest version)
        return;
    }
    else {

        # Version conflict, default resolution is to keep newest value
        my $resolver = $self->{resolver} || sub {
            return $_[0]->{time} > $_[1]->{time} ? $_[0] : $_[1];
        };

        my $keep = $resolver->(
            {   # Mine
                data => $self->{data}->{$key},
                vers => $self->{vers}->{$key},
                time => $self->{time}->{$key},
            },
            {   # Theirs
                data => $value,
                vers => $version,
                time => $time,
            },
        );

        my $old = $self->{data}->{$key};

        $self->{data}->{$key} = $keep->{data};
        $self->{vers}->{$key} = $keep->{vers};
        $self->{time}->{$key} = $keep->{time};

        $self->{on_update}->($key, $keep->{data}, $old) if $self->{on_update};
    }

    unless (defined $self->{data}->{$key}) {
        # Postpone delete because it is necessary to keep the versioning 
        # of this modification until it is propagated to all workers

        # Ensure that timer is set properly when the event loop was blocked
        if ($_now != time) { $_now = time; AnyEvent->now_update }

        $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
            delete $self->{_destroy}->{$key};
            delete $self->{data}->{$key};
            delete $self->{vers}->{$key};
            delete $self->{time}->{$key};
        });
    }
}

sub dump {
    my $self = shift;

    my @dump;

    foreach my $key (keys %{$self->{data}}) {
        push @dump, [
            $key,
            $self->{data}->{$key},
            $self->{vers}->{$key},
            $self->{time}->{$key},
        ];
    }

    return {
        uid   => $self->{uid},
        time  => Time::HiRes::time(),
        dump  => \@dump,
    };
}

sub _merge_dump {
    my ($self, $dump) = @_;

    # Discard dumps sent by myself
    return if ($dump->{uid} eq $self->{uid});

    foreach my $entry (@{$dump->{dump}}) {
        $self->_merge($entry);
    }
}

sub touch {
    my ($self, $key) = @_;

    return unless defined $self->{data}->{$key};

    croak "No max_age specified (gc is disabled)" unless $self->{max_age};

    my $age = time() - $self->{time}->{$key};

    return unless ( $age > $self->{max_age} * 0.3);
    return unless ( $age < $self->{max_age} * 1.3);

    # Set to current value but without increasing version
    $self->{vers}->{$key}--;

    $self->set( $key => $self->{data}->{$key} );
}



( run in 1.223 second using v1.01-cache-2.11-cpan-ceb78f64989 )