BoardStreams
view release on metacpan or search on metacpan
lib/BoardStreams/Client/Manager.pm view on Meta::CPAN
};
} else {
if ($i == ($ongoing_messages->{$identifier}{i} // -2) + 1) {
delete $ongoing_messages->{$identifier};
}
if (my $ong = $ongoing_messages->{$identifier}) {
$ong->{i} = $i;
substr($ong->{bytes}, length($ong->{bytes}), 0) = $remaining;
}
}
if ($is_final and $i == ($ongoing_messages->{$identifier}{i} // -1)) {
my $data = decode_json $ongoing_messages->{$identifier}{bytes};
delete $ongoing_messages->{$identifier};
return rx_of($data);
}
return rx_EMPTY;
}
# if message is not a part
return rx_of(decode_json $binary_part);
}),
)->subscribe($self->{incoming_o});
# incoming JSON-RPC responses
$self->{responses_o} = $self->{incoming_o}->pipe(
op_filter(sub {
eqq($_->{jsonrpc}, '2.0')
and (exists $_->{result} or exists $_->{error})
}),
op_share(),
);
my $config_o = $self->{incoming_o}->pipe(
op_filter(sub { eqq $_->{type}, 'config' }),
op_map(sub { $_->{data} }),
op_share(),
);
# create connected_o to show websocket connection status (after config is received)
$self->connected_o( rx_behavior_subject->new(0) );
rx_merge(
$config_o->pipe(
op_map_to(1),
),
$websocket_o->pipe(
op_filter(sub { ! $_ }),
op_map_to(0),
),
)->pipe(
op_distinct_until_changed(),
)->subscribe($self->connected_o);
$self->connected_o->subscribe(sub ($status) { $self->emit('connected', $status) });
# ping
$config_o->pipe(
op_map(sub ($config, @) { $config->{pingInterval} - rand() }),
op_switch_map(sub ($iv, @) {
rx_timer($iv * rand(), $iv)->pipe(
op_take_until($self->connected_o->pipe(op_filter(sub { ! $_ }))),
),
}),
)->subscribe(sub { $self->send({ type => 'ping' }) });
# timeout
$config_o->pipe(
op_switch_map(sub ($config, @) {
$self->{incoming_o}->pipe(
op_start_with(undef),
op_switch_map(sub {
rx_of(1)->pipe(
op_delay($config->{pingInterval} + 10),
op_take_until(
$self->connected_o->pipe(op_filter(sub { !$_ }))
),
);
}),
);
}),
op_with_latest_from($websocket_o),
)->subscribe(sub ($conf_ws) {
my (undef, $ws) = @$conf_ws;
$ws && $ws->finish; # TODO: or maybe some other way to close (?)
});
return $self;
}
sub connect ($self) {
$self->{please_be_connected}->next(1);
}
sub disconnect ($self) {
$self->{please_be_connected}->next(0);
}
sub send ($self, $data) {
$self->connected_o->get_value or eqq($data->{type}, 'ping')
or die "Can't send data because websocket not available";
$self->{send_o}->next($data);
}
sub do_action ($self, $stream_name, $action_name, $payload = undef) {
$self->send({
jsonrpc => '2.0',
method => 'doAction',
params => [$stream_name, $action_name, $payload],
});
}
sub do_request ($self, $stream_name, $request_name, $payload = undef) {
my $id = unique_id;
$self->send({
jsonrpc => '2.0',
method => 'doRequest',
params => [ $stream_name, $request_name, $payload ],
id => $id,
( run in 1.091 second using v1.01-cache-2.11-cpan-d8267643d1d )