BoardStreams
view release on metacpan or search on metacpan
lib/BoardStreams/Client/Util.pm view on Meta::CPAN
use Exporter 'import';
our @EXPORT_OK = qw/ debug unique_id /;
our $VERSION = "v0.0.36";
my $log = Mojo::Log->new;
sub debug (@args) { $log->info(@args) }
my $UNIQUE_ID_LIMIT = 2 ** 50;
my $unique_id_cursor = 1;
sub unique_id {
my $ret = $unique_id_cursor++;
$unique_id_cursor <= $UNIQUE_ID_LIMIT or $unique_id_cursor = 1;
return "$ret";
}
1;
lib/BoardStreams/Registry.pm view on Meta::CPAN
my $thing = $_;
if (! length ref $thing) {
$thing =~ $BoardStreams::REs::ANY_STREAM_NAME or croak 'invalid stream definition';
split /\:/, $thing;
} else {
$thing;
}
} @$stream_def;
my $start = $self->_actions_requests;
my $cursor_ref = \$start;
SEGMENT:
foreach my $segment (@$stream_def) {
if (! length ref $segment) {
$cursor_ref = \($$cursor_ref->{strings}{$segment} //= {});
} else {
foreach my $pair (pairs $$cursor_ref->{regexes}->@*) {
my ($preexisting_regex, $hashref) = @$pair;
if ("$segment" eq "$preexisting_regex") {
$cursor_ref = \$hashref;
next SEGMENT;
}
}
push $$cursor_ref->{regexes}->@*, ($segment => {});
$cursor_ref = \$$cursor_ref->{regexes}[-1];
}
}
$$cursor_ref->{$type}{$thing_name} = $sub;
}
sub get_action_request ($self, $type, $stream_name, $thing_name) {
# validate
belongs_to($type, [qw/ action request join_leave /])
or die "invalid type '$type'";
my @segments = split /\:/, $stream_name;
my @cursors = $self->_actions_requests;
foreach my $segment (@segments) {
my @new_cursors = grep defined, map $_->{strings}{$segment}, @cursors;
foreach my $cursor (@cursors) {
if ($cursor->{regexes}) {
foreach my $pair (pairs $cursor->{regexes}->@*) {
my ($regex, $new_candidate_cursor) = @$pair;
if ($segment =~ $regex) {
push @new_cursors, $new_candidate_cursor;
}
}
}
}
@cursors = @new_cursors;
}
return (grep defined, map $_->{$type}{$thing_name}, @cursors)[0];
}
sub get_action ($self, $stream_name, $action_name) {
return $self->get_action_request(action => $stream_name, $action_name);
}
sub get_request ($self, $stream_name, $request_name) {
return $self->get_action_request(request => $stream_name, $request_name);
}
lib/BoardStreams/Util.pm view on Meta::CPAN
sub belongs_to ($x, $array) {
return any { eqq($_, $x) } @$array;
}
sub unique_id { Data::GUID->new->as_base64_urlsafe }
sub hashify ($hashes, $fields, $sub = undef) {
my $ret = {};
foreach my $hash (@$hashes) {
my $cursor = \$ret;
foreach my $field (@$fields) {
my $value = $hash->{$field};
$cursor = \($$cursor->{$value} //= {});
}
$$cursor = $sub ? do {
local $_ = $hash;
$sub->($hash);
} : $hash;
}
return $ret;
}
sub next_tick_p {
my $p = Mojo::Promise->new;
lib/Mojolicious/Plugin/BoardStreams.pm view on Meta::CPAN
my $message = encode_json $data;
my $whole_length = length $message;
if ($whole_length <= get_max_size) {
$c->tx or return !!0; # check if transaction is destroyed
$c->send({ binary => $message });
return 1;
}
my $identifier = $c->stash->{'boardstreams.outgoing_uuid'}++;
for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
my $bytes_prefix;
my $ending_prefix = ":$identifier $i\$: ";
my $max_size = get_max_size;
if (length($ending_prefix) + $whole_length - $cursor <= $max_size) {
$bytes_prefix = $ending_prefix;
$sent_ending = 1;
} else {
$bytes_prefix = ":$identifier $i: ";
}
my $max_sublength = $max_size - length $bytes_prefix;
my $substring = $cursor <= $whole_length ? substr($message, $cursor, $max_sublength) : '';
$cursor += $max_sublength;
$c->tx or return !!0; # check if transaction is destroyed
$c->send({ binary => $bytes_prefix . $substring });
# don't cause other threads to hang if message is very large
await next_tick_p unless $sent_ending;
}
return 1;
}
lib/Mojolicious/Plugin/BoardStreams.pm view on Meta::CPAN
my $notification = encode_json({
id => int $new_event_id,
event => $new_event,
patch => $diff,
});
my $whole_length = length $notification;
if ($whole_length <= $NOTIFY_SIZE_LIMIT) {
$db->notify($pg_channel, $notification);
} else {
# send notification in chunks
for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
my $bytes_prefix;
my $ending_bytes_prefix = ":$new_event_id $i\$: ";
# this assumes that length $ending_bytes_prefix < $NOTIFY_SIZE_LIMIT
if (length($ending_bytes_prefix) + $whole_length - $cursor <= $NOTIFY_SIZE_LIMIT) {
$bytes_prefix = $ending_bytes_prefix;
$sent_ending = 1;
} else {
$bytes_prefix = ":$new_event_id $i: ";
}
my $max_sublength = $NOTIFY_SIZE_LIMIT - length $bytes_prefix;
my $substring = $cursor <= $whole_length ? substr($notification, $cursor, $max_sublength) : '';
$cursor += $max_sublength;
$db->notify($pg_channel, $bytes_prefix . $substring);
# don't cause other threads to hang if notification is very large
await next_tick_p unless $stream_names->[-1] eq $sname and $sent_ending;
}
}
}
}
});
( run in 0.251 second using v1.01-cache-2.11-cpan-4d50c553e7e )