BoardStreams

 view release on metacpan or  search on metacpan

lib/BoardStreams/Client/Manager.pm  view on Meta::CPAN

        ),
    )->pipe(
        op_distinct_until_changed(),
    )->subscribe($self->connected_o);
    $self->connected_o->subscribe(sub ($status) { $self->emit('connected', $status) });

    # ping
    $config_o->pipe(
        op_map(sub ($config, @) { $config->{pingInterval} - rand() }),
        op_switch_map(sub ($iv, @) {
            rx_timer($iv * rand(), $iv)->pipe(
                op_take_until($self->connected_o->pipe(op_filter(sub { ! $_ }))),
            ),
        }),
    )->subscribe(sub { $self->send({ type => 'ping' }) });

    # timeout
    $config_o->pipe(
        op_switch_map(sub ($config, @) {
            $self->{incoming_o}->pipe(
                op_start_with(undef),

lib/BoardStreams/Client/WebSockets.pm  view on Meta::CPAN

    return rx_observable->new(sub ($subscriber) {
        my @delays = (0, 1, 2, 3, 4, 5);
        my $num_failures = 0;

        my $s = rx_defer(sub {
            rx_concat(
                make_websocket_observable($url, $manager->ua),
                rx_of(undef),
                rx_defer(sub {
                    my $delay = $delays[$num_failures++] // $delays[-1];
                    return rx_timer($delay)->pipe(op_ignore_elements());
                }),
            );
        })->pipe(
            op_tap(sub ($x) { $num_failures = 0 if $x }),
            op_repeat(),
            op_start_with(undef),
            op_distinct_until_changed(),
        )->subscribe($subscriber);

        return $s;

lib/BoardStreams/Util.pm  view on Meta::CPAN

    Mojo::IOLoop->next_tick(sub {
        $p->resolve();
    });

    return $p;
}

sub sleep_p :prototype(_) ($duration) {
    my $p = Mojo::Promise->new;

    Mojo::IOLoop->timer($duration, sub {
        $p->resolve();
    });

    return $p;
}

sub encode_json :prototype(_) ($data) { encode_utf8 to_json $data }

sub decode_json :prototype(_) ($bytes) { from_json decode_utf8 $bytes }

lib/Mojolicious/Plugin/BoardStreams.pm  view on Meta::CPAN

                # if disconnect from pg
                $pubsub_connected_o->pipe(
                    op_pairwise(),
                    op_filter(sub {
                        my ($prev, $curr) = @$_;
                        return $prev && ! $curr;
                    })
                ),

                # if not connected to pg the first three seconds
                rx_timer(3)->pipe(
                    op_take_until(
                        $pubsub_connected_o->pipe(op_filter(sub { $_ })),
                    ),
                ),
            )->subscribe(sub {
                $can_accept_clients = 0;
                stop_gracefully();
            });

            # allow connections on...

lib/Mojolicious/Plugin/BoardStreams.pm  view on Meta::CPAN

        $pg->db->insert(
            'bs_workers',
            {
                id           => $worker_id,
                dt_heartbeat => \'CURRENT_TIMESTAMP',
            },
            { on_conflict => undef },
        )->rows or stop_gracefully();

        # store heartbeat, shutdown if worker row is missing
        rx_timer(rand($HEARTBEAT_INTERVAL), $HEARTBEAT_INTERVAL)->subscribe(async sub {
            # update heartbeat or stop and finish
            (await $pg->db->update_p(
                'bs_workers',
                { dt_heartbeat => \'CURRENT_TIMESTAMP' },
                { id => $worker_id },
            ))->rows or stop_gracefully();
        });

        # elect repairer if needed, and repair
        rx_timer(rand($REPAIR_INTERVAL), $REPAIR_INTERVAL)->subscribe(async sub {
            my $db = $pg->db;

            # revolt against absent ruler
            await $db->delete_p(
                'bs_workers',
                {
                    is_repairer  => 1,
                    dt_heartbeat => { '<', \["NOW() - INTERVAL '1 SECOND' * ?", 2 * $HEARTBEAT_INTERVAL] }
                }
            );



( run in 0.624 second using v1.01-cache-2.11-cpan-49f99fa48dc )