Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN

package Beekeeper::Worker::Extension::SharedCache;

use strict;
use warnings;

our $VERSION = '0.10';

use Exporter 'import';

our @EXPORT = qw( shared_cache );


sub shared_cache {
    my $self = shift;

    Beekeeper::Worker::Extension::SharedCache::Cache->new(
        worker => $self,
        @_
    );
}

package
    Beekeeper::Worker::Extension::SharedCache::Cache;   # hide from PAUSE

use Beekeeper::Worker ':log';
use AnyEvent;
use JSON::XS;
use Fcntl qw(:DEFAULT :flock);
use Scalar::Util 'weaken';
use Carp;

use constant SYNC_REQUEST_TIMEOUT => 30;

# Show errors from perspective of caller
$Carp::Internal{(__PACKAGE__)}++;


sub new {
    my ($class, %args) = @_;

    my $worker  = $args{'worker'};
    my $id      = $args{'id'};
    my $uid     = "$$-" . int(rand(90000000)+10000000);
    my $pool_id = $worker->{_WORKER}->{pool_id};

    my $self = {
        id        => $id,
        uid       => $uid,
        pool_id   => $pool_id,
        resolver  => $args{'resolver'},
        on_update => $args{'on_update'},
        persist   => $args{'persist'},
        max_age   => $args{'max_age'},
        refresh   => $args{'refresh'},
        synced    => 0,
        data      => {},
        vers      => {},
        time      => {},
        _BUS      => undef,
        _BUS_GROUP=> undef,
    };

    bless $self, $class;

    $self->_load_state if $self->{persist};

    $self->_connect_to_all_brokers($worker);

    my $Self = $self;
    weaken $Self;

    AnyEvent->now_update;

    if ($self->{max_age}) {

        $self->{gc_timer} = AnyEvent->timer(
            after    => $self->{max_age} * rand() + 60,
            interval => $self->{max_age},
            cb       => sub { $Self->_gc },
        );
    }

    if ($self->{refresh}) {

        $self->{refresh_timer} = AnyEvent->timer(
            after    => $self->{refresh} * rand() + 60,
            interval => $self->{refresh},
            cb       => sub { $Self->_send_sync_request },
        );
    }

    # Ping backend brokers to avoid disconnections due to inactivity
    $self->{ping_timer} = AnyEvent->timer(
        after    => 60 * rand(),
        interval => 60,
        cb       => sub { $Self->_ping_backend_brokers },
    );

    return $self;
}

sub _connect_to_all_brokers {
    my ($self, $worker) = @_;
    weaken $self;

    #TODO: using multiple shared_cache from the same worker will cause multiple bus connections

    my $worker_bus = $worker->{_BUS};
    my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $worker_bus->bus_id );

    my $bus_group = $self->{_BUS_GROUP} = [];

    foreach my $config (@$group_config) {

        my $bus_id = $config->{'bus_id'};

        if ($bus_id eq $worker_bus->bus_id) {
            # Already connected to our own bus
            $self->_setup_sync_listeners($worker_bus);
            $self->_send_sync_request($worker_bus);
            $self->{_BUS} = $worker_bus;
            weaken $self->{_BUS};
            next;
        }

        my $bus; $bus = Beekeeper::MQTT->new( 
            %$config,
            bus_id   => $bus_id,
            timeout  => 300,
            on_error => sub {
                # Reconnect
                my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
                log_error "Connection to $bus_id failed: $errmsg";
                my $delay = $self->{connect_err}->{$bus_id}++;
                $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                    after => ($delay < 10 ? $delay * 3 : 30),
                    cb => sub { 
                        $bus->connect(
                            on_connack => sub {
                                log_warn "Reconnected to $bus_id";
                                $self->_setup_sync_listeners($bus);
                                $self->_accept_sync_requests($bus) if $self->{synced};
                            }
                        );
                    },
                );
            },
        );

        push @$bus_group, $bus;

        $bus->connect(
            on_connack => sub {
                # Setup
                log_debug "Connected to $bus_id";
                $self->_setup_sync_listeners($bus);
                $self->_accept_sync_requests($bus) if $self->{synced};
            },
        );
    }
}

sub _setup_sync_listeners {
    my ($self, $bus) = @_;
    weaken $self;

    my $cache_id  = $self->{id};
    my $uid       = $self->{uid};
    my $local_bus = $bus->{bus_role};
    my $client_id = $bus->{client_id};

    my $topic = "msg/$local_bus/_sync/$cache_id/set";

    $bus->subscribe(
        topic      => $topic,
        on_publish => sub {
          # my ($payload_ref, $mqtt_properties) = @_;

            my $entry = decode_json( ${$_[0]} );

            $self->_merge($entry);
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to topic '$topic'" unless $success;
        }
    );

    my $reply_topic = "priv/$client_id/sync-$cache_id";

    $bus->subscribe(
        topic      => $reply_topic,
        on_publish => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            my $dump = decode_json($$payload_ref);

            $self->_merge_dump($dump);

            $self->_sync_completed(1);
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to reply topic '$reply_topic'" unless $success;
        }
    );
}

sub _send_sync_request {
    my ($self, $bus) = @_;
    weaken $self;

    # Do not send more than one sync request at the time
    return if $self->{_sync_timeout};

    my $cache_id  = $self->{id};
    my $uid       = $self->{uid};
    my $local_bus = $bus->{bus_role};
    my $client_id = $bus->{client_id};

    $bus->publish(
        topic          => "req/$local_bus/_sync/$cache_id/dump",
        response_topic => "priv/$client_id/sync-$cache_id",
    );

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # When a fresh pool is started there is no master to reply sync requests
    $self->{_sync_timeout} = AnyEvent->timer(
        after => SYNC_REQUEST_TIMEOUT,
        cb    => sub { $self->_sync_completed(0) },
    );
}

sub _sync_completed {
    my ($self, $success) = @_;

    delete $self->{_sync_timeout};

    return if $self->{synced};

    # BUG: When a fresh pool is started there is no master to reply sync requests.
    # When two fresh pools are started at t0 and t1 time, and (t1 - t0) < SYNC_REQUEST_TIMEOUT,
    # cache updates in the t0-t1 range are not properly synced in the pool wich was started later
    log_debug( "Shared cache '$self->{id}': " . ($success ? "Sync completed" : "Acting as master"));

    $self->{synced} = 1;

    foreach my $bus ( @{$self->{_BUS_GROUP}} ) {

        # Connections to other buses could have failed or be in progress
        next unless $bus->{is_connected};

        $self->_accept_sync_requests($bus);
    }
}

sub _accept_sync_requests {
    my ($self, $bus) = @_;
    weaken $self;
    weaken $bus;

    my $cache_id  = $self->{id};
    my $uid       = $self->{uid};
    my $bus_id    = $bus->{bus_id};
    my $local_bus = $bus->{bus_role};

    log_debug "Shared cache '$self->{id}': Accepting sync requests from $local_bus";

    my $topic = "\$share/BKPR/req/$local_bus/_sync/$cache_id/dump";

    $bus->subscribe(
        topic      => $topic,
        on_publish => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            my $dump = encode_json( $self->dump );

            $bus->publish(
                topic   => $mqtt_properties->{'response_topic'},
                payload => \$dump,
            );
        },
        on_suback => sub {
            my ($success) = @_;
            log_error "Could not subscribe to topic '$topic'" unless $success;
        }
    );
}

sub _ping_backend_brokers {
    my $self = shift;

    foreach my $bus (@{$self->{_BUS_GROUP}}) {

        next unless $bus->{is_connected};
        $bus->pingreq;
    }
}

my $_now = 0;

sub set {
    my ($self, $key, $value) = @_;
    weaken $self;

    croak "Key value is undefined" unless (defined $key);

    my $old = $self->{data}->{$key};

    $self->{data}->{$key} = $value;
    $self->{vers}->{$key}++;
    $self->{time}->{$key} = Time::HiRes::time();

    my $json = encode_json([
        $key,
        $value,
        $self->{vers}->{$key},
        $self->{time}->{$key},
        $self->{uid},
    ]);

    $self->{on_update}->($key, $value, $old) if $self->{on_update};

    # Notify all workers in every cluster about the change
    my @bus_group = grep { $_->{is_connected} } @{$self->{_BUS_GROUP}};

    unshift @bus_group, $self->{_BUS};

    foreach my $bus (@bus_group) {
        my $local_bus = $bus->{bus_role};
        my $cache_id  = $self->{id};

        $bus->publish(
            topic    => "msg/$local_bus/_sync/$cache_id/set",
            payload  => \$json,
        );
    }

    unless (defined $value) {
        # Postpone delete because it is necessary to keep the versioning 
        # of this modification until it is propagated to all workers

        # Ensure that timer is set properly when the event loop was blocked
        if ($_now != time) { $_now = time; AnyEvent->now_update }

        $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
            delete $self->{_destroy}->{$key};
            delete $self->{data}->{$key};
            delete $self->{vers}->{$key};
            delete $self->{time}->{$key};
        });
    }

    return 1;
}

sub get {
    my ($self, $key) = @_;

    $self->{data}->{$key};
}

sub delete {
    my ($self, $key) = @_;



( run in 1.736 second using v1.01-cache-2.11-cpan-39bf76dae61 )