MCP
view release on metacpan or search on metacpan
t/streaming.t view on Meta::CPAN
use Mojo::Base -strict, -signatures;
use Test::More;
use Mojolicious::Lite;
use Test::Mojo;
use Mojo::IOLoop;
use Mojo::JSON qw(from_json true);
use Mojo::Promise;
use MCP::Client;
use MCP::Server;
my $server = MCP::Server->new;
$server->tool(
name => 'push_log',
code => sub ($tool, $args) {
$tool->context->notify('notifications/message', {level => 'info', data => 'hello stream'});
return 'pushed';
}
);
$server->tool(
name => 'notify_status',
code => sub ($tool, $args) {
my $sent = $tool->context->notify('notifications/message', {data => 'x'});
return $sent ? 'sent' : 'no stream';
}
);
$server->tool(
name => 'progress',
code => sub ($tool, $args) {
my $sent = $tool->context->notify_progress(1, 2, 'halfway');
return $sent ? 'sent' : 'no token';
}
);
$server->tool(
name => 'async_progress',
code => sub ($tool, $args) {
my $context = $tool->context;
my $promise = Mojo::Promise->new;
Mojo::IOLoop->timer(
0.1 => sub {
$context->notify_progress(1, 2, 'late');
$promise->resolve('done');
}
);
return $promise;
}
);
any '/mcp' => $server->to_action({streaming => 1, heartbeat => 0, session_timeout => 0.5});
my $t = Test::Mojo->new;
subtest 'No session' => sub {
$t->get_ok('/mcp')->status_is(400)->json_is('/error' => 'Missing session ID');
$t->delete_ok('/mcp')->status_is(400)->json_is('/error' => 'Missing session ID');
};
subtest 'Unknown session' => sub {
$t->get_ok('/mcp' => {'Mcp-Session-Id' => 'nope'})->status_is(404);
$t->delete_ok('/mcp' => {'Mcp-Session-Id' => 'nope'})->status_is(404);
my $client = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
eval { $client->session_id('nope'); $client->ping };
like $@, qr/404 response/, 'POST with unknown session is rejected';
};
subtest 'List changed' => sub {
my $client = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
my $caps = $client->initialize_session->{capabilities};
is $caps->{tools}{listChanged}, true, 'tools listChanged advertised';
is $caps->{prompts}{listChanged}, true, 'prompts listChanged advertised';
is $caps->{resources}{listChanged}, true, 'resources listChanged advertised';
my $got_notification = Mojo::Promise->new;
my $msg;
my $url = $t->ua->server->url->path('/mcp');
my $tx = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $client->session_id});
$tx->res->content->on(
sse => sub ($content, $event = undef) {
return if $msg;
return unless $event && $event->{text} && (my $parsed = eval { from_json($event->{text}) });
$msg = $parsed;
$got_notification->resolve;
}
);
$t->ua->start_p($tx)->catch(sub { });
Mojo::IOLoop->one_tick until $tx->res->code || $tx->error;
ok $server->notify_list_changed('tools'), 'broadcast attempted';
$got_notification->timeout(5)->wait;
is $msg->{jsonrpc}, '2.0', 'JSON-RPC version';
is $msg->{method}, 'notifications/tools/list_changed', 'notification method';
$client->delete_session;
};
subtest 'List changed (no streams)' => sub {
ok $server->notify_list_changed('prompts'), 'broadcast attempted';
};
subtest 'Bidirectional flow' => sub {
my $client = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
$client->initialize_session;
ok $client->session_id, 'session id set';
my $got_notification = Mojo::Promise->new;
my $msg;
my $url = $t->ua->server->url->path('/mcp');
my $tx = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $client->session_id});
t/streaming.t view on Meta::CPAN
$client->initialize_session;
my $session_id = $client->session_id;
my $url = $t->ua->server->url->path('/mcp');
my $tx = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $session_id});
$t->ua->start_p($tx)->catch(sub { });
Mojo::IOLoop->one_tick until $tx->res->code || $tx->error;
ok $transport->sessions->{$session_id}->stream, 'stream registered';
my $closed = Mojo::Promise->new;
$tx->on(finish => sub { $closed->resolve });
$transport->sessions->{$session_id}->stream->finish;
$closed->timeout(5)->wait;
ok !$transport->sessions->{$session_id}->stream, 'stream cleared on finish';
$client->delete_session;
};
subtest 'Heartbeat' => sub {
my $transport = $server->transport;
$transport->heartbeat(1);
my $client = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
$client->initialize_session;
my $session_id = $client->session_id;
my $url = $t->ua->server->url->path('/mcp');
my $tx = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $session_id});
$t->ua->start_p($tx)->catch(sub { });
Mojo::IOLoop->one_tick until $tx->res->code || $tx->error;
is $tx->res->code, 200, 'stream open';
# SSE parser strips comments
my $bytes = '';
Mojo::IOLoop->stream($tx->connection)->on(read => sub ($stream, $chunk) { $bytes .= $chunk });
my $deadline = Mojo::Promise->new;
Mojo::IOLoop->timer(1.5 => sub { $deadline->resolve });
$deadline->wait;
like $bytes, qr/: keepalive/, 'heartbeat sent';
$transport->heartbeat(0);
$client->delete_session;
};
subtest 'Session expiration' => sub {
my $sessions = $server->transport->sessions;
my $idle = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
$idle->initialize_session;
my $idle_id = $idle->session_id;
my $open = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
$open->initialize_session;
my $open_id = $open->session_id;
my $url = $t->ua->server->url->path('/mcp');
my $tx = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $open_id});
$t->ua->start_p($tx)->catch(sub { });
Mojo::IOLoop->one_tick until $tx->res->code || $tx->error;
ok exists $sessions->{$idle_id}, 'idle session registered';
ok exists $sessions->{$open_id}, 'streaming session registered';
my $tick = Mojo::Promise->new;
Mojo::IOLoop->timer(1.5 => sub { $tick->resolve });
$tick->wait;
ok !exists $sessions->{$idle_id}, 'idle session swept';
ok exists $sessions->{$open_id}, 'streaming session survives sweep';
$open->delete_session;
eval { $idle->ping };
like $@, qr/404 response/, 'POST for swept session is rejected';
};
done_testing;
( run in 0.845 second using v1.01-cache-2.11-cpan-5735350b133 )