AnyEvent-WebSocket-Client

 view release on metacpan or  search on metacpan

t/anyevent_websocket_connection.t  view on Meta::CPAN

  };

  # Round-trip a set of representative payloads.  Every payload is passed to
  # $round_trip twice: once as a plain scalar and once wrapped in an
  # AnyEvent::WebSocket::Message.  In both cases the returned object must
  # report the original payload from decoded_body.
  #
  # NOTE(review): $round_trip is defined earlier in the enclosing subtest,
  # outside this block -- presumably it sends on one connection and returns
  # the message received on the other; confirm against its definition.
  {
    my @test_data = (
      {label => "single character", data => "a"},
      {label => "5k bytes", data => "a" x 5000},
      {label => "empty", data => ""},
      {label => "0", data => 0},  # false-but-defined payload
      # NOTE(review): this literal is plain ASCII as written; confirm it is
      # meant to contain wide characters.
      {label => "utf8 characters", data => 'UTF8 WIDE CHARACTERS'},
    );

    foreach my $case (@test_data)
    {
      subtest $case->{label} => sub {
        # Payload given as a bare scalar.
        is(
          $round_trip->($case->{data}),
          object {
            call decoded_body => $case->{data};
          },
          'string'
        );
        # Same payload wrapped in a message object.
        is(
          $round_trip->(AnyEvent::WebSocket::Message->new(body => $case->{data})),
          object {
            call decoded_body => $case->{data};
          },
          'object'
        );
      };
    }
  }

  # Sends a hand-built close frame (opcode 8) from $x and checks that $y
  # exposes the status code and reason carried in the frame body.
  # $x and $y come from the enclosing subtest (defined outside this block).
  subtest 'close' => sub {

    my $done = AnyEvent->condvar;

    # Wake up the test as soon as $y emits its finish event.
    $y->on(finish => sub {
      $done->send;
    });

    # Body layout: 'n' packs the code as an unsigned 16-bit big-endian
    # integer, followed by the two reason characters "b" and "b".
    $x->send(
      AnyEvent::WebSocket::Message->new(
        opcode => 8,
        body   => pack('naa', 1005, 'b','b'),
      ),
    );

    $done->recv;

    is(
      $y,
      object {
        call close_code   => 1005;
        call close_reason => 'bb';
      },
      'connection has finish code and reason',
    );
  };

};

subtest 'masked attribute should control whether the frames sent by the Connection are masked or not' => sub {

  # Run the identical check once with masking disabled and once enabled.
  for my $masked (0, 1)
  {
    subtest "masked = $masked" => sub {
      my ($conn, $peer) = create_connection_and_handle({masked => $masked});

      # Expected first two bytes on the wire for the 5-byte text "Hello":
      # 0x81 (FIN + text opcode in RFC 6455 framing), then the length 5
      # with the high (mask) bit set only when masking is requested.
      my $second_byte = $masked ? 0x85 : 0x05;
      my $want_header = pack("C*", 0x81, $second_byte);

      my $header_checked = AnyEvent->condvar;
      $peer->on_read(sub {
        my ($h) = @_;
        my $buffered = $h->{rbuf};

        # Keep waiting until both header bytes have been buffered.
        if(length($buffered) >= 2)
        {
          is substr($buffered, 0, 2), $want_header, "frame header OK";
          $header_checked->send;
        }
        return;
      });

      $conn->send("Hello");
      $header_checked->recv;
    };
  }

};

# Writes a ping frame carrying "foobar" on the raw handle and expects the
# Connection on the other end to answer with a pong frame (opcode 10)
# carrying the same payload.
subtest 'Connection should respond to a ping frame with a pong frame' => sub {

  my ($x_conn, $y_handle) = create_connection_and_handle;

  my $parser = Protocol::WebSocket::Frame->new;
  my $cv_finish = AnyEvent->condvar;
  $y_handle->on_read(sub {
    my ($handle) = @_;
    # Feed whatever has arrived into the frame parser; next_bytes yields
    # undef until a complete frame is available, so just wait for more.
    $parser->append($handle->{rbuf});
    my $payload = $parser->next_bytes;
    return if !defined($payload);
    is $parser->opcode, 10, "pong frame received";
    is $payload, "foobar", "... payload is identical to what y_handle has sent.";
    $cv_finish->send;
  });
  $y_handle->push_write(Protocol::WebSocket::Frame->new(type => "ping", buffer => "foobar")->to_bytes);

  $cv_finish->recv;
};

# Closes one side of a connection pair with an explicit code and reason and
# checks that the other side sees both values, inside its finish callback
# and afterwards on the connection object.
subtest 'connection close data' => sub {

  # Every case runs the same scenario; only the reason text differs.
  my @cases = (
    { label => 'ascii',   reason => 'anything' },
    # NOTE(review): this literal is plain ASCII as written; confirm it is
    # meant to contain wide characters.
    { label => 'unicode', reason => 'UTF8 WIDE CHARACTERS' },
  );

  foreach my $case (@cases)
  {
    my $expected_reason = $case->{reason};

    subtest $case->{label} => sub {

      my($x, $y) = create_connection_pair;

      my $cv = AnyEvent->condvar;
      my $reason;
      my $code;

      # Record the values as seen from within the finish callback so they
      # can be asserted separately from the post-close object state.
      $y->on(finish => sub {
        my($con) = @_;
        $code   = $con->close_code;
        $reason = $con->close_reason;
        $cv->send;
      });

      $x->close(1009 => $expected_reason);

      $cv->recv;

      is $code,   1009,             'code is available in finish callback';
      is $reason, $expected_reason, 'reason is available in finish callback';

      is(
        $y,
        object {
          call close_code   => 1009;
          call close_reason => $expected_reason;
        },
        'connection has finish code and reason',
      );
    };
  }
};

subtest 'Connection should not send after sending close frame, should not receive after receiving close frame' => sub {

  # Build a Protocol::WebSocket::Frame from the given arguments and return
  # its serialized form.
  my $make_frame = sub {
    my $frame = Protocol::WebSocket::Frame->new(@_);
    return $frame->to_bytes;
  };

  subtest "it should not send after sending close frame" => sub {
    my ($conn, $peer) = create_connection_and_handle;

    # Two events must be seen before continuing: the peer handle's error
    # callback and the connection's finish event.
    my $both_done = AnyEvent->condvar;
    $both_done->begin for 1 .. 2;

    my $wire_bytes;
    $peer->on_read(sub { });
    $peer->on_error(sub {
      my ($h) = @_;
      # Take everything the peer buffered and empty the buffer.
      $wire_bytes = $h->{rbuf};
      $h->{rbuf}  = "";
      $both_done->end;
    });
    $conn->on(finish => sub { $both_done->end });

    # The send after close is the one that must never reach the wire.
    $conn->close();
    $conn->send("hoge");
    $both_done->recv;

    my $parser = Protocol::WebSocket::Frame->new();
    $parser->append($wire_bytes);
    ok defined($parser->next_bytes), "received a complete frame";
    ok $parser->is_close, "... and it's a close frame";
    ok !defined($parser->next_bytes), "no more frame";
  };

  subtest "it should not receive after receiving close frame" => sub {
    my ($conn, $peer) = create_connection_and_handle;

    my @seen;
    my $finished = AnyEvent->condvar;
    $conn->on(each_message => sub {
      my (undef, $message) = @_;
      push(@seen, $message);
    });
    $conn->on(finish => sub { $finished->send });

    # A close frame first, then a data frame that has to be ignored.
    $peer->push_write($make_frame->(type => "close"));
    $peer->push_write($make_frame->(buffer => "hoge"));
    $peer->push_shutdown;
    $finished->recv;

    is scalar(@seen), 0, "the message 'hoge' should be discarded"
        or diag($seen[0]->body);
  };

};

subtest 'Connection should respond with close frame to close frame' => sub {

  my ($conn, $peer) = create_connection_and_handle;

  # Receives whatever sits in the peer's read buffer at the moment its
  # error callback runs.
  my $reply_ready = AnyEvent->condvar;
  $peer->on_error(sub {
    my ($h) = @_;
    $reply_ready->send($h->{rbuf});
    $h->{rbuf} = "";
  });
  $peer->on_read(sub {});

  # Send an empty close frame from the raw handle side.
  my $close_frame = Protocol::WebSocket::Frame->new(buffer => "", type => "close");
  $peer->push_write($close_frame->to_bytes);

  my $reply = $reply_ready->recv;

  my $parser = Protocol::WebSocket::Frame->new;
  $parser->append($reply);
  ok defined($parser->next_bytes), "received a complete frame";
  ok $parser->is_close, "... and it's a close frame";

};

# Floods a Connection with data that never forms an acceptable message and
# checks that no message is ever delivered to the each_message callback.
subtest 'Connection should refuse extremely huge messages' => sub {

  # Shared driver for the scenarios below.
  #
  # Named arguments:
  #   first_write - bytes written to the raw handle before flooding starts
  #   next_write  - code ref returning the bytes for each follow-up write
  #   max_writes  - safety limit on the number of drain callbacks; when it
  #                 is reached the test is failed and the run is stopped
  #
  # Returns the number of messages the Connection delivered through
  # each_message.
  my $flood_connection = sub {
    my (%arg) = @_;

    my ($x_conn, $y_handle) = create_connection_and_handle();

    # The run is complete once both the Connection's finish event and the
    # raw handle's error callback have been seen.
    my $cv_finish = AnyEvent->condvar;
    $cv_finish->begin;
    $cv_finish->begin;

    my @received_messages = ();
    $x_conn->on(finish => sub {
      $cv_finish->end;
    });
    $x_conn->on(each_message => sub {
      push(@received_messages, $_[1]);
    });
    $y_handle->on_error(sub {
      my $handle = shift;
      $handle->push_shutdown;
      $cv_finish->end;
    });
    $y_handle->on_read(sub { });

    my $count_writes = 0;
    $y_handle->push_write($arg{first_write});
    $y_handle->on_drain(sub {
      my $handle = shift;
      $count_writes++;
      if($count_writes >= $arg{max_writes})
      {
        # Give up instead of writing forever.
        fail("Connection should be aborted by now.");
        $handle->on_drain(undef);
        $handle->push_shutdown;
        $cv_finish->send;
        return;
      }

      # push_write is delayed to prevent deep-recursion and to give
      # $x_conn chance to receive data.
      my $w; $w = AnyEvent->idle(cb => sub {
        undef $w;
        $handle->push_write($arg{next_write}->());
      });
    });
    $cv_finish->recv;

    return scalar(@received_messages);
  };

  subtest "Connection should refuse huge frames", sub {

    my $frame_header = pack("H*", "827f00000000ffffffff"); # frame payload size = 2**32 - 1 bytes

    my $received_count = $flood_connection->(
      first_write => $frame_header,
      next_write  => sub { "A" x 256 },
      max_writes  => 1024, # for safety
    );

    is $received_count, 0, "the frame is too huge to receive.";
  };


  subtest "Connection should refuse messages with too many fragments", sub {

    # Opening fragment (opcode 1, fin 0); every follow-up write is a
    # continuation fragment (opcode 0, fin 0), so the message never ends.
    my $first_fragment = Protocol::WebSocket::Frame->new(fin => 0, opcode => 1, buffer => "A")->to_bytes;

    my $received_count = $flood_connection->(
      first_write => $first_fragment,
      next_write  => sub {
        Protocol::WebSocket::Frame->new(fin => 0, opcode => 0, buffer => "A")->to_bytes;
      },
      max_writes  => 10000,
    );

    is $received_count, 0, "the message consists of too many fragments to receive.";
  };
};

subtest 'other end is closed' => sub {

  my($x,$y) = create_connection_pair;

  my $round_trip = sub {

    my($message) = @_;

    my $done = AnyEvent->condvar;

    $y->on(next_message => sub {
      my(undef, $message) = @_;
      $done->send($message);
    });

    $x->send($message);

    $done->recv;

  };

  my $closed = 0;

  my $quit_cv = AnyEvent->condvar;
  $y->on(finish => sub {
    $closed = 1;
    $quit_cv->send("finished");
  });

  is(
    $round_trip->('a'),
    object {
      call decoded_body => 'a';
    },
    'single character',
  );

  is(
    $round_trip->('quit'),
    object {
      call decoded_body => 'quit';
    },



( run in 0.573 second using v1.01-cache-2.11-cpan-df04353d9ac )