BoardStreams

 view release on metacpan or  search on metacpan

lib/BoardStreams/Registry.pm  view on Meta::CPAN

    die "not a member of '$stream_name'\n"
        unless defined $$count_ref and $$count_ref > 0;

    if (--$$count_ref == 0) {
        delete $self->_streams_to_conns->{$stream_name}{$int_c};
        delete $self->_streams_to_conns->{$stream_name} if ! $self->_streams_to_conns->{$stream_name}->%*;
        delete $self->_conns_to_streams->{$int_c}{$stream_name};
        delete $self->_conns_to_streams->{$int_c} if ! $self->_conns_to_streams->{$int_c}->%*;
        return 1;
    }

    return 0;
}

sub is_member_of ($self, $c, $stream_name) {
    return exists $self->_streams_to_conns->{$stream_name}{int $c};
}

sub get_conns_of_stream ($self, $stream_name) {
    return [
        values $self->_streams_to_conns->{$stream_name}->%*
    ];
}

sub get_streams_and_counts_of_conn ($self, $c) {
    return $self->_conns_to_streams->{int $c};
}

sub get_all_conns ($self) {
    return [
        values $self->_conns->%*
    ];
}

sub inc_pending_joins_by ($self, $c, $n) {
    my $pj_o = $self->pending_joins->{$c};
    my $pj_value = $pj_o->get_value;
    $pj_o->next($pj_value + $n);
}

### ACTIONS AND REQUESTS

sub set_action_request ($self, $type, $stream_def, $thing_name, $sub) {
    # validate
    belongs_to($type, [qw/ action request join_leave /])
        or die "invalid type '$type'";

    # pre-process
    $stream_def = [$stream_def] if ref $stream_def ne 'ARRAY';
    @$stream_def = map {
        my $thing = $_;
        if (! length ref $thing) {
            $thing =~ $BoardStreams::REs::ANY_STREAM_NAME or croak 'invalid stream definition';
            split /\:/, $thing;
        } else {
            $thing;
        }
    } @$stream_def;

    my $start = $self->_actions_requests;
    my $cursor_ref = \$start;
    SEGMENT:
    foreach my $segment (@$stream_def) {
        if (! length ref $segment) {
            $cursor_ref = \($$cursor_ref->{strings}{$segment} //= {});
        } else {
            foreach my $pair (pairs $$cursor_ref->{regexes}->@*) {
                my ($preexisting_regex, $hashref) = @$pair;
                if ("$segment" eq "$preexisting_regex") {
                    $cursor_ref = \$hashref;
                    next SEGMENT;
                }
            }
            push $$cursor_ref->{regexes}->@*, ($segment => {});
            $cursor_ref = \$$cursor_ref->{regexes}[-1];
        }
    }
    $$cursor_ref->{$type}{$thing_name} = $sub;
}

sub get_action_request ($self, $type, $stream_name, $thing_name) {
    # validate
    belongs_to($type, [qw/ action request join_leave /])
        or die "invalid type '$type'";

    my @segments = split /\:/, $stream_name;
    my @cursors = $self->_actions_requests;
    foreach my $segment (@segments) {
        my @new_cursors = grep defined, map $_->{strings}{$segment}, @cursors;
        foreach my $cursor (@cursors) {
            if ($cursor->{regexes}) {
                foreach my $pair (pairs $cursor->{regexes}->@*) {
                    my ($regex, $new_candidate_cursor) = @$pair;
                    if ($segment =~ $regex) {
                        push @new_cursors, $new_candidate_cursor;
                    }
                }
            }
        }
        @cursors = @new_cursors;
    }
    return (grep defined, map $_->{$type}{$thing_name}, @cursors)[0];
}

sub get_action ($self, $stream_name, $action_name) {
    return $self->get_action_request(action => $stream_name, $action_name);
}

sub get_request ($self, $stream_name, $request_name) {
    return $self->get_action_request(request => $stream_name, $request_name);
}

sub get_join ($self, $stream_name) {
    return $self->get_action_request(join_leave => $stream_name, 'join');
}

sub get_leave ($self, $stream_name) {
    return $self->get_action_request(join_leave => $stream_name, 'leave');
}

sub get_repair ($self, $stream_name) {
    return $self->get_action_request(join_leave => $stream_name, 'repair');
}

1;



( run in 0.449 second using v1.01-cache-2.11-cpan-39bf76dae61 )