MCP
view release on metacpan or search on metacpan
lib/MCP/Server/Transport/HTTP.pm view on Meta::CPAN
package MCP::Server::Transport::HTTP;
use Mojo::Base 'MCP::Server::Transport', -signatures;
use Crypt::Misc qw(random_v4uuid);
use MCP::Server::Context;
use MCP::Server::Session;
use Mojo::IOLoop;
use Mojo::JSON qw(to_json true);
use Mojo::Util qw(dumper);
use Scalar::Util qw(blessed weaken);
use constant DEBUG => $ENV{MCP_DEBUG} || 0;
has heartbeat => 30;
has session_timeout => 3600;
has sessions => sub { {} };
has streaming => 0;
sub notifications ($self) { $self->streaming ? 1 : 0 }
sub handle_request ($self, $c) {
my $method = $c->req->method;
return $self->_handle_post($c) if $method eq 'POST';
return $self->_handle_get($c) if $method eq 'GET' && $self->streaming;
return $self->_handle_delete($c) if $method eq 'DELETE' && $self->streaming;
return $c->render(json => {error => 'Method not allowed'}, status => 405);
}
sub notify ($self, $session_id, $method, $params = {}) {
return undef unless my $session = $self->sessions->{$session_id};
return undef unless my $stream = $session->stream;
$stream->write_sse({text => to_json({jsonrpc => '2.0', method => $method, params => $params})});
return 1;
}
sub notify_all ($self, $method, $params = {}) {
return undef unless $self->streaming;
my $payload = {text => to_json({jsonrpc => '2.0', method => $method, params => $params})};
for my $session (values %{$self->sessions}) {
next unless my $stream = $session->stream;
$stream->write_sse($payload);
}
return 1;
}
sub _extract_session_id ($self, $c) { return $c->req->headers->header('Mcp-Session-Id') }
sub _handle ($self, $data, $context) {
warn "-- MCP Request\n@{[dumper($data)]}\n" if DEBUG;
my $result = $self->server->handle($data, $context);
warn "-- MCP Response\n@{[dumper($result)]}\n" if DEBUG && $result;
return $result;
}
sub _handle_delete ($self, $c) {
return $c->render(json => {error => 'Missing session ID'}, status => 400)
unless my $session_id = $self->_extract_session_id($c);
return $c->render(json => {error => 'Session not found'}, status => 404)
unless my $session = delete $self->sessions->{$session_id};
if (my $stream = $session->stream) { $stream->finish }
$c->render(data => '', status => 204);
}
sub _handle_get ($self, $c) {
return $c->render(json => {error => 'Missing session ID'}, status => 400)
unless my $session_id = $self->_extract_session_id($c);
return $c->render(json => {error => 'Session not found'}, status => 404)
unless my $session = $self->sessions->{$session_id};
return $c->render(json => {error => 'Stream already open'}, status => 409) if $session->stream;
$c->inactivity_timeout(0);
$c->res->headers->header('Mcp-Session-Id' => $session_id);
$session->stream($c)->touch;
$c->write_sse;
my $heartbeat_id;
if (my $interval = $self->heartbeat) {
$heartbeat_id = Mojo::IOLoop->recurring($interval => sub { $c->write_sse({comment => 'keepalive'}) });
}
weaken(my $self_weak = $self);
$c->on(
finish => sub {
Mojo::IOLoop->remove($heartbeat_id) if $heartbeat_id;
return unless $self_weak;
return unless my $session = $self_weak->sessions->{$session_id};
return unless ($session->stream // 0) == $c;
$session->stream(undef)->touch;
}
);
}
sub _handle_initialization ($self, $c, $data) {
my $session_id = random_v4uuid;
my $result = $self->_handle($data, MCP::Server::Context->new);
if ($self->streaming) {
$self->sessions->{$session_id} = MCP::Server::Session->new(id => $session_id);
$self->_start_sweep;
}
$c->res->headers->header('Mcp-Session-Id' => $session_id);
$c->render(json => $result, status => 200);
}
sub _handle_post ($self, $c) {
my $session_id = $self->_extract_session_id($c);
return $c->render(json => {error => 'Invalid JSON'}, status => 400) unless my $data = $c->req->json;
return $c->render(json => {error => 'Invalid JSON', status => 400}) unless ref $data eq 'HASH';
if ($data->{method} && $data->{method} eq 'initialize') { $self->_handle_initialization($c, $data) }
else { $self->_handle_regular_request($c, $data, $session_id) }
}
sub _handle_regular_request ($self, $c, $data, $session_id) {
return $c->render(json => {error => 'Missing session ID'}, status => 400) unless $session_id;
if ($self->streaming) {
return $c->render(json => {error => 'Session not found'}, status => 404)
unless my $session = $self->sessions->{$session_id};
$session->touch;
}
$c->res->headers->header('Mcp-Session-Id' => $session_id);
my $context = MCP::Server::Context->new(transport => $self, session_id => $session_id, controller => $c);
return $c->render(data => '', status => 202) unless defined(my $result = $self->_handle($data, $context));
# Sync
return $c->render(json => $result, status => 200) if !blessed($result) || !$result->isa('Mojo::Promise');
# Async
$c->inactivity_timeout(0);
$c->write_sse;
$result->then(sub { $c->write_sse({text => to_json($_[0])})->finish });
}
sub _start_sweep ($self) {
return if $self->{_sweep_id};
return unless my $interval = $self->session_timeout;
weaken(my $self_weak = $self);
$self->{_sweep_id} = Mojo::IOLoop->recurring($interval => sub { $self_weak->_sweep if $self_weak });
}
sub _sweep ($self) {
return unless my $timeout = $self->session_timeout;
my $cutoff = time - $timeout;
my $sessions = $self->sessions;
for my $id (keys %$sessions) {
my $session = $sessions->{$id};
delete $sessions->{$id} if !$session->stream && $session->last_used < $cutoff;
}
}
1;
=encoding utf8
=head1 NAME
MCP::Server::Transport::HTTP - HTTP transport for MCP servers
=head1 SYNOPSIS
use MCP::Server::Transport::HTTP;
my $http = MCP::Server::Transport::HTTP->new;
=head1 DESCRIPTION
L<MCP::Server::Transport::HTTP> is a transport for MCP (Model Context Protocol) server that uses HTTP as the
underlying transport mechanism.
By default only C<POST> requests are handled. When L</"streaming"> is enabled, the transport additionally supports
the server-to-client SSE stream (C<GET>) and explicit session termination (C<DELETE>) defined by the Streamable
HTTP transport. Note that this requires per-process state and is therefore not compatible with pre-forking web
servers.
=head1 ATTRIBUTES
L<MCP::Server::Transport::HTTP> inherits all attributes from L<MCP::Server::Transport> and implements the following
new ones.
=head2 heartbeat
my $seconds = $http->heartbeat;
$http = $http->heartbeat(30);
Interval in seconds at which a keep-alive comment is sent on each open server-to-client stream. Defaults to C<30>;
set to C<0> to disable. Useful when running behind reverse proxies that close idle connections. Only used when
L</"streaming"> is enabled.
=head2 session_timeout
my $seconds = $http->session_timeout;
$http = $http->session_timeout(3600);
Idle timeout in seconds for sessions without an open server-to-client stream. Defaults to C<3600>; set to C<0> to
disable. A periodic sweep removes sessions whose last activity is older than this value, so the effective lifetime
of an idle session is up to twice the configured timeout. Only used when L</"streaming"> is enabled.
=head2 sessions
my $sessions = $http->sessions;
$http = $http->sessions({});
Per-process registry of active L<MCP::Server::Session> objects, keyed by session ID. Only used when L</"streaming">
is enabled.
=head2 streaming
my $bool = $http->streaming;
$http = $http->streaming(1);
Enable server-to-client streaming and session lifecycle management. Defaults to false. When enabled, the transport
tracks all sessions in L</"sessions">, accepts C<GET> requests to open a long-lived SSE stream the server can push
notifications to, and accepts C<DELETE> requests to terminate a session. Requests for unknown sessions are rejected
with status C<404>.
=head1 METHODS
L<MCP::Server::Transport::HTTP> inherits all methods from L<MCP::Server::Transport> and implements the following new
ones.
=head2 handle_request
$http->handle_request(Mojolicious::Controller->new);
Handles an incoming HTTP request.
=head2 notifications
my $bool = $http->notifications;
True when L</"streaming"> is enabled, false otherwise.
=head2 notify
my $bool = $http->notify($session_id, $method);
my $bool = $http->notify($session_id, $method, {foo => 'bar'});
Send a JSON-RPC notification to the open SSE stream of a session. Returns true on success, or C<undef> if the
session does not exist or has no open stream. Only available when L</"streaming"> is enabled.
=head2 notify_all
my $bool = $http->notify_all($method);
my $bool = $http->notify_all($method, {foo => 'bar'});
Send a JSON-RPC notification to the open SSE stream of every active session. Returns true on success, or C<undef>
when L</"streaming"> is disabled.
=head1 SEE ALSO
L<MCP>, L<https://mojolicious.org>, L<https://modelcontextprotocol.io>.
=cut
( run in 0.728 second using v1.01-cache-2.11-cpan-5735350b133 )