AnyEvent-WebSocket-Client

 view release on metacpan or  search on metacpan

lib/AnyEvent/WebSocket/Connection.pm  view on Meta::CPAN

  default => sub { 0 },
);


has subprotocol => (
  is => 'ro',
);


has max_payload_size => (
  is => 'ro',
);


has max_fragments => (
  is => 'ro',
);


has close_code => (
  is => 'rw',
);


has close_reason => (
  is => 'rw',
);


has close_error => (
  is => 'rw',
);

foreach my $type (qw( each_message next_message finish parse_error ))
{
  has "_${type}_cb" => (
    is       => 'ro',
    init_arg => undef,
    default  => sub { [] },
  );
}

foreach my $flag (qw( _is_read_open _is_write_open ))
{
  has $flag => (
    is       => 'rw',
    init_arg => undef,
    default  => sub { 1 },
  );
}

has "_is_finished" => (
  is       => 'rw',
  init_arg => undef,
  default  => sub { 0 },
);

sub BUILD
{
  my $self = shift;
  Scalar::Util::weaken $self;

  my @temp_messages = ();
  my $are_callbacks_supposed_to_be_ready = 0;

  my $finish = sub {
    my(undef, undef, $message) = @_;
    my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks.
    return if $self->_is_finished;
    eval
    {
      $self->_process_message($_) foreach @temp_messages;
    };
    @temp_messages = ();
    $self->_is_finished(1);
    $self->handle->push_shutdown;
    $self->_is_read_open(0);
    $self->_is_write_open(0);
    $self->close_error($message) if defined $message;
    $_->($self, $message) for @{ $self->_finish_cb };
  };
  $self->handle->on_error($finish);
  $self->handle->on_eof($finish);

  my $frame = Protocol::WebSocket::Frame->new(
    maybe max_payload_size => $self->max_payload_size,
    maybe max_fragments_amount => $self->max_fragments,
  );

  my $read_cb = sub {
    my ($handle) = @_;
    local $@;
    my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks
    my $success = eval
    {
      $frame->append($handle->{rbuf});
      while(defined(my $body = $frame->next_bytes))
      {
        next if !$self->_is_read_open; # not 'last' but 'next' in order to consume data in $frame buffer.
        my $message = AnyEvent::WebSocket::Message->new(
          body   => $body,
          opcode => $frame->opcode,
        );
        if($are_callbacks_supposed_to_be_ready)
        {
          $self->_process_message($message);
        }
        else
        {
          push(@temp_messages, $message);
        }
      }
      1; # succeed to parse.
    };
    if(!$success)
    {
      $self->_force_shutdown();
      $_->($self, $@) for @{ $self->_parse_error_cb };
    }
  };



( run in 1.877 second using v1.01-cache-2.11-cpan-39bf76dae61 )