BoardStreams
view release on metacpan or search on metacpan
lib/Mojolicious/Plugin/BoardStreams.pm view on Meta::CPAN
{
limit => 1,
for => 'update',
},
))->hashes->[0] or last;
my ($stream_id, $stream_name) = $stream_row->@{qw/ id name /};
my @new_dead_worker_ids_for_stream;
my $get_dead_worker_ids = async sub {
my $results = await $db->select_p(
[
'bs_guards',
[-left, 'bs_workers', id => 'worker_id'],
],
[\'DISTINCT bs_guards.worker_id'],
{
'bs_guards.stream_id' => $stream_id,
'bs_workers.id' => undef,
},
);
@new_dead_worker_ids_for_stream = $results->arrays->map(sub {$_->[0]})->@*;
return { map {( $_ => 1 )} @new_dead_worker_ids_for_stream };
};
# repair stream
if (my $stream_repair_sub = $registry->get_repair($stream_name)) {
my $ret = $stream_repair_sub->($c, $stream_name, $get_dead_worker_ids);
await $ret if $ret->$_can('then');
}
# delete guards pointing to this stream
await $db->delete_p(
'bs_guards',
{
stream_id => $stream_id,
worker_id => {-in, \@new_dead_worker_ids_for_stream},
},
) if @new_dead_worker_ids_for_stream;
$tx->commit;
}
});
# send JSON, but only if transaction is not destroyed
async sub _send_p ($c, $data) {
my sub get_max_size {
my $tx = $c->tx or return $MAX_WEBSOCKET_SIZE;
return min($tx->max_websocket_size, $MAX_WEBSOCKET_SIZE);
}
my $message = encode_json $data;
my $whole_length = length $message;
if ($whole_length <= get_max_size) {
$c->tx or return !!0; # check if transaction is destroyed
$c->send({ binary => $message });
return 1;
}
my $identifier = $c->stash->{'boardstreams.outgoing_uuid'}++;
for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
my $bytes_prefix;
my $ending_prefix = ":$identifier $i\$: ";
my $max_size = get_max_size;
if (length($ending_prefix) + $whole_length - $cursor <= $max_size) {
$bytes_prefix = $ending_prefix;
$sent_ending = 1;
} else {
$bytes_prefix = ":$identifier $i: ";
}
my $max_sublength = $max_size - length $bytes_prefix;
my $substring = $cursor <= $whole_length ? substr($message, $cursor, $max_sublength) : '';
$cursor += $max_sublength;
$c->tx or return !!0; # check if transaction is destroyed
$c->send({ binary => $bytes_prefix . $substring });
# don't cause other threads to hang if message is very large
await next_tick_p unless $sent_ending;
}
return 1;
}
$app->helper('bs.init_client_p' => async sub ($c) {
$can_accept_clients or return $c->rendered(503);
$registry->conn_subscriptions->{$c} = {};
$registry->pending_joins->{$c} = rx_behavior_subject->new(0);
$registry->register_conn($c);
$c->stash->{'boardstreams.outgoing_uuid'} = 'a';
$c->on(finish => async sub ($_c, @) {
await $on_client_finish_sub->($_c);
});
await sleep_p(0.25); # mojo issue 1895
$c->on(text => async sub ($_c, $bytes) {
my $data = decode_json $bytes;
my $id;
# pong and return on ping
if (($data->{type} // '') eq 'ping') {
await _send_p($_c, { type => 'pong' });
return;
}
try {
defined $data->{jsonrpc} and $data->{jsonrpc} eq '2.0'
or die 'incoming message is not jsonrpc 2.0';
(my $method, my $params, $id) = $data->@{qw/ method params id /};
! $is_finished or die jsonrpc_error 503, 'this server worker stopped, please try again';
$registry->is_conn_registered($_c)
or die jsonrpc_error 503, "can't receive because connection is closing";
if ($method eq 'doAction') {
# params
my ($stream_name, $action_name, $payload) = @$params;
# validation
! defined $id or die "action '$action_name' on stream '$stream_name' contains extra id ($id)\n";
$registry->is_member_of($_c, $stream_name)
or die "Connection has not joined '$stream_name' but tried to do action '$action_name'\n"
unless $stream_name eq '!open';
# fetch + act
my $action_sub = $registry->get_action($stream_name, $action_name)
or die "invalid action '$action_name' on stream '$stream_name'\n";
lib/Mojolicious/Plugin/BoardStreams.pm view on Meta::CPAN
my $diff;
if (defined $new_state) {
delete $old_state->{_secret} if ref $old_state eq 'HASH';
delete $new_state->{_secret} if ref $new_state eq 'HASH';
$diff = diff($old_state, $new_state, noO => 1, noU => 1);
$diff = undef if ! %$diff;
}
if (defined $extra_guards) {
if ($extra_guards > 0) {
await $db->insert_p(
'bs_guards',
{
worker_id => $worker_id,
stream_id => $stream_id,
count => $extra_guards,
},
{
on_conflict => [
[ 'worker_id', 'stream_id' ] => { count => \'bs_guards.count + EXCLUDED.count' },
],
}
);
} elsif ($extra_guards < 0) {
my $guard_row = (await $db->update_p(
'bs_guards',
{ count => \['"count" - ?', abs($extra_guards)] },
{
worker_id => $worker_id,
stream_id => $stream_id,
},
{ returning => 'count' },
))->hashes->[0] or croak "missing guard row";
if (! $guard_row->{count}) {
await $db->delete_p(
'bs_guards',
{
worker_id => $worker_id,
stream_id => $stream_id,
count => 0, # because user may have forgotten to start a txn
},
);
}
}
}
if (defined $diff or defined $new_event) {
my $pg_channel = get_pg_channel_name($sname);
my $notification = encode_json({
id => int $new_event_id,
event => $new_event,
patch => $diff,
});
my $whole_length = length $notification;
if ($whole_length <= $NOTIFY_SIZE_LIMIT) {
$db->notify($pg_channel, $notification);
} else {
# send notification in chunks
for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
my $bytes_prefix;
my $ending_bytes_prefix = ":$new_event_id $i\$: ";
# this assumes that length $ending_bytes_prefix < $NOTIFY_SIZE_LIMIT
if (length($ending_bytes_prefix) + $whole_length - $cursor <= $NOTIFY_SIZE_LIMIT) {
$bytes_prefix = $ending_bytes_prefix;
$sent_ending = 1;
} else {
$bytes_prefix = ":$new_event_id $i: ";
}
my $max_sublength = $NOTIFY_SIZE_LIMIT - length $bytes_prefix;
my $substring = $cursor <= $whole_length ? substr($notification, $cursor, $max_sublength) : '';
$cursor += $max_sublength;
$db->notify($pg_channel, $bytes_prefix . $substring);
# don't cause other threads to hang if notification is very large
await next_tick_p unless $stream_names->[-1] eq $sname and $sent_ending;
}
}
}
}
});
return undef;
});
$app->helper('bs.set_action' => sub ($c, $stream_name, $action_name, $sub) {
$registry->set_action_request(
action => $stream_name, $action_name, $sub
);
});
$app->helper('bs.set_request' => sub ($c, $stream_name, $request_name, $sub) {
$registry->set_action_request(
request => $stream_name, $request_name, $sub
);
});
$app->helper('bs.set_join' => sub ($c, $stream_name, $sub) {
$registry->set_action_request(
join_leave => $stream_name, 'join', $sub
);
});
$app->helper('bs.set_leave' => sub ($c, $stream_name, $sub) {
$registry->set_action_request(
join_leave => $stream_name, 'leave', $sub
);
});
$app->helper('bs.set_repair' => sub ($c, $stream_name, $sub) {
$registry->set_action_request(
join_leave => $stream_name, 'repair', $sub
);
});
$app->helper('bs.get_state_p' => async sub ($c, $stream_name) {
my $db = $BoardStreams::db // $pg->db;
my $stream_row = (await $db->select_p(
'bs_streams',
[qw/ state /],
{ name => $stream_name },
{ for => 'update' },
))->expand->hashes->[0] or return undef;
return $stream_row->{state};
});
# join
$app->bs->set_request('!open', 'join', async sub ($c, $, $payload) {
my ($stream_name, $last_id) = $payload->@{qw/ name last_id /};
( run in 0.507 second using v1.01-cache-2.11-cpan-39bf76dae61 )