BoardStreams

 view release on metacpan or  search on metacpan

lib/Mojolicious/Plugin/BoardStreams.pm  view on Meta::CPAN

                {
                    limit => 1,
                    for => 'update',
                },
            ))->hashes->[0] or last;
            my ($stream_id, $stream_name) = $stream_row->@{qw/ id name /};

            my @new_dead_worker_ids_for_stream;
            # Closure handed to the stream's repair sub. When called, it selects
            # the distinct worker_ids of bs_guards rows for this stream that have
            # no matching bs_workers row (LEFT JOIN with bs_workers.id IS NULL).
            # Side effect: stores those ids in @new_dead_worker_ids_for_stream in
            # the enclosing scope. Resolves to a hashref { worker_id => 1, ... }.
            my $get_dead_worker_ids = async sub {
                my $orphaned_guards = await $db->select_p(
                    ['bs_guards', [-left, 'bs_workers', id => 'worker_id']],
                    [\'DISTINCT bs_guards.worker_id'],
                    {
                        'bs_guards.stream_id' => $stream_id,
                        'bs_workers.id'       => undef,
                    },
                );

                # each result row is a one-element array holding the worker_id
                @new_dead_worker_ids_for_stream = map { $_->[0] } $orphaned_guards->arrays->@*;

                my %is_dead;
                $is_dead{$_} = 1 for @new_dead_worker_ids_for_stream;
                return \%is_dead;
            };

            # repair stream
            if (my $stream_repair_sub = $registry->get_repair($stream_name)) {
                my $ret = $stream_repair_sub->($c, $stream_name, $get_dead_worker_ids);
                await $ret if $ret->$_can('then');
            }

            # delete guards pointing to this stream
            await $db->delete_p(
                'bs_guards',
                {
                    stream_id => $stream_id,
                    worker_id => {-in, \@new_dead_worker_ids_for_stream},
                },
            ) if @new_dead_worker_ids_for_stream;

            $tx->commit;
        }
    });

    # Send $data to the client of $c as JSON, but only while the transaction
    # is not destroyed.
    #
    # The JSON-encoded message is sent as a single binary frame when it fits
    # within the frame size limit. Otherwise it is split into chunks, each
    # sent as its own binary frame with a prefix of the form
    # ":<identifier> <index>: "; the last chunk carries a "$" after the index
    # (":<identifier> <index>$: ").
    #
    # Parameters:
    #   $c    - controller whose tx/send/stash are used
    #   $data - data structure passed to encode_json
    #
    # Returns (asynchronously) 1 after everything was passed to $c->send, or
    # a false value as soon as $c->tx is found to be gone.
    # Dies if the frame size limit leaves no room for payload after the chunk
    # prefix (otherwise the chunking loop could never make progress).
    async sub _send_p ($c, $data) {
        # Frame size limit: the smaller of the transaction's
        # max_websocket_size and $MAX_WEBSOCKET_SIZE; only the latter when
        # there is no transaction anymore.
        my sub get_max_size {
            my $tx = $c->tx or return $MAX_WEBSOCKET_SIZE;
            return min($tx->max_websocket_size, $MAX_WEBSOCKET_SIZE);
        }

        my $message = encode_json $data;
        my $whole_length = length $message;

        # fast path: the whole message fits in one frame
        if ($whole_length <= get_max_size) {
            $c->tx or return !!0; # check if transaction is destroyed
            $c->send({ binary => $message });
            return 1;
        }

        # per-connection identifier shared by all chunks of this message;
        # post-incremented so that the next chunked message gets a new one
        my $identifier = $c->stash->{'boardstreams.outgoing_uuid'}++;

        for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
            my $bytes_prefix;
            my $ending_prefix = ":$identifier $i\$: ";

            # re-read the limit on every iteration, because the loop yields
            # between chunks and the transaction may change or vanish
            my $max_size = get_max_size;
            if (length($ending_prefix) + $whole_length - $cursor <= $max_size) {
                # the remainder fits together with the ending prefix
                $bytes_prefix = $ending_prefix;
                $sent_ending = 1;
            } else {
                $bytes_prefix = ":$identifier $i: ";
            }

            my $max_sublength = $max_size - length $bytes_prefix;

            # A non-final chunk must carry at least one byte of payload;
            # with a zero or negative budget $cursor would not advance and
            # the loop would never terminate.
            if (! $sent_ending and $max_sublength <= 0) {
                die "websocket frame size limit ($max_size) is too small to send message in chunks";
            }

            my $substring = $cursor <= $whole_length ? substr($message, $cursor, $max_sublength) : '';
            $cursor += $max_sublength;

            $c->tx or return !!0; # check if transaction is destroyed
            $c->send({ binary => $bytes_prefix . $substring });

            # don't cause other threads to hang if message is very large
            await next_tick_p unless $sent_ending;
        }

        return 1;
    }

    $app->helper('bs.init_client_p' => async sub ($c) {
        $can_accept_clients or return $c->rendered(503);

        $registry->conn_subscriptions->{$c} = {};
        $registry->pending_joins->{$c} = rx_behavior_subject->new(0);
        $registry->register_conn($c);
        $c->stash->{'boardstreams.outgoing_uuid'} = 'a';

        $c->on(finish => async sub ($_c, @) {
            await $on_client_finish_sub->($_c);
        });

        await sleep_p(0.25); # mojo issue 1895

        $c->on(text => async sub ($_c, $bytes) {
            my $data = decode_json $bytes;

            my $id;

            # pong and return on ping
            if (($data->{type} // '') eq 'ping') {
                await _send_p($_c, { type => 'pong' });
                return;
            }

            try {
                defined $data->{jsonrpc} and $data->{jsonrpc} eq '2.0'
                    or die 'incoming message is not jsonrpc 2.0';

                (my $method, my $params, $id) = $data->@{qw/ method params id /};

                ! $is_finished or die jsonrpc_error 503, 'this server worker stopped, please try again';
                $registry->is_conn_registered($_c)
                    or die jsonrpc_error 503, "can't receive because connection is closing";

                if ($method eq 'doAction') {
                    # params
                    my ($stream_name, $action_name, $payload) = @$params;

                    # validation
                    ! defined $id or die "action '$action_name' on stream '$stream_name' contains extra id ($id)\n";
                    $registry->is_member_of($_c, $stream_name)
                        or die "Connection has not joined '$stream_name' but tried to do action '$action_name'\n"
                        unless $stream_name eq '!open';

                    # fetch + act
                    my $action_sub = $registry->get_action($stream_name, $action_name)
                        or die "invalid action '$action_name' on stream '$stream_name'\n";

lib/Mojolicious/Plugin/BoardStreams.pm  view on Meta::CPAN


                my $diff;
                if (defined $new_state) {
                    delete $old_state->{_secret} if ref $old_state eq 'HASH';
                    delete $new_state->{_secret} if ref $new_state eq 'HASH';
                    $diff = diff($old_state, $new_state, noO => 1, noU => 1);
                    $diff = undef if ! %$diff;
                }

                if (defined $extra_guards) {
                    if ($extra_guards > 0) {
                        await $db->insert_p(
                            'bs_guards',
                            {
                                worker_id => $worker_id,
                                stream_id => $stream_id,
                                count     => $extra_guards,
                            },
                            {
                                on_conflict => [
                                    [ 'worker_id', 'stream_id' ] => { count => \'bs_guards.count + EXCLUDED.count' },
                                ],
                            }
                        );
                    } elsif ($extra_guards < 0) {
                        my $guard_row = (await $db->update_p(
                            'bs_guards',
                            { count => \['"count" - ?', abs($extra_guards)] },
                            {
                                worker_id => $worker_id,
                                stream_id => $stream_id,
                            },
                            { returning => 'count' },
                        ))->hashes->[0] or croak "missing guard row";

                        if (! $guard_row->{count}) {
                            await $db->delete_p(
                                'bs_guards',
                                {
                                    worker_id => $worker_id,
                                    stream_id => $stream_id,
                                    count     => 0, # because user may have forgotten to start a txn
                                },
                            );
                        }
                    }
                }

                if (defined $diff or defined $new_event) {
                    my $pg_channel = get_pg_channel_name($sname);
                    my $notification = encode_json({
                        id    => int $new_event_id,
                        event => $new_event,
                        patch => $diff,
                    });
                    my $whole_length = length $notification;
                    if ($whole_length <= $NOTIFY_SIZE_LIMIT) {
                        $db->notify($pg_channel, $notification);
                    } else {
                        # send notification in chunks
                        for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
                            my $bytes_prefix;
                            my $ending_bytes_prefix = ":$new_event_id $i\$: ";
                            # this assumes that length $ending_bytes_prefix < $NOTIFY_SIZE_LIMIT
                            if (length($ending_bytes_prefix) + $whole_length - $cursor <= $NOTIFY_SIZE_LIMIT) {
                                $bytes_prefix = $ending_bytes_prefix;
                                $sent_ending = 1;
                            } else {
                                $bytes_prefix = ":$new_event_id $i: ";
                            }

                            my $max_sublength = $NOTIFY_SIZE_LIMIT - length $bytes_prefix;
                            my $substring = $cursor <= $whole_length ? substr($notification, $cursor, $max_sublength) : '';
                            $cursor += $max_sublength;

                            $db->notify($pg_channel, $bytes_prefix . $substring);

                            # don't cause other threads to hang if notification is very large
                            await next_tick_p unless $stream_names->[-1] eq $sname and $sent_ending;
                        }
                    }
                }
            }
        });

        return undef;
    });

    # Helper bs.set_action: passes $sub to $registry->set_action_request
    # under the kind 'action', keyed by $stream_name and $action_name.
    $app->helper('bs.set_action' => sub ($c, $stream_name, $action_name, $sub) {
        return $registry->set_action_request('action', $stream_name, $action_name, $sub);
    });

    # Helper bs.set_request: passes $sub to $registry->set_action_request
    # under the kind 'request', keyed by $stream_name and $request_name.
    $app->helper('bs.set_request' => sub ($c, $stream_name, $request_name, $sub) {
        return $registry->set_action_request('request', $stream_name, $request_name, $sub);
    });

    # Helper bs.set_join: passes $sub to $registry->set_action_request
    # under the kind 'join_leave' with the fixed name 'join' for $stream_name.
    $app->helper('bs.set_join' => sub ($c, $stream_name, $sub) {
        return $registry->set_action_request('join_leave', $stream_name, 'join', $sub);
    });

    # Helper bs.set_leave: passes $sub to $registry->set_action_request
    # under the kind 'join_leave' with the fixed name 'leave' for $stream_name.
    $app->helper('bs.set_leave' => sub ($c, $stream_name, $sub) {
        return $registry->set_action_request('join_leave', $stream_name, 'leave', $sub);
    });

    # Helper bs.set_repair: passes $sub to $registry->set_action_request
    # for $stream_name. Note that it is filed under the 'join_leave' kind,
    # with the fixed name 'repair'.
    $app->helper('bs.set_repair' => sub ($c, $stream_name, $sub) {
        return $registry->set_action_request('join_leave', $stream_name, 'repair', $sub);
    });

    # Helper bs.get_state_p: resolves to the 'state' column of the bs_streams
    # row whose name is $stream_name, or to undef when no such row exists.
    $app->helper('bs.get_state_p' => async sub ($c, $stream_name) {
        # use $BoardStreams::db when it is defined, else a handle from $pg
        my $db = $BoardStreams::db // $pg->db;

        # the row is selected with the 'for => update' option
        my $results = await $db->select_p(
            'bs_streams',
            ['state'],
            { name => $stream_name },
            { for => 'update' },
        );

        # presumably expand decodes the JSON state column - confirm against Mojo::Pg::Results
        my $stream_row = $results->expand->hashes->[0];

        return $stream_row ? $stream_row->{state} : undef;
    });

    # join
    $app->bs->set_request('!open', 'join', async sub ($c, $, $payload) {
        my ($stream_name, $last_id) = $payload->@{qw/ name last_id /};



( run in 0.507 second using v1.01-cache-2.11-cpan-39bf76dae61 )