Alice
view release on metacpan or search on metacpan
lib/Alice/HTTP/Stream/WebSocket.pm view on Meta::CPAN
package Alice::HTTP::Stream::WebSocket;
use JSON;
use Any::Moose;
use Digest::MD5 qw/md5/;
use Time::HiRes qw/time/;
extends 'Alice::HTTP::Stream';
has fh => (
is => 'ro',
required => 1,
);
has handle => (
is => 'rw',
);
has on_read => (
is => 'ro',
isa => 'CodeRef',
);
has is_xhr => (
is => 'ro',
default => 0,
);
has ws_version => (
is => 'rw',
required => 1,
);
sub BUILD {
my $self = shift;
my $h = AnyEvent::Handle->new(
fh => $self->fh,
rbuf_max => 1024 * 10,
);
$h->{ws_version} = $self->ws_version;
$h->on_error(sub {
$self->close;
undef $h;
$self->on_error->();
});
$h->on_eof(sub {
$self->close;
undef $h;
$self->on_error->();
});
$h->on_read(sub {
$_[0]->push_read(
'AnyEvent::Handle::Message::WebSocket',
sub { $self->on_read->(from_json $_[1]) }
);
});
$self->handle($h);
$self->send([{type => "identify", id => $self->id}]);
}
sub send {
my ($self, $messages) = @_;
$messages = [$messages] unless ref $messages eq "ARRAY";
my $line = to_json(
{queue => $messages},
{utf8 => 1, shrink => 1}
);
$self->send_raw($line);
}
sub send_raw {
my ($self, $string) = @_;
$self->handle->push_write(
'AnyEvent::Handle::Message::WebSocket',
$string
);
}
sub close {
my $self = shift;
$self->handle->destroy;
$self->closed(1);
}
__PACKAGE__->meta->make_immutable;
1;
( run in 0.672 second using v1.01-cache-2.11-cpan-39bf76dae61 )