AnyEvent-WebSocket-Client
view release on metacpan or search on metacpan
lib/AnyEvent/WebSocket/Connection.pm view on Meta::CPAN
default => sub { 0 },
);
has subprotocol => (
is => 'ro',
);
has max_payload_size => (
is => 'ro',
);
has max_fragments => (
is => 'ro',
);
has close_code => (
is => 'rw',
);
has close_reason => (
is => 'rw',
);
has close_error => (
is => 'rw',
);
foreach my $type (qw( each_message next_message finish parse_error ))
{
has "_${type}_cb" => (
is => 'ro',
init_arg => undef,
default => sub { [] },
);
}
foreach my $flag (qw( _is_read_open _is_write_open ))
{
has $flag => (
is => 'rw',
init_arg => undef,
default => sub { 1 },
);
}
has "_is_finished" => (
is => 'rw',
init_arg => undef,
default => sub { 0 },
);
sub BUILD
{
my $self = shift;
Scalar::Util::weaken $self;
my @temp_messages = ();
my $are_callbacks_supposed_to_be_ready = 0;
my $finish = sub {
my(undef, undef, $message) = @_;
my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks.
return if $self->_is_finished;
eval
{
$self->_process_message($_) foreach @temp_messages;
};
@temp_messages = ();
$self->_is_finished(1);
$self->handle->push_shutdown;
$self->_is_read_open(0);
$self->_is_write_open(0);
$self->close_error($message) if defined $message;
$_->($self, $message) for @{ $self->_finish_cb };
};
$self->handle->on_error($finish);
$self->handle->on_eof($finish);
my $frame = Protocol::WebSocket::Frame->new(
maybe max_payload_size => $self->max_payload_size,
maybe max_fragments_amount => $self->max_fragments,
);
my $read_cb = sub {
my ($handle) = @_;
local $@;
my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks
my $success = eval
{
$frame->append($handle->{rbuf});
while(defined(my $body = $frame->next_bytes))
{
next if !$self->_is_read_open; # not 'last' but 'next' in order to consume data in $frame buffer.
my $message = AnyEvent::WebSocket::Message->new(
body => $body,
opcode => $frame->opcode,
);
if($are_callbacks_supposed_to_be_ready)
{
$self->_process_message($message);
}
else
{
push(@temp_messages, $message);
}
}
1; # succeed to parse.
};
if(!$success)
{
$self->_force_shutdown();
$_->($self, $@) for @{ $self->_parse_error_cb };
}
};
( run in 1.877 second using v1.01-cache-2.11-cpan-39bf76dae61 )