BoardStreams
view release on metacpan or search on metacpan
lib/Mojolicious/Plugin/BoardStreams.pm view on Meta::CPAN
or die "invalid action '$action_name' on stream '$stream_name'\n";
my $ret = $action_sub->($_c, $stream_name, $payload);
await $ret if $ret->$_can('then');
} elsif ($method eq 'doRequest') {
# params
my ($stream_name, $request_name, $payload) = @$params;
# validation
defined $id and ! length ref $id
or die "request '$request_name' on stream '$stream_name' has missing or invalid id ("
. ($id // 'undef') . ")\n";
$registry->is_member_of($_c, $stream_name)
or die jsonrpc_error(
403,
"Connection has not joined '$stream_name' but tried to do request '$request_name'"
)
unless $stream_name eq '!open';
# fetch + act
my $request_sub = $registry->get_request($stream_name, $request_name)
or die "invalid request '$request_name' on stream '$stream_name'\n";
my $result = $request_sub->($_c, $stream_name, $payload);
$result = await $result if $result->$_can('then');
# respond
await _send_p($_c, {
jsonrpc => '2.0',
result => $result,
id => $id,
});
} else {
die jsonrpc_error -32_601, 'invalid method', { method => $method };
}
} catch ($e) {
$_c->log->error(trim "$e");
if (defined $id) {
my $jsonrpc_error =
$e isa 'BoardStreams::Error::JSONRPC' ? $e
: jsonrpc_error 500, trim("$e");
await _send_p($_c, {
jsonrpc => '2.0',
error => $jsonrpc_error,
id => $id,
});
}
};
}) if $c->tx;
await _send_p($c, {
type => 'config',
data => {
pingInterval => 0 + $PING_INTERVAL,
},
});
});
# around_action hook: bootstraps the BoardStreams client for endpoint routes.
#
# Parameters (as supplied by Mojolicious to around_action hooks):
#   $next   - callback that runs the next hook in the chain / the action itself
#   $c      - the current controller
#   $action - the action about to be run (unused here)
#   $last   - true when this is the last action in the dispatch chain
#
# When $last is true and the stash key 'boardstreams.endpoint' is set, the
# action is run (and awaited if it returns a thenable), after which
# $c->bs->init_client_p is awaited and its result returned. Any other action
# is passed through untouched.
#
# On failure the exception is logged, a 'connection failure' message is sent
# to the client, the transaction is finished if it still exists, and the
# original exception is rethrown.
$app->hook(around_action => async sub ($next, $c, $action, $last) {
    # Not a BoardStreams endpoint (or not the final action): plain pass-through.
    return $next->()
        unless $last and $c->stash->{'boardstreams.endpoint'};

    # The response is produced asynchronously, so disable automatic rendering.
    $c->render_later;

    try {
        my $ret = $next->();
        # The action may be synchronous or may return a promise/future.
        await $ret if $ret->$_can('then');
        return await $c->bs->init_client_p;
    } catch ($e) {
        # Log before doing anything that can itself fail, so the root cause
        # is never lost.
        $c->log->error($e);

        # NOTE(review): the purpose of the 1.5 s delay is not visible from
        # this block (presumably to slow down client reconnects) - confirm.
        await sleep_p(1.5);

        # Notifying the client is best-effort: a failure here (for example
        # because the connection has already gone away) must not replace the
        # original exception or prevent the transaction from being finished.
        try {
            await _send_p($c, {
                type      => 'connection failure',
                # eval: the request id may not be obtainable; send undef then.
                requestId => scalar eval { $c->req->request_id },
            });
        } catch ($notify_error) {
            $c->log->error("could not send 'connection failure' notice: $notify_error");
        };

        $c->finish if $c->tx;
        die $e;
    };
});
$app->helper('bs.create_stream_p' => async sub ($c, $stream_name, $starting_state, %opts) {
local @CARP_NOT = qw/ Mojolicious::Renderer /,
croak 'Not in an event loop, using bs->create_stream_p; use bs->create_stream instead'
unless Mojo::IOLoop->is_running;
# opts can be: type, keep_events
my $type = $opts{type};
my $keep_events = exists $opts{keep_events} ? int(!!$opts{keep_events}) : 1;
# validate params
$stream_name =~ $BoardStreams::REs::STREAM_NAME
or croak "invalid stream name: '$stream_name'";
defined $starting_state
or croak "starting state not defined";
my $db = $BoardStreams::db // $pg->db;
# TODO: Consider locking this row for update
return !!0 if await exists_p($db, 'bs_streams', { name => $stream_name });
my $savepoint_name = unique_id;
my ($in_txn) = eval {
await $db->query_p(qq{SAVEPOINT "$savepoint_name"});
1
};
try {
await query_throwing_exception_object_p($db, 'insert_p', [
'bs_streams',
{
name => $stream_name,
state => { -json => $starting_state },
type => $type,
keep_events => $keep_events,
},
]);
return 1;
} catch ($e) {
die $e
unless $e isa 'BoardStreams::Error::DB::Duplicate'
and $e->data->{key_name} eq UNIQUE_STREAM_NAME_INDEX;
( run in 0.736 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )