BoardStreams

 view release on metacpan or  search on metacpan

lib/BoardStreams/Client/Manager.pm  view on Meta::CPAN

                    };
                } else {
                    if ($i == ($ongoing_messages->{$identifier}{i} // -2) + 1) {
                        delete $ongoing_messages->{$identifier};
                    }

                    if (my $ong = $ongoing_messages->{$identifier}) {
                        $ong->{i} = $i;
                        substr($ong->{bytes}, length($ong->{bytes}), 0) = $remaining;
                    }
                }

                if ($is_final and $i == ($ongoing_messages->{$identifier}{i} // -1)) {
                    my $data = decode_json $ongoing_messages->{$identifier}{bytes};
                    delete $ongoing_messages->{$identifier};
                    return rx_of($data);
                }

                return rx_EMPTY;
            }

            # if message is not a part
            return rx_of(decode_json $binary_part);
        }),
    )->subscribe($self->{incoming_o});

    # incoming JSON-RPC responses
    $self->{responses_o} = $self->{incoming_o}->pipe(
        op_filter(sub {
            eqq($_->{jsonrpc}, '2.0')
                and (exists $_->{result} or exists $_->{error})
        }),
        op_share(),
    );

    my $config_o = $self->{incoming_o}->pipe(
        op_filter(sub { eqq $_->{type}, 'config' }),
        op_map(sub { $_->{data} }),
        op_share(),
    );

    # create connected_o to show websocket connection status (after config is received)
    $self->connected_o( rx_behavior_subject->new(0) );
    rx_merge(
        $config_o->pipe(
            op_map_to(1),
        ),
        $websocket_o->pipe(
            op_filter(sub { ! $_ }),
            op_map_to(0),
        ),
    )->pipe(
        op_distinct_until_changed(),
    )->subscribe($self->connected_o);
    $self->connected_o->subscribe(sub ($status) { $self->emit('connected', $status) });

    # ping
    $config_o->pipe(
        op_map(sub ($config, @) { $config->{pingInterval} - rand() }),
        op_switch_map(sub ($iv, @) {
            rx_timer($iv * rand(), $iv)->pipe(
                op_take_until($self->connected_o->pipe(op_filter(sub { ! $_ }))),
            ),
        }),
    )->subscribe(sub { $self->send({ type => 'ping' }) });

    # timeout
    $config_o->pipe(
        op_switch_map(sub ($config, @) {
            $self->{incoming_o}->pipe(
                op_start_with(undef),
                op_switch_map(sub {
                    rx_of(1)->pipe(
                        op_delay($config->{pingInterval} + 10),
                        op_take_until(
                            $self->connected_o->pipe(op_filter(sub { !$_ }))
                        ),
                    );
                }),
            );
        }),
        op_with_latest_from($websocket_o),
    )->subscribe(sub ($conf_ws) {
        my (undef, $ws) = @$conf_ws;
        $ws && $ws->finish; # TODO: or maybe some other way to close (?)
    });

    return $self;
}

sub connect ($self) {
    $self->{please_be_connected}->next(1);
}

sub disconnect ($self) {
    $self->{please_be_connected}->next(0);
}

sub send ($self, $data) {
    $self->connected_o->get_value or eqq($data->{type}, 'ping')
        or die "Can't send data because websocket not available";

    $self->{send_o}->next($data);
}

sub do_action ($self, $stream_name, $action_name, $payload = undef) {
    $self->send({
        jsonrpc => '2.0',
        method  => 'doAction',
        params  => [$stream_name, $action_name, $payload],
    });
}

sub do_request ($self, $stream_name, $request_name, $payload = undef) {
    my $id = unique_id;

    $self->send({
        jsonrpc => '2.0',
        method  => 'doRequest',
        params  => [ $stream_name, $request_name, $payload ],
        id      => $id,



( run in 1.091 second using v1.01-cache-2.11-cpan-d8267643d1d )