BoardStreams

 view release on metacpan or  search on metacpan

lib/Mojolicious/Plugin/BoardStreams.pm  view on Meta::CPAN

                        or die "invalid action '$action_name' on stream '$stream_name'\n";

                    my $ret = $action_sub->($_c, $stream_name, $payload);
                    await $ret if $ret->$_can('then');
                } elsif ($method eq 'doRequest') {
                    # params
                    my ($stream_name, $request_name, $payload) = @$params;

                    # validation
                    defined $id and ! length ref $id
                        or die "request '$request_name' on stream '$stream_name' has missing or invalid id ("
                            . ($id // 'undef') . ")\n";
                    $registry->is_member_of($_c, $stream_name)
                        or die jsonrpc_error(
                            403,
                            "Connection has not joined '$stream_name' but tried to do request '$request_name'"
                        )
                        unless $stream_name eq '!open';

                    # fetch + act
                    my $request_sub = $registry->get_request($stream_name, $request_name)
                        or die "invalid request '$request_name' on stream '$stream_name'\n";

                    my $result = $request_sub->($_c, $stream_name, $payload);
                    $result = await $result if $result->$_can('then');

                    # respond
                    await _send_p($_c, {
                        jsonrpc => '2.0',
                        result  => $result,
                        id      => $id,
                    });
                } else {
                    die jsonrpc_error -32_601, 'invalid method', { method => $method };
                }
            } catch ($e) {
                $_c->log->error(trim "$e");
                if (defined $id) {
                    my $jsonrpc_error =
                        $e isa 'BoardStreams::Error::JSONRPC' ? $e
                        : jsonrpc_error 500, trim("$e");

                    await _send_p($_c, {
                        jsonrpc => '2.0',
                        error   => $jsonrpc_error,
                        id      => $id,
                    });
                }
            };
        }) if $c->tx;

        await _send_p($c, {
            type => 'config',
            data => {
                pingInterval => 0 + $PING_INTERVAL,
            },
        });
    });

    # around_action hook: bootstraps BoardStreams for endpoint actions.
    #
    # When this is the last action of the dispatch chain ($last) and the
    # stash flag 'boardstreams.endpoint' is set, rendering is deferred with
    # render_later, the action is invoked (and awaited when it returns a
    # thenable), and the result of bs->init_client_p is awaited and returned.
    # Every other action is passed straight through to $next.
    #
    # On failure the exception is logged, a 'connection failure' message
    # (carrying the request id, when one can be obtained) is sent to the
    # client on a best-effort basis, the connection is finished if a
    # transaction is still present, and the ORIGINAL exception is rethrown.
    $app->hook(around_action => async sub ($next, $c, $action, $last) {
        if ($last and $c->stash->{'boardstreams.endpoint'}) {
            $c->render_later;
            try {
                my $ret = $next->();
                await $ret if $ret->$_can('then');
                return await $c->bs->init_client_p;
            } catch ($e) {
                # Log before anything that can fail or take time, so that the
                # root cause is always recorded.
                $c->log->error($e);

                # Best-effort notification. A failure here (e.g. the client
                # has already gone away) must neither mask $e nor skip the
                # cleanup below.
                try {
                    # NOTE(review): the delay presumably throttles client
                    # reconnect attempts -- confirm.
                    await sleep_p(1.5);
                    await _send_p($c, {
                        type      => 'connection failure',
                        requestId => scalar eval { $c->req->request_id },
                    });
                } catch ($send_error) {
                    $c->log->warn("could not send 'connection failure' message: $send_error");
                };

                $c->finish if $c->tx;
                die $e;
            };
        } else {
            return $next->();
        }
    });

    $app->helper('bs.create_stream_p' => async sub ($c, $stream_name, $starting_state, %opts) {
        local @CARP_NOT = qw/ Mojolicious::Renderer /,
            croak 'Not in an event loop, using bs->create_stream_p; use bs->create_stream instead'
            unless Mojo::IOLoop->is_running;

        # opts can be: type, keep_events
        my $type = $opts{type};
        my $keep_events = exists $opts{keep_events} ? int(!!$opts{keep_events}) : 1;

        # validate params
        $stream_name =~ $BoardStreams::REs::STREAM_NAME
            or croak "invalid stream name: '$stream_name'";
        defined $starting_state
            or croak "starting state not defined";

        my $db = $BoardStreams::db // $pg->db;

        # TODO: Consider locking this row for update
        return !!0 if await exists_p($db, 'bs_streams', { name => $stream_name });

        my $savepoint_name = unique_id;
        my ($in_txn) = eval {
            await $db->query_p(qq{SAVEPOINT "$savepoint_name"});
            1
        };

        try {
            await query_throwing_exception_object_p($db, 'insert_p', [
                'bs_streams',
                {
                    name        => $stream_name,
                    state       => { -json => $starting_state },
                    type        => $type,
                    keep_events => $keep_events,
                },
            ]);

            return 1;
        } catch ($e) {
            die $e
                unless $e isa 'BoardStreams::Error::DB::Duplicate'
                    and $e->data->{key_name} eq UNIQUE_STREAM_NAME_INDEX;



( run in 0.736 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )