BoardStreams
view release on metacpan or search on metacpan
lib/Mojolicious/Plugin/BoardStreams.pm view on Meta::CPAN
$registry->is_member_of($c, $stream_name) or next PAIR;
my $tx = $db->begin;
await exists_p(
$db,
'bs_streams',
{ name => $stream_name },
{ for => 'update' },
) or next PAIR;
$registry->is_member_of($c, $stream_name) or next PAIR;
if (my $leave_sub = $registry->get_leave($stream_name)) {
my $left_completely;
while (! $left_completely) {
$left_completely = $registry->remove_membership($c, $stream_name)
and (delete $registry->conn_subscriptions->{$c}{$stream_name})->unsubscribe;
try {
my $result = $leave_sub->($c, $stream_name);
await $result if $result->$_can('then');
} catch ($e) {
$c->log->fatal("Couldn't execute leave sub, stopping worker: $e");
# so as to not leave the streams in an inconsistent state
stop_gracefully();
};
}
} else {
1 until $registry->remove_membership($c, $stream_name);
(delete $registry->conn_subscriptions->{$c}{$stream_name})->unsubscribe;
}
$tx->commit;
}
};
# all workers execute this at the beginning
Mojo::IOLoop->next_tick(sub {
$worker_id = unique_id;
# set $can_accept_clients
{
my $pubsub_connected_o = rx_merge(
rx_from_event($pg->pubsub, 'reconnect')->pipe(op_map_to(1)),
rx_from_event($pg->pubsub, 'disconnect')->pipe(op_map_to(0)),
)->pipe(op_start_with(0));
# stop & kick everyone on these events
rx_merge(
# if disconnect from pg
$pubsub_connected_o->pipe(
op_pairwise(),
op_filter(sub {
my ($prev, $curr) = @$_;
return $prev && ! $curr;
})
),
# if not connected to pg the first three seconds
rx_timer(3)->pipe(
op_take_until(
$pubsub_connected_o->pipe(op_filter(sub { $_ })),
),
),
)->subscribe(sub {
$can_accept_clients = 0;
stop_gracefully();
});
# allow connections on...
$pubsub_connected_o->pipe(
op_filter(sub { $_ }),
)->subscribe(sub { $can_accept_clients = 1 });
}
# this is to... (?)
$pg->pubsub->listen('boardstreams.dummy' => sub {});
# create worker row
$pg->db->insert(
'bs_workers',
{
id => $worker_id,
dt_heartbeat => \'CURRENT_TIMESTAMP',
},
{ on_conflict => undef },
)->rows or stop_gracefully();
# store heartbeat, shutdown if worker row is missing
rx_timer(rand($HEARTBEAT_INTERVAL), $HEARTBEAT_INTERVAL)->subscribe(async sub {
# update heartbeat or stop and finish
(await $pg->db->update_p(
'bs_workers',
{ dt_heartbeat => \'CURRENT_TIMESTAMP' },
{ id => $worker_id },
))->rows or stop_gracefully();
});
# elect repairer if needed, and repair
rx_timer(rand($REPAIR_INTERVAL), $REPAIR_INTERVAL)->subscribe(async sub {
my $db = $pg->db;
# revolt against absent ruler
await $db->delete_p(
'bs_workers',
{
is_repairer => 1,
dt_heartbeat => { '<', \["NOW() - INTERVAL '1 SECOND' * ?", 2 * $HEARTBEAT_INTERVAL] }
}
);
my $repairer_row = (await $db->select_p(
'bs_workers',
'id',
{ is_repairer => 1 },
))->hashes->[0];
if ($repairer_row) {
$am_repairer = $repairer_row->{id} eq $worker_id;
} else {
$am_repairer = 1 if (await $db->update_p(
'bs_workers',
{ is_repairer => 1 },
{ id => $worker_id },
{ on_conflict => undef },
))->rows;
}
if ($am_repairer) {
await $app->bs->repair_p;
}
});
# on ioloop finish, make clients leave and remove worker row
rx_from_event(Mojo::IOLoop->singleton, 'finish')->pipe(
op_take(1),
)->subscribe(async sub {
$is_finished = 1;
# this is to make clients leave their streams before before deleting worker row, to avoid
# having the repairer repairing w/o reason
$_->finish foreach $registry->get_all_conns->@*;
foreach my $conn ($registry->get_all_conns->@*) {
await $on_client_finish_sub->($conn);
};
await $pg->db->delete_p(
'bs_workers',
{ id => $worker_id },
);
});
});
# repair all streams that need repairing
$app->helper('bs.repair_p' => async sub ($c) {
my $db = dynamically $BoardStreams::db = $pg->db;
await $db->delete_p(
'bs_workers',
{ dt_heartbeat => { '<', \["NOW() - INTERVAL '1 SECOND' * ?", 2 * $HEARTBEAT_INTERVAL] } },
);
( run in 0.638 second using v1.01-cache-2.11-cpan-39bf76dae61 )