AnyEvent-WebSocket-Client
view release on metacpan or search on metacpan
t/anyevent_websocket_connection.t view on Meta::CPAN
};
{
  # Payloads pushed through $round_trip twice each: once as a plain string
  # and once wrapped in an AnyEvent::WebSocket::Message.  In both cases the
  # message that comes back must report the same decoded_body.
  my @test_data = (
    {label => "single character", data => "a"},
    {label => "5k bytes",         data => "a" x 5000},
    {label => "empty",            data => ""},
    {label => "0",                data => 0},
    # Fullwidth "UTF8 WIDE" written with \x{} escapes so that the code
    # points are above U+00FF regardless of how this file is encoded or
    # read; a mis-encoded literal would only contain Latin-1 characters and
    # would not exercise wide characters at all.
    {
      label => "utf8 characters",
      data  => "\x{FF35}\x{FF34}\x{FF26}\x{FF18} \x{FF37}\x{FF29}\x{FF24}\x{FF25} CHARACTERS",
    },
  );

  foreach my $case (@test_data)
  {
    subtest $case->{label} => sub {
      # Plain scalar passed to send.
      is(
        $round_trip->($case->{data}),
        object {
          call decoded_body => $case->{data};
        },
        'string'
      );
      # Same payload wrapped in a message object.
      is(
        $round_trip->(AnyEvent::WebSocket::Message->new(body => $case->{data})),
        object {
          call decoded_body => $case->{data};
        },
        'object'
      );
    };
  }
}
subtest 'close' => sub {
  # Send a hand-built opcode-8 message from $x and verify that $y exposes
  # the status code and reason carried in its body once finish has fired.
  my $finished = AnyEvent->condvar;
  $y->on(finish => sub { $finished->send });

  # Body layout: 16-bit big-endian integer 1005 followed by the characters
  # 'b' and 'b'.
  my $close_payload = pack('naa', 1005, 'b', 'b');
  my $close_message = AnyEvent::WebSocket::Message->new(
    opcode => 8,
    body   => $close_payload,
  );
  $x->send($close_message);

  # Block until $y's finish callback has run.
  $finished->recv;

  is(
    $y,
    object {
      call close_code   => 1005;
      call close_reason => 'bb';
    },
  );
};
};
subtest 'masked attribute should control whether the frames sent by the Connection are masked or not' => sub {
  # For each setting of the masked option, send "Hello" through the
  # connection and inspect the first two raw bytes seen on the other handle.
  for my $masked (0, 1)
  {
    subtest "masked = $masked" => sub {
      my ($x_conn, $y_handle) = create_connection_and_handle({masked => $masked});
      my $header_checked = AnyEvent->condvar;

      # First byte 0x81, second byte 0x85 when masked and 0x05 otherwise
      # (the low bits, 5, match the length of "Hello").
      my $expected_header = pack("C*", 0x81, ($masked ? 0x85 : 0x05));

      $y_handle->on_read(sub {
        my ($handle) = @_;
        # Wait until at least the two header bytes are buffered.
        if(length($handle->{rbuf}) >= 2)
        {
          is substr($handle->{rbuf}, 0, 2), $expected_header, "frame header OK";
          $header_checked->send;
        }
      });

      $x_conn->send("Hello");
      $header_checked->recv;
    };
  }
};
subtest 'Connection should respond to a ping frame with a pong frame' => sub {
  # Write a ping frame carrying "foobar" into the raw handle and expect the
  # Connection on the other side to answer with an opcode-10 frame that
  # carries the same payload.
  my ($x_conn, $y_handle) = create_connection_and_handle;
  my $frame_parser = Protocol::WebSocket::Frame->new;
  my $pong_seen    = AnyEvent->condvar;

  $y_handle->on_read(sub {
    my ($handle) = @_;
    # Feed whatever has arrived to the parser; keep waiting until it can
    # hand back a complete frame payload.
    $frame_parser->append($handle->{rbuf});
    my $body = $frame_parser->next_bytes;
    if(defined $body)
    {
      is $frame_parser->opcode, 10, "pong frame received";
      is $body, "foobar", "... payload is identical to what b_handle has sent.";
      $pong_seen->send;
    }
  });

  my $ping_frame = Protocol::WebSocket::Frame->new(type => "ping", buffer => "foobar");
  $y_handle->push_write($ping_frame->to_bytes);
  $pong_seen->recv;
};
subtest 'connection close data' => sub {
  # Close one end of a connection pair with code 1009 and a reason string,
  # then verify that the other end sees both values -- inside its finish
  # callback and afterwards via its accessors.
  #
  # The unicode reason is fullwidth "UTF8 WIDE" written with \x{} escapes so
  # that the code points are above U+00FF regardless of how this file is
  # encoded or read.
  my @cases = (
    [ascii   => 'anything'],
    [unicode => "\x{FF35}\x{FF34}\x{FF26}\x{FF18} \x{FF37}\x{FF29}\x{FF24}\x{FF25} CHARACTERS"],
  );

  foreach my $case (@cases)
  {
    my($name, $expected_reason) = @$case;

    subtest $name => sub {
      my($x, $y) = create_connection_pair;
      my $cv = AnyEvent->condvar;
      my $reason;
      my $code;

      # Capture the values as seen from within the finish callback itself.
      $y->on(finish => sub {
        my($con) = @_;
        $code   = $con->close_code;
        $reason = $con->close_reason;
        $cv->send;
      });

      $x->close(1009 => $expected_reason);
      $cv->recv;

      is $code, 1009, 'code is available in finish callback';
      is $reason, $expected_reason, 'reason is available in finish callback';
      is(
        $y,
        object {
          call close_code   => 1009;
          call close_reason => $expected_reason;
        },
        'connection has finish code and reason',
      );
    };
  }
};
subtest 'Connection should not send after sending close frame, should not receive after receiving close frame' => sub {

  subtest "it should not send after sending close frame", sub {
    # Close the connection, then try to send "hoge".  Everything that
    # reached the raw handle is parsed afterwards and must consist of a
    # single close frame.
    my ($x_conn, $y_handle) = create_connection_and_handle;
    my $wire_bytes;

    # recv returns only after both the handle's error callback and the
    # connection's finish callback have run.
    my $both_done = AnyEvent->condvar;
    $both_done->begin;
    $both_done->begin;

    $y_handle->on_read(sub { });
    $y_handle->on_error(sub {
      my ($handle) = @_;
      # Take everything that is sitting in the read buffer and empty it.
      $wire_bytes      = $handle->{rbuf};
      $handle->{rbuf}  = "";
      $both_done->end;
    });
    $x_conn->on(finish => sub { $both_done->end });

    $x_conn->close();
    $x_conn->send("hoge");
    $both_done->recv;

    my $frame_parser = Protocol::WebSocket::Frame->new();
    $frame_parser->append($wire_bytes);
    ok defined($frame_parser->next_bytes), "received a complete frame";
    ok $frame_parser->is_close, "... and it's a close frame";
    ok !defined($frame_parser->next_bytes), "no more frame";
  };

  # Serialise a frame built from the given constructor arguments.
  my $frame_bytes = sub {
    my $frame = Protocol::WebSocket::Frame->new(@_);
    return $frame->to_bytes;
  };

  subtest "it should not receive after receiving close frame", sub {
    # Write a close frame followed by a "hoge" frame into the raw handle;
    # the connection must not report any message.
    my ($x_conn, $y_handle) = create_connection_and_handle;
    my @seen_messages;
    my $finished = AnyEvent->condvar;

    $x_conn->on(each_message => sub {
      my (undef, $message) = @_;
      push @seen_messages, $message;
    });
    $x_conn->on(finish => sub { $finished->send });

    $y_handle->push_write($frame_bytes->(type => "close"));
    $y_handle->push_write($frame_bytes->(buffer => "hoge"));
    $y_handle->push_shutdown;
    $finished->recv;

    # On failure, show the body of the first message that slipped through.
    is scalar(@seen_messages), 0, "the message 'hoge' should be discarded"
      or diag($seen_messages[0]->body);
  };
};
subtest 'Connection should respond with close frame to close frame' => sub {
  # Write a close frame into the raw handle and check that what comes back
  # from the connection parses as a complete close frame.
  my ($x_conn, $y_handle) = create_connection_and_handle;
  my $reply_ready = AnyEvent->condvar;

  $y_handle->on_error(sub {
    my ($handle) = @_;
    # Hand the buffered bytes to the waiting test body, then empty the buffer.
    $reply_ready->send($handle->{rbuf});
    $handle->{rbuf} = "";
  });
  $y_handle->on_read(sub {});

  my $close_frame = Protocol::WebSocket::Frame->new(buffer => "", type => "close");
  $y_handle->push_write($close_frame->to_bytes);

  my $reply_bytes  = $reply_ready->recv;
  my $frame_parser = Protocol::WebSocket::Frame->new;
  $frame_parser->append($reply_bytes);
  ok defined($frame_parser->next_bytes), "received a complete frame";
  ok $frame_parser->is_close, "... and it's a close frame";
};
subtest 'Connection should refuse extremely huge messages' => sub {

  subtest "Connection should refuse huge frames", sub {
    # Announce a frame with an enormous payload, then trickle payload bytes
    # in small chunks.  The connection is expected to give up before the
    # chunk limit is reached, and must never deliver a message.
    my ($x_conn, $y_handle) = create_connection_and_handle();
    my @delivered;

    # recv returns once both the connection's finish callback and the
    # handle's error callback have run (or when the safety valve below
    # calls send directly).
    my $all_done = AnyEvent->condvar;
    $all_done->begin;
    $all_done->begin;

    $x_conn->on(finish => sub { $all_done->end });
    $x_conn->on(each_message => sub {
      my (undef, $message) = @_;
      push @delivered, $message;
    });
    $y_handle->on_error(sub {
      my ($handle) = @_;
      $handle->push_shutdown;
      $all_done->end;
    });
    $y_handle->on_read(sub { });

    my $huge_header = pack("H*", "827f00000000ffffffff"); # frame payload size = 2**32 - 1 bytes
    my $chunk_limit = 1024; # for safety
    my $chunks_sent = 0;

    $y_handle->push_write($huge_header);
    $y_handle->on_drain(sub {
      my ($handle) = @_;

      # Safety valve: if we are still writing after $chunk_limit drains,
      # fail the test and stop instead of feeding data forever.
      if(++$chunks_sent >= $chunk_limit)
      {
        fail("Connection should be aborted by now.");
        $handle->on_drain(undef);
        $handle->push_shutdown;
        $all_done->send;
        return;
      }

      # push_write is delayed to prevent deep-recursion and to give
      # $x_conn chance to receive data.
      my $idle_watcher;
      $idle_watcher = AnyEvent->idle(cb => sub {
        undef $idle_watcher;
        $handle->push_write("A" x 256);
      });
    });

    $all_done->recv;
    is scalar(@delivered), 0, "the frame is too huge to receive.";
  };

  subtest "Connection should refuse messages with too many fragments", sub {
    # Start a fragmented message (fin => 0) and keep appending one-byte
    # continuation fragments without ever finishing it.  The connection is
    # expected to give up before the frame limit is reached, and must never
    # deliver a message.
    my ($x_conn, $y_handle) = create_connection_and_handle;
    my @delivered;

    # Same two-party rendezvous as in the huge-frame subtest.
    my $all_done = AnyEvent->condvar;
    $all_done->begin;
    $all_done->begin;

    $x_conn->on(finish => sub { $all_done->end });
    $x_conn->on(each_message => sub {
      my (undef, $message) = @_;
      push @delivered, $message;
    });
    $y_handle->on_error(sub {
      my ($handle) = @_;
      $handle->push_shutdown;
      $all_done->end;
    });
    $y_handle->on_read(sub {});

    my $frame_limit = 10000;
    my $frames_sent = 0;

    # Opening fragment: opcode 1, not final.
    my $first_fragment = Protocol::WebSocket::Frame->new(fin => 0, opcode => 1, buffer => "A");
    $y_handle->push_write($first_fragment->to_bytes);

    $y_handle->on_drain(sub {
      my ($handle) = @_;

      # Safety valve: fail and stop once $frame_limit drains have happened.
      if(++$frames_sent >= $frame_limit)
      {
        fail("Connection should be aborted by now.");
        $handle->on_drain(undef);
        $handle->push_shutdown;
        $all_done->send;
        return;
      }

      # As above, the next write is deferred to an idle watcher rather than
      # issued from inside the drain callback.
      my $idle_watcher;
      $idle_watcher = AnyEvent->idle(cb => sub {
        undef $idle_watcher;
        # Continuation fragment: opcode 0, still not final.
        my $next_fragment = Protocol::WebSocket::Frame->new(fin => 0, opcode => 0, buffer => "A");
        $handle->push_write($next_fragment->to_bytes);
      });
    });

    $all_done->recv;
    is scalar(@delivered), 0, "the message consists of too many fragments to receive.";
  };
};
subtest 'other end is closed' => sub {
my($x,$y) = create_connection_pair;
my $round_trip = sub {
my($message) = @_;
my $done = AnyEvent->condvar;
$y->on(next_message => sub {
my(undef, $message) = @_;
$done->send($message);
});
$x->send($message);
$done->recv;
};
my $closed = 0;
my $quit_cv = AnyEvent->condvar;
$y->on(finish => sub {
$closed = 1;
$quit_cv->send("finished");
});
is(
$round_trip->('a'),
object {
call decoded_body => 'a';
},
'single character',
);
is(
$round_trip->('quit'),
object {
call decoded_body => 'quit';
},
( run in 0.573 second using v1.01-cache-2.11-cpan-df04353d9ac )