AnyEvent-WebSocket-Client


Changes  view on Meta::CPAN

0.39      2017-07-07 18:32:02 -0400
  - Require AnyEvent 7.13 or better to address SSL/TLS error (gh#22)
  - Updated example to use Mojo::Redis2 and modern Mojolicious (Kivanc Yazan gh#29)

0.38      2017-01-31 13:11:24 -0500
  - Fix skip in t/mojo_close_codes.t which requires EV to be installed (José Joaquín Atria gh#28)

0.37      2016-11-04 22:24:11 -0400
  - Added max_payload_size to AnyEvent::WebSocket::Client and 
    AnyEvent::WebSocket::Connection for limiting the size of
    received frames
  - Sending payloads larger than 65536 no longer causes an exception
  - Added parse_error to AnyEvent::WebSocket::Connection

0.36      2016-10-27 11:57:12 -0400
  - Production version functionally identical to the 0.35_02 release.

0.35_02   2016-10-25 13:04:54 -0400
  - Added http_headers attribute which allows you to provide additional
    arbitrary headers

Changes  view on Meta::CPAN

0.21      2014-04-08 12:52:51 -0400
  - Perl 5.8 support
    Only useful if you force install Protocol::WebSocket, since it
    has a failing test on 5.8

0.20      2013-11-05 10:19:20 -0500
  - prevent multiple finish callbacks in unusual cases (Toshio Ito gh#15)

0.19      2013-11-04 07:26:19 -0500
  [AE::WS::Connection enhancements (thanks Toshio Ito)]
  - Automatically respond to a close frame with a close frame (RFC6455 5.5.1)
  - Make sure "finish" callbacks are called only once.
  - Automatically shut down the socket on the "finish" event. This makes sure an EOF signal is sent to the peer.
  - Refuse to send/receive frames after it sends/receives a close frame. (RFC6455 1.4, 5.5.1)
  - Abort the connection when it receives a message that is too large. The size limit is imposed by Protocol::WebSocket::Frame (RFC6455 10.4)
  - Automatically respond to a ping frame with a pong frame of the same payload (RFC6455 5.5.2, 5.5.3)
  - Add "masked" attribute. If set, outgoing frames are masked.
  - Client's Connection now sets masked => true (RFC6455 5.3, 6.1).
  - documentation

0.18      2013-10-21 14:55:47 -0400
  - documentation

0.17      2013-10-16 10:58:41 -0400
  - public API for creating Connection instance (thanks Toshio Ito gh#10)
  - bug fix: messages sent by the server immediately after connect may have been lost (thanks Toshio Ito gh#12)

README  view on Meta::CPAN

    Will generate:

     X-Foo: bar
     X-Baz: abc
     X-Baz: def

    However, the order cannot be guaranteed when using the hash style.

 max_payload_size

    The maximum payload size for received frames. Currently defaults to
    whatever Protocol::WebSocket defaults to.

 max_fragments

    The maximum number of fragments for received frames. Currently defaults
    to whatever Protocol::WebSocket defaults to.

 env_proxy

    If this boolean attribute is set to true, proxy settings are loaded
    from environment variables. If valid proxy settings are found, the
    connect method will use that proxy.

    Default: false.

lib/AnyEvent/WebSocket/Client.pm  view on Meta::CPAN

Will generate:

 X-Foo: bar
 X-Baz: abc
 X-Baz: def

However, the order cannot be guaranteed when using the hash style.

=head2 max_payload_size

The maximum payload size for received frames.  Currently defaults to whatever
L<Protocol::WebSocket> defaults to.

=head2 max_fragments

The maximum number of fragments for received frames.  Currently defaults to whatever
L<Protocol::WebSocket> defaults to.

=head2 env_proxy

If this boolean attribute is set to true, proxy settings are loaded
from environment variables. If valid proxy settings are found, the
C<connect> method will use that proxy.

Default: false.
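
As a rough sketch of how the attributes above might be combined, the constructor call below sets receive limits and enables proxy lookup. The specific values (131072 bytes, 32 fragments) are arbitrary assumptions made for illustration, not recommended or default settings, and no connection is opened.

```perl
use strict;
use warnings;
use AnyEvent::WebSocket::Client;

# Illustrative values only; they are assumptions, not library defaults.
my $client = AnyEvent::WebSocket::Client->new(
  max_payload_size => 131072,  # upper bound for the payload of received frames
  max_fragments    => 32,      # upper bound for fragments of received frames
  env_proxy        => 1,       # look for proxy settings in the environment
);

printf "max_payload_size = %d\n", $client->max_payload_size;
printf "max_fragments    = %d\n", $client->max_fragments;
printf "env_proxy        = %s\n", $client->env_proxy ? 'true' : 'false';
```

Leaving C<max_payload_size> and C<max_fragments> unset keeps whatever limits L<Protocol::WebSocket> applies on its own.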

lib/AnyEvent/WebSocket/Connection.pm  view on Meta::CPAN

    $self->_is_finished(1);
    $self->handle->push_shutdown;
    $self->_is_read_open(0);
    $self->_is_write_open(0);
    $self->close_error($message) if defined $message;
    $_->($self, $message) for @{ $self->_finish_cb };
  };
  $self->handle->on_error($finish);
  $self->handle->on_eof($finish);

  my $frame = Protocol::WebSocket::Frame->new(
    maybe max_payload_size => $self->max_payload_size,
    maybe max_fragments_amount => $self->max_fragments,
  );

  my $read_cb = sub {
    my ($handle) = @_;
    local $@;
    my $strong_self = $self; # preserve $self because otherwise $self can be destroyed in the callbacks
    my $success = eval
    {
      $frame->append($handle->{rbuf});
      while(defined(my $body = $frame->next_bytes))
      {
        next if !$self->_is_read_open; # not 'last' but 'next' in order to consume data in $frame buffer.
        my $message = AnyEvent::WebSocket::Message->new(
          body   => $body,
          opcode => $frame->opcode,
        );
        if($are_callbacks_supposed_to_be_ready)
        {
          $self->_process_message($message);
        }
        else
        {
          push(@temp_messages, $message);
        }
      }

lib/AnyEvent/WebSocket/Connection.pm  view on Meta::CPAN

  my ($self) = @_;
  $self->handle->push_shutdown;
  $self->_is_write_open(0);
  $self->_is_read_open(0);
}


sub send
{
  my($self, $message) = @_;
  my $frame;

  return $self if !$self->_is_write_open;

  if(ref $message)
  {
    $frame = Protocol::WebSocket::Frame->new(opcode => $message->opcode, buffer => $message->body, masked => $self->masked, max_payload_size => 0);
  }
  else
  {
    $frame = Protocol::WebSocket::Frame->new(buffer => $message, masked => $self->masked, max_payload_size => 0);
  }
  $self->handle->push_write($frame->to_bytes);
  $self;
}


sub _cancel_for
{
  my( $self, $event, $handler ) = @_;

  my $handler_id = Scalar::Util::refaddr($handler);

lib/AnyEvent/WebSocket/Connection.pm  view on Meta::CPAN

=head2 handle

The underlying L<AnyEvent::Handle> object used for the connection.
The WebSocket handshake MUST already be completed on this handle.
You should not use the handle directly after creating the L<AnyEvent::WebSocket::Connection> object.

Usually only useful for creating server connections, see below.

=head2 masked

If set to true, it masks outgoing frames. The default is false.
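
To see what masking changes on the wire, the sketch below builds the first two header bytes of a short, unfragmented text frame by hand, following the frame layout in RFC 6455 section 5.2. C<frame_header> is a hypothetical helper written for this illustration, not part of the module, and it only handles payloads of up to 125 bytes.

```perl
use strict;
use warnings;

# Hypothetical helper: the first two header bytes of a single text frame.
sub frame_header
{
  my($payload, $masked) = @_;
  my $length = length $payload;
  die "this sketch only handles payloads up to 125 bytes" if $length > 125;
  my $first  = 0x80 | 0x01;                        # FIN bit set, opcode 1 (text)
  my $second = ($masked ? 0x80 : 0x00) | $length;  # MASK bit plus 7-bit length
  return pack('C2', $first, $second);
}

print unpack('H*', frame_header('Hello', 0)), "\n";  # prints 8105
print unpack('H*', frame_header('Hello', 1)), "\n";  # prints 8185
```

In a real masked frame these two bytes are followed by a 4-byte masking key, and the payload is XORed with that key; the sketch stops at the header because that is where the C<masked> attribute first becomes visible.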

=head2 subprotocol

The subprotocol returned by the server.  If no subprotocol was requested, it
may be C<undef>.

=head2 max_payload_size

The maximum payload size for received frames.  Currently defaults to whatever
L<Protocol::WebSocket> defaults to.

=head2 max_fragments

The maximum number of fragments for received frames.  Currently defaults to whatever
L<Protocol::WebSocket> defaults to.

=head2 close_code

If provided by the other side, the code that was provided when the
connection was closed.

=head2 close_reason

If provided by the other side, the reason for closing the connection.

lib/AnyEvent/WebSocket/Connection.pm  view on Meta::CPAN

result in a callback called on the next message instead of the current
one. There was a bug in previous versions where the callback would be
called immediately after current set of callbacks with the same message.

=head3 parse_error

 $cb->($connection, $text_error_message)

Called if there is an error parsing a message sent from the remote end.
After this callback is called, the connection will be closed.
Among other possible errors, this event will trigger if a frame has a
payload which is larger than C<max_payload_size>.
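
A minimal sketch of handling this event, assuming a local socket pair in place of a real network peer. The tiny 16-byte limit, the 64-byte payload, and the 5-second safety timer are arbitrary choices for the illustration, and the exact wording of the error message depends on L<Protocol::WebSocket>.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Util qw( portable_socketpair );
use AnyEvent::WebSocket::Connection;
use Protocol::WebSocket::Frame;

# Two connected sockets stand in for a real network connection.
my($local_fh, $remote_fh) = portable_socketpair;
die "socketpair failed: $!" unless $local_fh;

my $connection = AnyEvent::WebSocket::Connection->new(
  handle           => AnyEvent::Handle->new( fh => $local_fh ),
  max_payload_size => 16,  # deliberately tiny so the frame below is rejected
);
my $remote = AnyEvent::Handle->new( fh => $remote_fh, on_error => sub { } );

my $done = AnyEvent->condvar;

$connection->on(parse_error => sub {
  my($connection, $error) = @_;
  $done->send($error);
});

# Safety net so the sketch cannot wait forever.
my $timer = AnyEvent->timer( after => 5, cb => sub { $done->send(undef) } );

# A 64-byte payload exceeds the 16-byte receive limit. Passing
# max_payload_size => 0 lifts the limit on the sending side only.
$remote->push_write(
  Protocol::WebSocket::Frame->new( buffer => 'x' x 64, max_payload_size => 0 )->to_bytes
);

my $parse_error = $done->recv;
print defined $parse_error
  ? "parse_error: $parse_error\n"
  : "no parse error seen\n";
```

The same C<on> call registers handlers for the other events; the callback here only records the error, since the connection is closed once the callback returns.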

=head3 finish

 $cb->($connection, $message)

Called when the connection is terminated.  If the connection is terminated
due to an error, the message will be provided as the second argument.
On a cleanly closed connection this will be C<undef>.

t/anyevent_websocket_client.t  view on Meta::CPAN

      note "version  = " . $opt->{handshake}->version;
      if($opt->{handshake}->req->resource_name =~ /\/count\/(\d+)/)
      { $max = $1 }
      note "max = $max";
    },

    message => sub {  # message
      my $opt = { @_ };
      eval q{
        note "send $counter";
        $opt->{hdl}->push_write($opt->{frame}->new($counter++)->to_bytes);
        if($counter >= $max)
        {
          $opt->{hdl}->push_write($opt->{frame}->new(type => 'close')->to_bytes);
          $opt->{hdl}->push_shutdown;
        }
      };
    },
  );

  $uri->path('/count/10');
  note $uri;

  subtest basic => sub {

t/anyevent_websocket_client.t  view on Meta::CPAN

    });

    eval { $connection->send($data) };
    is $@, '';

    $cv->recv;

  };

  # test the double standard that we can send any sized
  # frame, but will not accept large ones.
  subtest 'receive message > max_payload_size' => sub {

    my $data = 'x' x 65540;

    my $connection = $client->connect($uri)->recv;

    my $cv = AE::cv;
    $connection->on(parse_error => sub {
      my($connection, $error) = @_;
      isnt $error, '', "Error is: $error";

t/anyevent_websocket_client__scope.t  view on Meta::CPAN

use Test::Memory::Cycle;

my $finished = 0;
my $done1 = AnyEvent->condvar;
my $done2 = AnyEvent->condvar;

my $uri = start_server(
  message => sub {
    my $opt = { @_ };

    return if !$opt->{frame}->is_text && !$opt->{frame}->is_binary;

    $opt->{hdl}->push_write($opt->{frame}->new(buffer => $opt->{message}, max_payload_size => 0 )->to_bytes);

  },
  end => sub {
    $finished = 1;
    $done1->send;
  },
);

my $client = AnyEvent::WebSocket::Client->new;
my $connection = $client->connect($uri)->recv;

t/anyevent_websocket_client__ssl.t  view on Meta::CPAN

    note "max = $max";
    note "resource = " . $opt->{handshake}->req->resource_name;
    if($opt->{handshake}->req->resource_name =~ /\/count\/(\d+)/)
    { $max = $1 }
    note "max = $max";
  },
  message => sub {  # message
    my $opt = { @_ };
    eval q{
      note "send $counter";
      $opt->{hdl}->push_write($opt->{frame}->new($counter++)->to_bytes);
      if($counter >= $max)
      {
        $opt->{hdl}->push_write($opt->{frame}->new(type => 'close')->to_bytes);
        $opt->{hdl}->push_shutdown;
      }
    };
  },
);

$uri->path('/count/10');
note $uri;

my $connection = eval { AnyEvent::WebSocket::Client->new( ssl_no_verify => 1 )->connect($uri)->recv };

t/anyevent_websocket_connection.t  view on Meta::CPAN

      $y,
      object {
        call close_code   => 1005;
        call close_reason => 'bb';
      },
    );
  };

};

subtest 'masked attribute should control whether the frames sent by the Connection are masked or not' => sub {

  foreach my $masked (0,1)
  {

    subtest "masked = $masked" => sub {
      my ($x_conn, $y_handle) = create_connection_and_handle({masked => $masked});
      my $cv_finish = AnyEvent->condvar;
      $y_handle->on_read(sub {
        my ($handle) = @_;
        return if length($handle->{rbuf}) < 2;
        is substr($handle->{rbuf}, 0, 2), pack("C*", 0x81, ($masked ? 0x85 : 0x05)), "frame header OK";
        $cv_finish->send;
      });
      $x_conn->send("Hello");
      $cv_finish->recv;
    };

  }

};

subtest 'Connection should respond to a ping frame with a pong frame' => sub {

  my ($x_conn, $y_handle) = create_connection_and_handle;

  my $parser = Protocol::WebSocket::Frame->new;
  my $cv_finish = AnyEvent->condvar;
  $y_handle->on_read(sub {
    my ($handle) = @_;
    $parser->append($handle->{rbuf});
    my $payload = $parser->next_bytes;
    return if !defined($payload);
    is $parser->opcode, 10, "pong frame received";
    is $payload, "foobar", "... payload is identical to what y_handle has sent.";
    $cv_finish->send;
  });
  $y_handle->push_write(Protocol::WebSocket::Frame->new(type => "ping", buffer => "foobar")->to_bytes);

  $cv_finish->recv;
};

subtest 'connection close data' => sub {

t/anyevent_websocket_connection.t  view on Meta::CPAN

      object {
        call close_code   => 1009;
        call close_reason => 'UTF8 WIDE CHARACTERS';
      },
      'connection has finish code and reason',
    );

  };
};

subtest 'Connection should not send after sending close frame, should not receive after receiving close frame' => sub {

  subtest "it should not send after sending close frame", sub {
    my ($x_conn, $y_handle) = create_connection_and_handle;

    my $y_received;
    my $cv_finish = AnyEvent->condvar;
    $cv_finish->begin;
    $cv_finish->begin;
    $y_handle->on_read(sub { });
    $y_handle->on_error(sub {
      $y_received = $_[0]->{rbuf};
      $_[0]->{rbuf} = "";

t/anyevent_websocket_connection.t  view on Meta::CPAN

    });
    $x_conn->on(finish => sub {
      $cv_finish->end;
    });
    $x_conn->close();
    $x_conn->send("hoge");
    $cv_finish->recv;

    my $parser = Protocol::WebSocket::Frame->new();
    $parser->append($y_received);
    ok defined($parser->next_bytes), "received a complete frame";
    ok $parser->is_close, "... and it's a close frame";
    ok !defined($parser->next_bytes), "no more frame";
  };

  my $make_frame = sub {
    Protocol::WebSocket::Frame->new(@_)->to_bytes;
  };

  subtest "it should not receive after receiving close frame", sub {
    my ($x_conn, $y_handle) = create_connection_and_handle;

    my @received_messages = ();
    my $cv_finish = AnyEvent->condvar;
    $x_conn->on(each_message => sub { push(@received_messages, $_[1]) });
    $x_conn->on(finish => sub { $cv_finish->send });
    $y_handle->push_write($make_frame->(type => "close"));
    $y_handle->push_write($make_frame->(buffer => "hoge"));
    $y_handle->push_shutdown;
    $cv_finish->recv;
    is scalar(@received_messages), 0, "the message 'hoge' should be discarded"
        or diag($received_messages[0]->body);
  };

};

subtest 'Connection should respond with close frame to close frame' => sub {

  my ($x_conn, $y_handle) = create_connection_and_handle;

  my $cv_b_recv = AnyEvent->condvar;
  $y_handle->on_error(sub {
    my $h = shift;
    $cv_b_recv->send($h->{rbuf});
    $h->{rbuf} = "";
  });
  $y_handle->on_read(sub {});
  $y_handle->push_write(Protocol::WebSocket::Frame->new(buffer => "", type => "close")->to_bytes);

  my $y_recv = $cv_b_recv->recv;
  my $parser = Protocol::WebSocket::Frame->new;
  $parser->append($y_recv);
  ok defined($parser->next_bytes), "received a complete frame";
  ok $parser->is_close, "... and it's a close frame";

};

subtest 'Connection should refuse extremely huge messages' => sub {

  subtest "Connection should refuse huge frames", sub {

    my ($x_conn, $y_handle) = create_connection_and_handle();
    my $cv_finish = AnyEvent->condvar;
    $cv_finish->begin;
    $cv_finish->begin;
    my @received_messages = ();
    $x_conn->on(finish => sub {
      $cv_finish->end;
    });
    $x_conn->on(each_message => sub {
      push(@received_messages, $_[1]);
    });
    $y_handle->on_error(sub {
      my $handle = shift;
      $handle->push_shutdown;
      $cv_finish->end;
    });
    $y_handle->on_read(sub { });

    my $frame_header = pack("H*", "827f00000000ffffffff"); # frame payload size = 2**32 - 1 bytes
    my $MAX_SEND_PAYLOAD = 1024; # for safety
    my $count_send_payload = 0;
    $y_handle->push_write($frame_header);
    $y_handle->on_drain(sub {
      my $handle = shift;
      $count_send_payload++;
      if($count_send_payload >= $MAX_SEND_PAYLOAD)
      {
        fail("Connection should be aborted by now.");
        $handle->on_drain(undef);
        $handle->push_shutdown;
        $cv_finish->send;
        return;

t/anyevent_websocket_connection.t  view on Meta::CPAN


      # push_write is delayed to prevent deep-recursion and to give
      # $x_conn chance to receive data.
      my $w; $w = AnyEvent->idle(cb => sub {
        undef $w;
        $handle->push_write("A" x 256);
      });
    });
    $cv_finish->recv;

    is scalar(@received_messages), 0, "the frame is too huge to receive.";
  };


  subtest "Connection should refuse messages with too many fragments", sub {
    my ($x_conn, $y_handle) = create_connection_and_handle;
    my $cv_finish = AnyEvent->condvar;
    $cv_finish->begin;
    $cv_finish->begin;
    my @received_messages = ();
    $x_conn->on(finish => sub {

t/anyevent_websocket_connection.t  view on Meta::CPAN

      push(@received_messages, $_[1])
    });
    $y_handle->on_error(sub {
      my $handle = shift;
      $handle->push_shutdown;
      $cv_finish->end;
    });
    $y_handle->on_read(sub {});

    my $MAX_SEND_FRAMES = 10000;
    my $count_send_frame = 0;
    $y_handle->push_write(Protocol::WebSocket::Frame->new(fin => 0, opcode => 1, buffer => "A")->to_bytes);
    $y_handle->on_drain(sub {
      my $handle = shift;
      $count_send_frame++;
      if($count_send_frame >= $MAX_SEND_FRAMES)
      {
        fail("Connection should be aborted by now.");
        $handle->on_drain(undef);
        $handle->push_shutdown;
        $cv_finish->send;
        return;
      }
      my $w; $w = AnyEvent->idle(cb => sub {
        undef $w;
        $handle->push_write(Protocol::WebSocket::Frame->new(fin => 0, opcode => 0, buffer => "A")->to_bytes);

t/anyevent_websocket_connection__payload_size.t  view on Meta::CPAN

my $connection;

{
  my @messages;
  my $message_cv = AE::cv;
  my $handle;

  ($connection, $handle) = create_connection_and_handle({ max_payload_size => 65538});
  note "connection.max_payload_size = @{[ $connection->max_payload_size ]}";

  my $frame = Protocol::WebSocket::Frame->new( max_payload_size => 0 );
  $handle->on_read(sub {
    #my($handle) = @_;
    $frame->append($handle->{rbuf});
    while(defined(my $body = $frame->next_bytes))
    {
      push @messages, AnyEvent::WebSocket::Message->new(
        body   => $body,
        opcode => $frame->opcode,
      );
      $message_cv->send;
    }
  });

  sub get_next_message
  {
    $message_cv->recv;
    $message_cv = AE::cv;
    shift @messages;

t/anyevent_websocket_connection__payload_size.t  view on Meta::CPAN

  {
    my($body,$cb) = @_;
    my $cv = AE::cv;
    if($cb)
    {
      $connection->on(next_message => sub {
        $cb->(@_);
        $cv->send;
      });
    }
    my $frame = Protocol::WebSocket::Frame->new(
      max_payload_size => 0,
      buffer => $body,
    );
    $handle->push_write($frame->to_bytes);
    $cv->recv if $cb;
  }

}

subtest 'send payload with size > 65536' => sub {

  my $data = 'x' x 65537;

  subtest 'plain string' => sub {

t/lib/Test2/Tools/WebSocket/Server.pm  view on Meta::CPAN


sub start_server
{
  my $opt = { @_ };
  $opt->{handshake} ||= sub {};
  $opt->{customize_server_response} ||= sub {};
  my $server_cv = AnyEvent->condvar;

  tcp_server undef, undef, sub {
    my $handshake = Protocol::WebSocket::Handshake::Server->new;
    my $frame     = Protocol::WebSocket::Frame->new( max_payload_size => 0 );

    my $hdl = AnyEvent::Handle->new(
      $opt->{tls} ? (tls => 'accept', tls_ctx => $opt->{tls}) : (),
      fh => shift,
      on_eof => sub {
        my $ctx = context();
        $ctx->note("on_eof called.");
        $ctx->release;
        $opt->{end}->() if $opt->{end};
      },

t/lib/Test2/Tools/WebSocket/Server.pm  view on Meta::CPAN

          $handshake->parse($chunk);
          if($handshake->is_done)
          {
            $opt->{customize_server_response}->($handshake);
            $hdl->push_write($handshake->to_string);
            $opt->{handshake}->(handshake => $handshake, hdl => $hdl);
          }
          return;
        }

        $frame->append($chunk);

        while(defined(my $message = $frame->next))
        {
          $opt->{message}->(frame => $frame, message => $message, hdl => $hdl);
        }
      }
    );
  }, sub {
    my($fh, $host, $port) = @_;
    $server_cv->send($port);
  };

  my $port = $server_cv->recv;

t/lib/Test2/Tools/WebSocket/Server.pm  view on Meta::CPAN

  $ctx->note("$uri");
  $ctx->release;
  $uri;
}

sub start_echo
{
  start_server(message => sub {
    my $opt = { @_ };

    return if !$opt->{frame}->is_text && !$opt->{frame}->is_binary;


    $opt->{hdl}->push_write($opt->{frame}->new(buffer => $opt->{message}, max_payload_size => 0 )->to_bytes);

    if($opt->{message} eq 'quit')
    {
      $opt->{hdl}->push_write($opt->{frame}->new(type => 'close')->to_bytes);
      $opt->{hdl}->push_shutdown;
    }
  });
}

1;


