BoardStreams
view release on metacpan or search on metacpan
lib/BoardStreams/Client/WebSockets.pm view on Meta::CPAN
# Observable-based helpers for opening WebSocket connections from the
# BoardStreams client.
package BoardStreams::Client::WebSockets;
use Mojo::Base -strict, -signatures;
use BoardStreams::Client::Util 'debug';
use Mojo::UserAgent;
use RxPerl::Mojo ':all';
use Exporter 'import';
# Only the reconnecting constructor is offered for export; the
# single-connection make_websocket_observable is not in this list.
our @EXPORT_OK = qw/ make_websockets_observable /;
our $VERSION = "v0.0.36";
# Builds an observable that performs one WebSocket connection attempt per
# subscription.
#
# Parameters:
#   $url - address handed to $ua->websocket
#   $ua  - user agent object whose websocket($url, $cb) method starts the
#          connection and later invokes $cb with the transaction
#
# Behaviour per subscription:
#   - if the transaction did not upgrade to a WebSocket, an error is recorded
#     on its response and the observable completes without emitting;
#   - otherwise the transaction is emitted once, and the observable completes
#     when the transaction fires its "finish" event;
#   - unsubscribing finishes a connection that is still open, including one
#     whose handshake only completes after the unsubscription.
sub make_websocket_observable ($url, $ua) {
    return rx_observable->new(sub ($subscriber) {
        my $_tx;
        my $tx_has_finished;
        my $unsubscribed;

        $ua->websocket($url, sub ($, $tx) {
            $_tx = $tx;

            if (! $tx->is_websocket) {
                debug("WebSocket connection failed, or handshake failed!");
                $tx->res->error({message => "Didn't upgrade connection, cancelling it"}) if $tx->res;
                $tx_has_finished = 1;
                $subscriber->complete();
                return;
            }

            # The subscriber went away while the handshake was still in
            # flight: nobody is left to own this connection, so close it
            # right away instead of leaving it open.
            if ($unsubscribed) {
                $tx_has_finished = 1;
                $tx->finish;
                return;
            }

            # on close
            $tx->on(finish => sub {
                $tx_has_finished = 1;
                debug("WebSocket connection closed");
                $subscriber->complete();
            });

            # on open
            debug("WebSocket connection opened");
            $subscriber->next($tx);
        });

        # Teardown, run on unsubscription.
        return sub {
            $unsubscribed = 1;

            # $_tx is still undef when the connection callback has not fired
            # yet; calling a method on it would die.
            return unless $_tx;

            $_tx->finish if $_tx->is_websocket and ! $tx_has_finished;
        };
    });
}
# Builds an observable that keeps a WebSocket connection to $url alive by
# reconnecting whenever a connection attempt ends.
#
# Parameters:
#   $url     - address passed on to make_websocket_observable
#   $manager - object whose ua() method supplies the user agent; it is
#              queried again for every connection attempt
#
# Each cycle consists of one connection attempt, followed by an undef
# marker, followed by a silent wait taken from the retry delay list (the
# last entry is reused once the list is exhausted). A true value passing
# through resets the failure counter. The stream starts with undef, and
# op_distinct_until_changed() filters the resulting sequence.
sub make_websockets_observable ($url, $manager) {
    return rx_observable->new(sub ($subscriber) {
        my @retry_delays    = (0 .. 5);
        my $failed_attempts = 0;

        my $connection_cycle = rx_defer(sub {
            my $attempt     = make_websocket_observable($url, $manager->ua);
            my $end_marker  = rx_of(undef);
            my $silent_wait = rx_defer(sub {
                my $wait = $retry_delays[$failed_attempts] // $retry_delays[-1];
                $failed_attempts++;
                return rx_timer($wait)->pipe(op_ignore_elements());
            });

            return rx_concat($attempt, $end_marker, $silent_wait);
        });

        my $reconnecting = $connection_cycle->pipe(
            # A true value means a connection was established.
            op_tap(sub ($value) {
                $failed_attempts = 0 if $value;
            }),
            op_repeat(),
            op_start_with(undef),
            op_distinct_until_changed(),
        );

        # The returned subscription serves as this observable's teardown.
        return $reconnecting->subscribe($subscriber);
    });
}
1;
( run in 1.424 second using v1.01-cache-2.11-cpan-d8267643d1d )