AnyEvent-WebSocket-Client
view release on metacpan or search on metacpan
lib/AnyEvent/WebSocket/Connection.pm view on Meta::CPAN
has subprotocol => (
is => 'ro',
);
has max_payload_size => (
is => 'ro',
);
has max_fragments => (
is => 'ro',
);
has close_code => (
is => 'rw',
);
has close_reason => (
is => 'rw',
);
has close_error => (
is => 'rw',
);
foreach my $type (qw( each_message next_message finish parse_error ))
{
has "_${type}_cb" => (
is => 'ro',
init_arg => undef,
default => sub { [] },
);
}
foreach my $flag (qw( _is_read_open _is_write_open ))
{
has $flag => (
is => 'rw',
init_arg => undef,
default => sub { 1 },
);
}
has "_is_finished" => (
is => 'rw',
init_arg => undef,
default => sub { 0 },
);
sub BUILD
{
my $self = shift;
Scalar::Util::weaken $self;
my @temp_messages = ();
my $are_callbacks_supposed_to_be_ready = 0;
my $finish = sub {
my(undef, undef, $message) = @_;
my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks.
return if $self->_is_finished;
eval
{
$self->_process_message($_) foreach @temp_messages;
};
@temp_messages = ();
$self->_is_finished(1);
$self->handle->push_shutdown;
$self->_is_read_open(0);
$self->_is_write_open(0);
$self->close_error($message) if defined $message;
$_->($self, $message) for @{ $self->_finish_cb };
};
$self->handle->on_error($finish);
$self->handle->on_eof($finish);
my $frame = Protocol::WebSocket::Frame->new(
maybe max_payload_size => $self->max_payload_size,
maybe max_fragments_amount => $self->max_fragments,
);
my $read_cb = sub {
my ($handle) = @_;
local $@;
my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks
my $success = eval
{
$frame->append($handle->{rbuf});
while(defined(my $body = $frame->next_bytes))
{
next if !$self->_is_read_open; # not 'last' but 'next' in order to consume data in $frame buffer.
my $message = AnyEvent::WebSocket::Message->new(
body => $body,
opcode => $frame->opcode,
);
if($are_callbacks_supposed_to_be_ready)
{
$self->_process_message($message);
}
else
{
push(@temp_messages, $message);
}
}
1; # succeed to parse.
};
if(!$success)
{
$self->_force_shutdown();
$_->($self, $@) for @{ $self->_parse_error_cb };
}
};
# Message processing (calling _process_message) is delayed by
# $are_callbacks_supposed_to_be_ready flag. This is necessary to
# make sure all received messages are delivered to each_message and
# next_message callbacks. If there is some data in rbuf, changing
# the on_read callback makes the callback fire, but there is of
# course no each_message/next_message callback to receive the
# message yet. So we put messages to @temp_messages for a
# while. After the control is returned to the user, who sets up
# each_message/next_message callbacks, @temp_messages are processed.
# An alternative approach would be temporarily disabling on_read by
# $self->handle->on_read(undef). However, this can cause a weird
# situation in TLS mode, because on_eof can fire even if we don't
# have any on_read (
# https://metacpan.org/pod/AnyEvent::Handle#I-get-different-callback-invocations-in-TLS-mode-Why-cant-I-pause-reading
# )
$self->handle->on_read($read_cb);
my $idle_w; $idle_w = AE::idle sub {
undef $idle_w;
if(defined($self))
{
my $strong_self = $self;
$are_callbacks_supposed_to_be_ready = 1;
local $@;
my $success = eval
{
$self->_process_message($_) foreach @temp_messages;
1;
};
@temp_messages = ();
if(!$success)
{
$self->_force_shutdown();
}
}
};
}
sub _process_message
{
my ($self, $received_message) = @_;
return if !$self->_is_read_open;
if($received_message->is_text || $received_message->is_binary)
{
# make a copy in order to allow specifying new callbacks inside the
# currently executed next_callback itself. otherwise, any next_callback
# added inside the currently executed callback would be added to the end
# of the array and executed for the currently processed message instead of
# actually the next one.
my @next_callbacks = @{ $self->_next_message_cb };
@{ $self->_next_message_cb } = ();
$_->($self, $received_message) for @next_callbacks;
# make a copy in case one of the callbacks get
# unregistered in the middle of the loop
my @callbacks = @{ $self->_each_message_cb };
$_->($self, $received_message, $self->_cancel_for(each_message => $_) )
for @callbacks;
}
elsif($received_message->is_close)
{
my $body = $received_message->body;
if($body)
{
my($code, $reason) = unpack 'na*', $body;
$self->close_code($code);
$self->close_reason(Encode::decode('UTF-8', $reason));
}
$self->_is_read_open(0);
$self->close();
}
elsif($received_message->is_ping)
{
$self->send(AnyEvent::WebSocket::Message->new(opcode => 10, body => $received_message->body));
}
}
sub _force_shutdown
{
my ($self) = @_;
$self->handle->push_shutdown;
$self->_is_write_open(0);
$self->_is_read_open(0);
}
sub send
{
my($self, $message) = @_;
my $frame;
return $self if !$self->_is_write_open;
if(ref $message)
{
$frame = Protocol::WebSocket::Frame->new(opcode => $message->opcode, buffer => $message->body, masked => $self->masked, max_payload_size => 0);
}
else
{
$frame = Protocol::WebSocket::Frame->new(buffer => $message, masked => $self->masked, max_payload_size => 0);
}
$self->handle->push_write($frame->to_bytes);
$self;
}
sub _cancel_for
{
my( $self, $event, $handler ) = @_;
my $handler_id = Scalar::Util::refaddr($handler);
return sub {
my $accessor = "_${event}_cb";
@{ $self->$accessor } = grep { Scalar::Util::refaddr($_) != $handler_id }
@{ $self->$accessor };
};
}
lib/AnyEvent/WebSocket/Connection.pm view on Meta::CPAN
If provided by the other side, the code that was provided when the
connection was closed.
=head2 close_reason
If provided by the other side, the reason for closing the connection.
=head2 close_error
If the connection is closed due to a network error, this will hold the
message.
=head1 METHODS
=head2 send
$connection->send($message);
Send a message to the other side. C<$message> may either be a string
(in which case a text message will be sent), or an instance of
L<AnyEvent::WebSocket::Message>.
=head2 on
$connection->on(each_message => $cb);
$connection->on(each_message => $cb);
$connection->on(finish => $cb);
Register a callback to a particular event.
For each event C<$connection> is the L<AnyEvent::WebSocket::Connection> and
and C<$message> is an L<AnyEvent::WebSocket::Message> (if available).
Returns a coderef that unregisters the callback when invoked.
my $cancel = $connection->on( each_message => sub { ... });
# later on...
$cancel->();
=head3 each_message
$cb->($connection, $message, $unregister)
Called each time a message is received from the WebSocket.
C<$unregister> is a coderef that removes this callback from
the active listeners when invoked.
=head3 next_message
$cb->($connection, $message)
Called only for the next message received from the WebSocket.
[0.49]
Adding a next_message callback from within a next_message callback will
result in a callback called on the next message instead of the current
one. There was a bug in previous versions where the callback would be
called immediately after current set of callbacks with the same message.
=head3 parse_error
$cb->($connection, $text_error_message)
Called if there is an error parsing a message sent from the remote end.
After this callback is called, the connection will be closed.
Among other possible errors, this event will trigger if a frame has a
payload which is larger that C<max_payload_size>.
=head3 finish
$cb->($connection, $message)
Called when the connection is terminated. If the connection is terminated
due to an error, the message will be provided as the second argument.
On a cleanly closed connection this will be `undef`.
=head2 close
$connection->close;
$connection->close($code);
$connection->close($code, $reason);
Close the connection. You may optionally provide a code and a reason.
See L<section 5.5.1|https://tools.ietf.org/html/rfc6455#section-5.5.1> and L<section 7.4.1|https://tools.ietf.org/html/rfc6455#section-7.4.1> of RFC6455.
The code is a 16-bit unsigned integer value that indicates why you close the connection. By default the code is 1000.
The reason is a character string (not an octet string) that further describes why you close the connection. By default the reason is an empty string.
=head1 SERVER CONNECTIONS
Although written originally to work with L<AnyEvent::WebSocket::Client>,
this class was designed to be used for either client or server WebSocket
connections. For details, contact the author and/or take a look at the
source for L<AnyEvent::WebSocket::Client> and the examples that come with
L<Protocol::WebSocket>.
=head1 SEE ALSO
=over 4
=item *
L<AnyEvent::WebSocket::Client>
=item *
L<AnyEvent::WebSocket::Message>
=item *
L<AnyEvent::WebSocket::Server>
=item *
L<AnyEvent>
=item *
( run in 0.456 second using v1.01-cache-2.11-cpan-39bf76dae61 )