BoardStreams

 view release on metacpan or  search on metacpan

lib/Mojolicious/Plugin/BoardStreams.pm  view on Meta::CPAN

            $registry->is_member_of($c, $stream_name) or next PAIR;

            my $tx = $db->begin;

            await exists_p(
                $db,
                'bs_streams',
                { name => $stream_name },
                { for => 'update' },
            ) or next PAIR;

            $registry->is_member_of($c, $stream_name) or next PAIR;

            if (my $leave_sub = $registry->get_leave($stream_name)) {
                my $left_completely;
                while (! $left_completely) {
                    $left_completely = $registry->remove_membership($c, $stream_name)
                        and (delete $registry->conn_subscriptions->{$c}{$stream_name})->unsubscribe;

                    try {
                        my $result = $leave_sub->($c, $stream_name);
                        await $result if $result->$_can('then');
                    } catch ($e) {
                        $c->log->fatal("Couldn't execute leave sub, stopping worker: $e");
                        # so as to not leave the streams in an inconsistent state
                        stop_gracefully();
                    };
                }
            } else {
                1 until $registry->remove_membership($c, $stream_name);
                (delete $registry->conn_subscriptions->{$c}{$stream_name})->unsubscribe;
            }

            $tx->commit;
        }
    };

    # All workers execute this once, on the first tick of the event loop.
    # It picks a worker id, wires up the gate that decides whether clients may
    # connect ($can_accept_clients), registers the worker row in bs_workers,
    # starts the heartbeat and repairer-election timers, and installs the
    # cleanup that runs when the IOLoop emits 'finish'.
    Mojo::IOLoop->next_tick(sub {
        $worker_id = unique_id;

        # set $can_accept_clients
        {
            # emits 1 on every pubsub 'reconnect' and 0 on every 'disconnect';
            # starts with 0, i.e. "not connected yet"
            my $pubsub_connected_o = rx_merge(
                rx_from_event($pg->pubsub, 'reconnect')->pipe(op_map_to(1)),
                rx_from_event($pg->pubsub, 'disconnect')->pipe(op_map_to(0)),
            )->pipe(op_start_with(0));

            # stop & kick everyone on these events
            rx_merge(
                # if disconnect from pg (a connected -> disconnected transition)
                $pubsub_connected_o->pipe(
                    op_pairwise(),
                    op_filter(sub {
                        my ($prev, $curr) = @$_;
                        return $prev && ! $curr;
                    })
                ),

                # if not connected to pg the first three seconds
                # (the timer is cancelled by the first "connected" emission)
                rx_timer(3)->pipe(
                    op_take_until(
                        $pubsub_connected_o->pipe(op_filter(sub { $_ })),
                    ),
                ),
            )->subscribe(sub {
                $can_accept_clients = 0;
                stop_gracefully();
            });

            # allow connections on every "connected" emission
            $pubsub_connected_o->pipe(
                op_filter(sub { $_ }),
            )->subscribe(sub { $can_accept_clients = 1 });
        }

        # NOTE(review): presumably this dummy listener forces the pubsub
        # connection to be opened right away, so that a 'reconnect' event is
        # emitted and the gate above can open -- TODO confirm
        $pg->pubsub->listen('boardstreams.dummy' => sub {});

        # create worker row (blocking insert); ON CONFLICT DO NOTHING yields
        # zero rows if the id is already taken, in which case this worker stops
        $pg->db->insert(
            'bs_workers',
            {
                id           => $worker_id,
                dt_heartbeat => \'CURRENT_TIMESTAMP',
            },
            { on_conflict => undef },
        )->rows or stop_gracefully();

        # store heartbeat, shutdown if worker row is missing
        # (first run after a random delay, then every $HEARTBEAT_INTERVAL)
        rx_timer(rand($HEARTBEAT_INTERVAL), $HEARTBEAT_INTERVAL)->subscribe(async sub {
            # update heartbeat or stop and finish
            (await $pg->db->update_p(
                'bs_workers',
                { dt_heartbeat => \'CURRENT_TIMESTAMP' },
                { id => $worker_id },
            ))->rows or stop_gracefully();
        });

        # elect repairer if needed, and repair
        # (first run after a random delay, then every $REPAIR_INTERVAL)
        rx_timer(rand($REPAIR_INTERVAL), $REPAIR_INTERVAL)->subscribe(async sub {
            my $db = $pg->db;

            # revolt against absent ruler: drop a repairer row whose heartbeat
            # is older than two heartbeat intervals
            await $db->delete_p(
                'bs_workers',
                {
                    is_repairer  => 1,
                    dt_heartbeat => { '<', \["NOW() - INTERVAL '1 SECOND' * ?", 2 * $HEARTBEAT_INTERVAL] }
                }
            );

            my $repairer_row = (await $db->select_p(
                'bs_workers',
                'id',
                { is_repairer => 1 },
            ))->hashes->[0];

            if ($repairer_row) {
                $am_repairer = $repairer_row->{id} eq $worker_id;
            } else {
                # Nobody holds the role: try to claim it for this worker.
                # NOTE(review): on_conflict is documented as an INSERT option;
                # confirm it has any effect on this UPDATE.
                my $claimed_rows = (await $db->update_p(
                    'bs_workers',
                    { is_repairer => 1 },
                    { id => $worker_id },
                    { on_conflict => undef },
                ))->rows;

                # Always refresh the flag. If the claim matched no row (our own
                # worker row is gone), a previously-true $am_repairer must not
                # survive, or this worker would keep repairing without holding
                # the role.
                $am_repairer = $claimed_rows ? 1 : 0;
            }

            if ($am_repairer) {
                await $app->bs->repair_p;
            }
        });

        # on ioloop finish, make clients leave and remove worker row
        # (op_take(1): only the first 'finish' event is handled)
        rx_from_event(Mojo::IOLoop->singleton, 'finish')->pipe(
            op_take(1),
        )->subscribe(async sub {
            $is_finished = 1;
            # this is to make clients leave their streams before deleting the worker row, to avoid
            # having the repairer repairing w/o reason
            $_->finish foreach $registry->get_all_conns->@*;
            # NOTE(review): get_all_conns is called a second time here; verify
            # that finishing a connection above does not already remove it
            # from the registry
            foreach my $conn ($registry->get_all_conns->@*) {
                await $on_client_finish_sub->($conn);
            };

            await $pg->db->delete_p(
                'bs_workers',
                { id => $worker_id },
            );
        });
    });

    # repair all streams that need repairing
    $app->helper('bs.repair_p' => async sub ($c) {
        my $db = dynamically $BoardStreams::db = $pg->db;

        await $db->delete_p(
            'bs_workers',
            { dt_heartbeat => { '<', \["NOW() - INTERVAL '1 SECOND' * ?", 2 * $HEARTBEAT_INTERVAL] } },
        );



( run in 0.638 second using v1.01-cache-2.11-cpan-39bf76dae61 )