MCP

 view release on metacpan or  search on metacpan

t/streaming.t  view on Meta::CPAN

use Mojo::Base -strict, -signatures;

use Test::More;

use Mojolicious::Lite;
use Test::Mojo;
use Mojo::IOLoop;
use Mojo::JSON qw(from_json true);
use Mojo::Promise;
use MCP::Client;
use MCP::Server;

my $server = MCP::Server->new;

$server->tool(
  name => 'push_log',
  code => sub ($tool, $args) {
    $tool->context->notify('notifications/message', {level => 'info', data => 'hello stream'});
    return 'pushed';
  }
);
$server->tool(
  name => 'notify_status',
  code => sub ($tool, $args) {
    my $sent = $tool->context->notify('notifications/message', {data => 'x'});
    return $sent ? 'sent' : 'no stream';
  }
);
$server->tool(
  name => 'progress',
  code => sub ($tool, $args) {
    my $sent = $tool->context->notify_progress(1, 2, 'halfway');
    return $sent ? 'sent' : 'no token';
  }
);
$server->tool(
  name => 'async_progress',
  code => sub ($tool, $args) {
    my $context = $tool->context;
    my $promise = Mojo::Promise->new;
    Mojo::IOLoop->timer(
      0.1 => sub {
        $context->notify_progress(1, 2, 'late');
        $promise->resolve('done');
      }
    );
    return $promise;
  }
);

any '/mcp' => $server->to_action({streaming => 1, heartbeat => 0, session_timeout => 0.5});

my $t = Test::Mojo->new;

subtest 'No session' => sub {
  $t->get_ok('/mcp')->status_is(400)->json_is('/error' => 'Missing session ID');
  $t->delete_ok('/mcp')->status_is(400)->json_is('/error' => 'Missing session ID');
};

subtest 'Unknown session' => sub {
  $t->get_ok('/mcp' => {'Mcp-Session-Id' => 'nope'})->status_is(404);
  $t->delete_ok('/mcp' => {'Mcp-Session-Id' => 'nope'})->status_is(404);

  my $client = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
  eval { $client->session_id('nope'); $client->ping };
  like $@, qr/404 response/, 'POST with unknown session is rejected';
};

subtest 'List changed' => sub {
  my $client = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
  my $caps   = $client->initialize_session->{capabilities};
  is $caps->{tools}{listChanged},     true, 'tools listChanged advertised';
  is $caps->{prompts}{listChanged},   true, 'prompts listChanged advertised';
  is $caps->{resources}{listChanged}, true, 'resources listChanged advertised';

  my $got_notification = Mojo::Promise->new;
  my $msg;
  my $url = $t->ua->server->url->path('/mcp');
  my $tx  = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $client->session_id});
  $tx->res->content->on(
    sse => sub ($content, $event = undef) {
      return if $msg;
      return unless $event && $event->{text} && (my $parsed = eval { from_json($event->{text}) });
      $msg = $parsed;
      $got_notification->resolve;
    }
  );
  $t->ua->start_p($tx)->catch(sub { });
  Mojo::IOLoop->one_tick until $tx->res->code || $tx->error;

  ok $server->notify_list_changed('tools'), 'broadcast attempted';
  $got_notification->timeout(5)->wait;
  is $msg->{jsonrpc}, '2.0',                              'JSON-RPC version';
  is $msg->{method},  'notifications/tools/list_changed', 'notification method';

  $client->delete_session;
};

subtest 'List changed (no streams)' => sub {
  ok $server->notify_list_changed('prompts'), 'broadcast attempted';
};

subtest 'Bidirectional flow' => sub {
  my $client = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
  $client->initialize_session;
  ok $client->session_id, 'session id set';

  my $got_notification = Mojo::Promise->new;
  my $msg;
  my $url = $t->ua->server->url->path('/mcp');
  my $tx  = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $client->session_id});

t/streaming.t  view on Meta::CPAN

  $client->initialize_session;
  my $session_id = $client->session_id;

  my $url = $t->ua->server->url->path('/mcp');
  my $tx  = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $session_id});
  $t->ua->start_p($tx)->catch(sub { });
  Mojo::IOLoop->one_tick until $tx->res->code || $tx->error;
  ok $transport->sessions->{$session_id}->stream, 'stream registered';

  my $closed = Mojo::Promise->new;
  $tx->on(finish => sub { $closed->resolve });
  $transport->sessions->{$session_id}->stream->finish;
  $closed->timeout(5)->wait;
  ok !$transport->sessions->{$session_id}->stream, 'stream cleared on finish';

  $client->delete_session;
};

subtest 'Heartbeat' => sub {
  my $transport = $server->transport;
  $transport->heartbeat(1);
  my $client = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
  $client->initialize_session;
  my $session_id = $client->session_id;

  my $url = $t->ua->server->url->path('/mcp');
  my $tx  = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $session_id});
  $t->ua->start_p($tx)->catch(sub { });
  Mojo::IOLoop->one_tick until $tx->res->code || $tx->error;
  is $tx->res->code, 200, 'stream open';

  # SSE parser strips comments
  my $bytes = '';
  Mojo::IOLoop->stream($tx->connection)->on(read => sub ($stream, $chunk) { $bytes .= $chunk });

  my $deadline = Mojo::Promise->new;
  Mojo::IOLoop->timer(1.5 => sub { $deadline->resolve });
  $deadline->wait;
  like $bytes, qr/: keepalive/, 'heartbeat sent';

  $transport->heartbeat(0);
  $client->delete_session;
};

subtest 'Session expiration' => sub {
  my $sessions = $server->transport->sessions;

  my $idle = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
  $idle->initialize_session;
  my $idle_id = $idle->session_id;

  my $open = MCP::Client->new(ua => $t->ua, url => $t->ua->server->url->path('/mcp'));
  $open->initialize_session;
  my $open_id = $open->session_id;
  my $url     = $t->ua->server->url->path('/mcp');
  my $tx      = $t->ua->build_tx(GET => $url => {Accept => 'text/event-stream', 'Mcp-Session-Id' => $open_id});
  $t->ua->start_p($tx)->catch(sub { });
  Mojo::IOLoop->one_tick until $tx->res->code || $tx->error;

  ok exists $sessions->{$idle_id}, 'idle session registered';
  ok exists $sessions->{$open_id}, 'streaming session registered';

  my $tick = Mojo::Promise->new;
  Mojo::IOLoop->timer(1.5 => sub { $tick->resolve });
  $tick->wait;

  ok !exists $sessions->{$idle_id}, 'idle session swept';
  ok exists $sessions->{$open_id},  'streaming session survives sweep';

  $open->delete_session;

  eval { $idle->ping };
  like $@, qr/404 response/, 'POST for swept session is rejected';
};

done_testing;



( run in 0.845 second using v1.01-cache-2.11-cpan-5735350b133 )