BoardStreams
view release on metacpan or search on metacpan
lib/BoardStreams/Client/Manager.pm view on Meta::CPAN
),
)->pipe(
op_distinct_until_changed(),
)->subscribe($self->connected_o);
# Re-emit every value of connected_o as a 'connected' event on this object.
$self->connected_o->subscribe(sub ($status) {
    $self->emit('connected', $status);
});

# ping
# Each config value starts a fresh periodic timer (op_switch_map drops the
# previous one); every tick sends a ping, and the timer ends as soon as
# connected_o emits a false value.
$config_o->pipe(
    # shorten the configured interval by a random amount below 1
    op_map(sub ($config, @) {
        my $interval = $config->{pingInterval};
        return $interval - rand();
    }),
    op_switch_map(sub ($iv, @) {
        # first tick after a random fraction of the interval, then every $iv
        my $ticks_o = rx_timer($iv * rand(), $iv);
        my $went_offline_o = $self->connected_o->pipe(
            op_filter(sub { ! $_ }),
        );
        return $ticks_o->pipe(op_take_until($went_offline_o));
    }),
)->subscribe(sub {
    $self->send({ type => 'ping' });
});
# timeout
$config_o->pipe(
op_switch_map(sub ($config, @) {
$self->{incoming_o}->pipe(
op_start_with(undef),
lib/BoardStreams/Client/WebSockets.pm view on Meta::CPAN
# Returns an observable that runs the websocket observable in a loop: each
# time the inner sequence completes, op_repeat re-subscribes to it, after a
# pause taken from @delays.
return rx_observable->new(sub ($subscriber) {
# pauses handed to rx_timer before each successive retry; once the list is
# exhausted the last entry keeps being used
my @delays = (0, 1, 2, 3, 4, 5);
# index of the next pause in @delays; reset to 0 by the op_tap below
my $num_failures = 0;
# rx_defer: the sequence below is rebuilt on every (re)subscription
my $s = rx_defer(sub {
rx_concat(
make_websocket_observable($url, $manager->ua),
# once the websocket observable has completed, emit a single undef
rx_of(undef),
rx_defer(sub {
# read the current pause, then advance the index for next time
my $delay = $delays[$num_failures++] // $delays[-1];
# wait for $delay without emitting anything, then complete
return rx_timer($delay)->pipe(op_ignore_elements());
}),
);
})->pipe(
# any true value resets the retry index
# NOTE(review): presumably a true value means a live connection -- confirm
# against make_websocket_observable
op_tap(sub ($x) { $num_failures = 0 if $x }),
# re-subscribe whenever the concatenated sequence completes
op_repeat(),
# emit undef before anything else
op_start_with(undef),
# drop consecutive duplicate values (e.g. repeated undef)
op_distinct_until_changed(),
)->subscribe($subscriber);
# return the inner subscription from this callback
return $s;
lib/BoardStreams/Util.pm view on Meta::CPAN
Mojo::IOLoop->next_tick(sub {
$p->resolve();
});
return $p;
}
# Returns a Mojo::Promise that is resolved (with no value) by a
# Mojo::IOLoop timer set for $duration.
# The `_` prototype makes the argument default to $_ when omitted.
sub sleep_p :prototype(_) ($duration) {
    my $promise = Mojo::Promise->new;
    my $on_elapsed = sub { $promise->resolve() };
    Mojo::IOLoop->timer($duration => $on_elapsed);
    return $promise;
}
# Serializes $data with to_json, then passes the result through encode_utf8.
# The `_` prototype makes the argument default to $_ when omitted.
sub encode_json :prototype(_) ($data) {
    my $json_text = to_json($data);
    return encode_utf8($json_text);
}
# Passes $bytes through decode_utf8, then parses the result with from_json.
# The `_` prototype makes the argument default to $_ when omitted.
sub decode_json :prototype(_) ($bytes) {
    my $json_text = decode_utf8($bytes);
    return from_json($json_text);
}
lib/Mojolicious/Plugin/BoardStreams.pm view on Meta::CPAN
# if disconnect from pg
$pubsub_connected_o->pipe(
op_pairwise(),
op_filter(sub {
my ($prev, $curr) = @$_;
return $prev && ! $curr;
})
),
# if not connected to pg the first three seconds
rx_timer(3)->pipe(
op_take_until(
$pubsub_connected_o->pipe(op_filter(sub { $_ })),
),
),
)->subscribe(sub {
$can_accept_clients = 0;
stop_gracefully();
});
# allow connections on...
lib/Mojolicious/Plugin/BoardStreams.pm view on Meta::CPAN
# Insert this worker's row into bs_workers; if no row was inserted, stop.
# NOTE(review): `on_conflict => undef` presumably yields ON CONFLICT DO
# NOTHING, so zero rows would mean the id is already taken -- confirm against
# the Mojo::Pg / SQL::Abstract::Pg docs.
stop_gracefully() unless $pg->db->insert(
    'bs_workers',
    { id => $worker_id, dt_heartbeat => \'CURRENT_TIMESTAMP' },
    { on_conflict => undef },
)->rows;
# store heartbeat, shutdown if worker row is missing
# The first tick comes after a random fraction of $HEARTBEAT_INTERVAL, then
# one tick per interval.
rx_timer(rand($HEARTBEAT_INTERVAL), $HEARTBEAT_INTERVAL)->subscribe(async sub {
    # refresh this worker's heartbeat timestamp
    my $result = await $pg->db->update_p(
        'bs_workers',
        { dt_heartbeat => \'CURRENT_TIMESTAMP' },
        { id => $worker_id },
    );

    # no row updated: this worker's row is gone, so stop
    stop_gracefully() unless $result->rows;
});
# elect repairer if needed, and repair
rx_timer(rand($REPAIR_INTERVAL), $REPAIR_INTERVAL)->subscribe(async sub {
my $db = $pg->db;
# revolt against absent ruler
await $db->delete_p(
'bs_workers',
{
is_repairer => 1,
dt_heartbeat => { '<', \["NOW() - INTERVAL '1 SECOND' * ?", 2 * $HEARTBEAT_INTERVAL] }
}
);
( run in 0.624 second using v1.01-cache-2.11-cpan-49f99fa48dc )