Stardust
view release on metacpan or search on metacpan
lib/Stardust.pm view on Meta::CPAN
C(
  Home => [ '/' ],

  # GET /
  # Replies with the value of $info (defined elsewhere in this module),
  # labelled as plain text.
  get => sub {
    my $self = shift;
    my $headers = $self->headers;
    $headers->{'Content-Type'} = 'text/plain';
    $info;
  },
),
# ChannelList - [public]
# This returns a list of all channel names currently in use.
C(
  ChannelList => [ '/channel' ],

  # GET /channel
  # Replies with a JSON array containing every key of %channels,
  # in sorted order.
  get => sub {
    my $self = shift;
    my @names = sort keys %channels;
    return encode_json(\@names);
  }
),
# Channel
# To generate messages on a channel, POST a JSON object to this controller
# using the CGI variable 'm'.
#
# NOTE:
# The post method of this controller is meant for INTERNAL USE ONLY.
# By default, only clients from 127.0.0.1 can access this controller.
# Everyone else is rejected.
C(
  Channel => [ '/channel/([\w+]+)' ],

  # [public] GET /channel/<names>
  # <names> is one or more channel names joined by '+'.  Replies with a JSON
  # array holding the to_hash() form of each named channel, in the order the
  # names were given.
  get => sub {
    my ($self, $channels) = @_;
    my @ch = split(/\+/, $channels);
    return encode_json([ map { channel($_)->to_hash } @ch ]);
  },

  # [private] POST /channel/<names>
  # Reads the CGI variable 'm' (a single JSON document, or an array ref of
  # them), decodes each one, and writes every decoded event to every named
  # channel.  The response body is always empty.
  #
  # NOTE(review): nothing in this handler inspects the client address, so the
  # 127.0.0.1-only restriction must be enforced elsewhere -- confirm.
  post => sub {
    my ($self, $channels) = @_;

    # Nothing to publish: leave with an empty response.
    my $m = $self->input->{m};
    return unless $m;

    my @ch = split(/\+/, $channels);

    # 'm' may be a single value or an array ref of values; treat both alike.
    my $messages = (ref($m) eq 'ARRAY') ? $m : [$m];

    # Decode everything before writing anything, so that a decode failure
    # (not caught here) happens before any channel has been written to.
    my @ev = map { decode_json($_) } @$messages;

    for my $name (@ch) {
      for my $event (@ev) {
        channel($name)->write($event);
      }
    }

    # Return explicitly: the value of a sub whose last statement is a loop is
    # unspecified, and the return value here is used as the response body.
    return;
  },
),
# Message - [public]
# This controller emits a stream of messages to long-polling clients.
C(
Message => [ '/channel/([\w+]+)/stream/([.\d]+)' ],
# GET /channel/<names>/stream/<client_id>
#
# Long-poll handler. <names> is one or more channel names joined by '+';
# <client_id> is a run of digits and dots. The handler never returns: each
# pass through the loop below sends one JSON array of messages, waits for the
# next request, and then holds that request until one of the subscribed
# channels signals or the configured timeout expires.
get => sub {
warn "coro [$Coro::current]" if $CONFIG{debug};
# NOTE(review): $client_id is captured from the URL but is not used below.
my ($self, $channels, $client_id) = @_;
# NOTE(review): $input is never read in this handler.
my $input = $self->input;
# Object that print() and next() are called on below; presumably the
# Continuity request object -- confirm.
my $cr = $self->cr;
my @ch = split(/\+/, $channels);
# Cutoff handed to read_since(); refreshed after every send.
my $last = time;
while (1) {
# Output: collect what read_since($last) yields for every subscribed
# channel, drop undefined entries, and send the lot as one JSON array.
warn "top of loop" if $CONFIG{debug};
my @messages =
grep { defined }
map { my $ch = channel($_); $ch->read_since($last) } @ch;
# The print runs in its own coro, which is joined straight away.
# NOTE(review): the reason for the extra coro is not evident from this code.
my $x = async {
warn "printing...".encode_json(\@messages) if $CONFIG{debug};
$cr->print(encode_json(\@messages));
};
$x->join;
$last = time;
# Hold for a brief moment until the next long poll request comes in.
warn "waiting for next request" if ($CONFIG{debug});
$cr->next;
# Start 1 coro for each channel we're listening to.
# Each coro will have the same Coro::Signal object, $activity.
# A coro waits on its channel's own signal and, once woken, broadcasts on
# $activity.
my $activity = Coro::Signal->new;
my @coros = map {
my $ch = channel($_);
async { $ch->signal->wait; $activity->broadcast };
} @ch;
# When running this behind a reverse proxy,
# it's useful to timeout before your proxy kills the connection.
# This extra coro keeps yielding until the $CONFIG{timeout} timer object
# becomes true, then broadcasts on $activity like the channel coros do.
push @coros, async {
my $timeout = Coro::Timer::timeout $CONFIG{timeout};
while (not $timeout) {
Coro::schedule;
}
warn "timeout\n" if $CONFIG{debug};
$activity->broadcast;
};
# The first coro that does $activity->broadcast wins.
warn "waiting for activity on any of (@ch); last is $last" if $CONFIG{debug};
$activity->wait;
# Cancel the remaining coros.
# (Every coro in @coros is cancelled, including the one that broadcast.)
for (@coros) { $_->cancel }
}
},
# Option passed to C() alongside the handlers; presumably the
# Squatting::On::Continuity flag for long-running controllers -- confirm.
continuity => 1,
),
);
1;
__END__
=head1 NAME
Stardust - the simplest COMET server I could imagine
=head1 SYNOPSIS
Installing Stardust:
$ sudo cpan Stardust
Running the COMET server on port 5555:
$ stardust.pl --port=5555 --base=/comet
Making pages subscribe to channel 'foo':
<script>
var uniqueId = Math.random().toString();
$.ev.loop('/comet/channel/foo/stream/'+uniqueId, {
"*": function(ev) {
}
});
</script>
Posting JSON messages to channel 'foo':
curl -d 'm={ "type": "TestMessage", "data": [3, 2, 1] }' \
http://localhost:5555/comet/channel/foo
=head1 DESCRIPTION
Stardust is a simple COMET server that can be integrated alongside existing
web applications.
=head1 CONCEPTS
=head2 Message
Messages are just arbitrary JSON objects.
=head2 Channel
Channels are the conduits that messages travel through.
=head1 API
Communication with the Stardust COMET server uses JSON over HTTP.
( run in 1.040 second using v1.01-cache-2.11-cpan-39bf76dae61 )