BoardStreams

 view release on metacpan or  search on metacpan

lib/BoardStreams/Client/Util.pm  view on Meta::CPAN


use Exporter 'import';
our @EXPORT_OK = qw/ debug unique_id /;

our $VERSION = "v0.0.36";

# Single logger instance shared by every debug() call in this module.
my $log = Mojo::Log->new;

# debug(@args)
#
# Hands all of its arguments, unchanged, to the shared logger's info()
# method and returns whatever that call returns.
sub debug (@args) {
    return $log->info(@args);
}

# Largest value the counter may hold before it wraps back to 1.
my $UNIQUE_ID_LIMIT  = 2 ** 50;
# Next id to hand out; file-scoped so it persists between calls.
my $unique_id_cursor = 1;

# unique_id()
#
# Returns the current counter value as a string and advances the counter.
# Once the counter would exceed $UNIQUE_ID_LIMIT it starts over from 1.
sub unique_id {
    my $id = $unique_id_cursor;

    # Advance by one, wrapping around when the limit has been reached.
    $unique_id_cursor = $id < $UNIQUE_ID_LIMIT ? $id + 1 : 1;

    return "$id";
}

1;

lib/BoardStreams/Registry.pm  view on Meta::CPAN

        my $thing = $_;
        if (! length ref $thing) {
            $thing =~ $BoardStreams::REs::ANY_STREAM_NAME or croak 'invalid stream definition';
            split /\:/, $thing;
        } else {
            $thing;
        }
    } @$stream_def;

    my $start = $self->_actions_requests;
    my $cursor_ref = \$start;
    SEGMENT:
    foreach my $segment (@$stream_def) {
        if (! length ref $segment) {
            $cursor_ref = \($$cursor_ref->{strings}{$segment} //= {});
        } else {
            foreach my $pair (pairs $$cursor_ref->{regexes}->@*) {
                my ($preexisting_regex, $hashref) = @$pair;
                if ("$segment" eq "$preexisting_regex") {
                    $cursor_ref = \$hashref;
                    next SEGMENT;
                }
            }
            push $$cursor_ref->{regexes}->@*, ($segment => {});
            $cursor_ref = \$$cursor_ref->{regexes}[-1];
        }
    }
    $$cursor_ref->{$type}{$thing_name} = $sub;
}

# get_action_request($type, $stream_name, $thing_name)
#
# Looks up the handler registered under $type / $thing_name for the stream
# called $stream_name.
#
#   $type        - one of 'action', 'request' or 'join_leave'; anything else
#                  dies with "invalid type '...'".
#   $stream_name - colon-separated stream name; each segment is matched
#                  against the registry tree one level at a time.
#   $thing_name  - name of the handler inside the matched node.
#
# Returns the first defined handler among the matching nodes, or nothing
# (undef in scalar context, the empty list in list context) when none match.
#
# The lookup is strictly read-only: the 'strings' and $type sub-hashes are
# checked before being indexed, so a query never autovivifies empty hashes
# into the registry tree (the 'regexes' branch was already guarded this way).
sub get_action_request ($self, $type, $stream_name, $thing_name) {
    # validate
    belongs_to($type, [qw/ action request join_leave /])
        or die "invalid type '$type'";

    # Set of registry nodes that match the segments consumed so far.
    my @cursors = $self->_actions_requests;

    foreach my $segment (split /\:/, $stream_name) {
        my @new_cursors;

        # Literal matches of every current node come first ...
        foreach my $cursor (@cursors) {
            my $strings = $cursor->{strings} or next;
            my $child   = $strings->{$segment};
            push @new_cursors, $child if defined $child;
        }

        # ... followed by regex matches, in registration order per node.
        foreach my $cursor (@cursors) {
            my $regexes = $cursor->{regexes} or next;
            foreach my $pair (pairs $regexes->@*) {
                my ($regex, $new_candidate_cursor) = @$pair;
                push @new_cursors, $new_candidate_cursor
                    if $segment =~ $regex;
            }
        }

        @cursors = @new_cursors;
    }

    # First matching node that actually carries the requested handler wins.
    foreach my $cursor (@cursors) {
        my $things  = $cursor->{$type} or next;
        my $handler = $things->{$thing_name};
        return $handler if defined $handler;
    }

    return;
}

# get_action($stream_name, $action_name)
#
# Convenience wrapper: delegates to get_action_request() with the type
# fixed to 'action' and returns its result unchanged.
sub get_action ($self, $stream_name, $action_name) {
    return $self->get_action_request(
        'action',
        $stream_name,
        $action_name,
    );
}

# get_request($stream_name, $request_name)
#
# Convenience wrapper: delegates to get_action_request() with the type
# fixed to 'request' and returns its result unchanged.
sub get_request ($self, $stream_name, $request_name) {
    return $self->get_action_request(
        'request',
        $stream_name,
        $request_name,
    );
}

lib/BoardStreams/Util.pm  view on Meta::CPAN

# belongs_to($x, $array)
#
# Membership test: true when eqq() reports at least one element of the
# array referenced by $array as equal to $x, false otherwise (including
# for an empty array).
sub belongs_to ($x, $array) {
    return any {
        eqq($_, $x)
    } $array->@*;
}

# unique_id()
#
# Creates a fresh Data::GUID object and returns the result of its
# as_base64_urlsafe() method.
sub unique_id {
    my $guid = Data::GUID->new;
    return $guid->as_base64_urlsafe;
}

# hashify($hashes, $fields, $sub = undef)
#
# Indexes a list of hashrefs into a nested hash.
#
#   $hashes - arrayref of hashrefs to index.
#   $fields - arrayref of key names; the value each hash holds under these
#             keys, taken in order, forms the path into the result.
#   $sub    - optional callback. When given, it is called with the hash as
#             its only argument (and with $_ set to the same hash) and its
#             scalar return value is stored instead of the hash itself.
#
# Returns the nested structure. When several hashes share the same path,
# the one appearing last in $hashes wins.
sub hashify ($hashes, $fields, $sub = undef) {
    my $ret = {};

    for my $hash (@$hashes) {
        # Walk down from the root, creating intermediate levels on demand.
        my $slot = \$ret;
        $slot = \($$slot->{ $hash->{$_} } //= {}) for @$fields;

        if ($sub) {
            local $_ = $hash;
            $$slot = $sub->($hash);
        }
        else {
            $$slot = $hash;
        }
    }

    return $ret;
}

sub next_tick_p {
    my $p = Mojo::Promise->new;

lib/Mojolicious/Plugin/BoardStreams.pm  view on Meta::CPAN

        my $message = encode_json $data;
        my $whole_length = length $message;
        if ($whole_length <= get_max_size) {
            $c->tx or return !!0; # check if transaction is destroyed
            $c->send({ binary => $message });
            return 1;
        }

        my $identifier = $c->stash->{'boardstreams.outgoing_uuid'}++;

        for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
            my $bytes_prefix;
            my $ending_prefix = ":$identifier $i\$: ";
            my $max_size = get_max_size;
            if (length($ending_prefix) + $whole_length - $cursor <= $max_size) {
                $bytes_prefix = $ending_prefix;
                $sent_ending = 1;
            } else {
                $bytes_prefix = ":$identifier $i: ";
            }

            my $max_sublength = $max_size - length $bytes_prefix;
            my $substring = $cursor <= $whole_length ? substr($message, $cursor, $max_sublength) : '';
            $cursor += $max_sublength;

            $c->tx or return !!0; # check if transaction is destroyed
            $c->send({ binary => $bytes_prefix . $substring });

            # don't cause other threads to hang if message is very large
            await next_tick_p unless $sent_ending;
        }

        return 1;
    }

lib/Mojolicious/Plugin/BoardStreams.pm  view on Meta::CPAN

                    my $notification = encode_json({
                        id    => int $new_event_id,
                        event => $new_event,
                        patch => $diff,
                    });
                    my $whole_length = length $notification;
                    if ($whole_length <= $NOTIFY_SIZE_LIMIT) {
                        $db->notify($pg_channel, $notification);
                    } else {
                        # send notification in chunks
                        for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
                            my $bytes_prefix;
                            my $ending_bytes_prefix = ":$new_event_id $i\$: ";
                            # this assumes that length $ending_bytes_prefix < $NOTIFY_SIZE_LIMIT
                            if (length($ending_bytes_prefix) + $whole_length - $cursor <= $NOTIFY_SIZE_LIMIT) {
                                $bytes_prefix = $ending_bytes_prefix;
                                $sent_ending = 1;
                            } else {
                                $bytes_prefix = ":$new_event_id $i: ";
                            }

                            my $max_sublength = $NOTIFY_SIZE_LIMIT - length $bytes_prefix;
                            my $substring = $cursor <= $whole_length ? substr($notification, $cursor, $max_sublength) : '';
                            $cursor += $max_sublength;

                            $db->notify($pg_channel, $bytes_prefix . $substring);

                            # don't cause other threads to hang if notification is very large
                            await next_tick_p unless $stream_names->[-1] eq $sname and $sent_ending;
                        }
                    }
                }
            }
        });



( run in 0.251 second using v1.01-cache-2.11-cpan-4d50c553e7e )