Skip to content

Commit 7dcba35

Browse files
authored
Add server-side Streamable HTTP transport support (#330)
- Changes IJsonRpcMessage to an abstract base class so RelatedTransport will always be available - Streamable HTTP supports multiple concurrent HTTP request with their own indpendent SSE response streams - RelatedTransport indicates the source or destination of the JsonRpcMessage - Changes the default RequestId to a JSON number for better compatibility with MCP servers in the wild
1 parent 7d77be6 commit 7dcba35

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1637
-395
lines changed

src/ModelContextProtocol.AspNetCore/HttpMcpServerBuilderExtensions.cs

+4-2
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,20 @@ public static class HttpMcpServerBuilderExtensions
1111
{
1212
/// <summary>
1313
/// Adds the services necessary for <see cref="M:McpEndpointRouteBuilderExtensions.MapMcp"/>
14-
/// to handle MCP requests and sessions using the MCP HTTP Streaming transport. For more information on configuring the underlying HTTP server
14+
/// to handle MCP requests and sessions using the MCP Streamable HTTP transport. For more information on configuring the underlying HTTP server
1515
/// to control things like port binding custom TLS certificates, see the <see href="https://learn.microsoft.com/aspnet/core/fundamentals/minimal-apis">Minimal APIs quick reference</see>.
1616
/// </summary>
1717
/// <param name="builder">The builder instance.</param>
18-
/// <param name="configureOptions">Configures options for the HTTP Streaming transport. This allows configuring per-session
18+
/// <param name="configureOptions">Configures options for the Streamable HTTP transport. This allows configuring per-session
1919
/// <see cref="McpServerOptions"/> and running logic before and after a session.</param>
2020
/// <returns>The builder provided in <paramref name="builder"/>.</returns>
2121
/// <exception cref="ArgumentNullException"><paramref name="builder"/> is <see langword="null"/>.</exception>
2222
public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action<HttpServerTransportOptions>? configureOptions = null)
2323
{
2424
ArgumentNullException.ThrowIfNull(builder);
2525
builder.Services.TryAddSingleton<StreamableHttpHandler>();
26+
builder.Services.TryAddSingleton<SseHandler>();
27+
builder.Services.AddHostedService<IdleTrackingBackgroundService>();
2628

2729
if (configureOptions is not null)
2830
{

src/ModelContextProtocol.AspNetCore/HttpMcpSession.cs

+60-6
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,61 @@
11
using ModelContextProtocol.Protocol.Transport;
2+
using ModelContextProtocol.Server;
23
using System.Security.Claims;
34

45
namespace ModelContextProtocol.AspNetCore;
56

6-
internal class HttpMcpSession
7+
internal sealed class HttpMcpSession<TTransport>(string sessionId, TTransport transport, ClaimsPrincipal user, TimeProvider timeProvider) : IAsyncDisposable
8+
where TTransport : ITransport
79
{
8-
public HttpMcpSession(SseResponseStreamTransport transport, ClaimsPrincipal user)
10+
private int _referenceCount;
11+
private int _getRequestStarted;
12+
private CancellationTokenSource _disposeCts = new();
13+
14+
public string Id { get; } = sessionId;
15+
public TTransport Transport { get; } = transport;
16+
public (string Type, string Value, string Issuer)? UserIdClaim { get; } = GetUserIdClaim(user);
17+
18+
public CancellationToken SessionClosed => _disposeCts.Token;
19+
20+
public bool IsActive => !SessionClosed.IsCancellationRequested && _referenceCount > 0;
21+
public long LastActivityTicks { get; private set; } = timeProvider.GetTimestamp();
22+
23+
public IMcpServer? Server { get; set; }
24+
public Task? ServerRunTask { get; set; }
25+
26+
public IDisposable AcquireReference()
927
{
10-
Transport = transport;
11-
UserIdClaim = GetUserIdClaim(user);
28+
Interlocked.Increment(ref _referenceCount);
29+
return new UnreferenceDisposable(this, timeProvider);
1230
}
1331

14-
public SseResponseStreamTransport Transport { get; }
15-
public (string Type, string Value, string Issuer)? UserIdClaim { get; }
32+
public bool TryStartGetRequest() => Interlocked.Exchange(ref _getRequestStarted, 1) == 0;
33+
34+
public async ValueTask DisposeAsync()
35+
{
36+
try
37+
{
38+
await _disposeCts.CancelAsync();
39+
40+
if (ServerRunTask is not null)
41+
{
42+
await ServerRunTask;
43+
}
44+
}
45+
catch (OperationCanceledException)
46+
{
47+
}
48+
finally
49+
{
50+
if (Server is not null)
51+
{
52+
await Server.DisposeAsync();
53+
}
54+
55+
await Transport.DisposeAsync();
56+
_disposeCts.Dispose();
57+
}
58+
}
1659

1760
public bool HasSameUserId(ClaimsPrincipal user)
1861
=> UserIdClaim == GetUserIdClaim(user);
@@ -36,4 +79,15 @@ private static (string Type, string Value, string Issuer)? GetUserIdClaim(Claims
3679

3780
return null;
3881
}
82+
83+
private sealed class UnreferenceDisposable(HttpMcpSession<TTransport> session, TimeProvider timeProvider) : IDisposable
84+
{
85+
public void Dispose()
86+
{
87+
if (Interlocked.Decrement(ref session._referenceCount) == 0)
88+
{
89+
session.LastActivityTicks = timeProvider.GetTimestamp();
90+
}
91+
}
92+
}
3993
}

src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs

+13
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,17 @@ public class HttpServerTransportOptions
2121
/// This is useful for running logic before a sessions starts and after it completes.
2222
/// </summary>
2323
public Func<HttpContext, IMcpServer, CancellationToken, Task>? RunSessionHandler { get; set; }
24+
25+
/// <summary>
26+
/// Represents the duration of time the server will wait between any active requests before timing out an
27+
/// MCP session. This is checked in background every 5 seconds. A client trying to resume a session will
28+
/// receive a 404 status code and should restart their session. A client can keep their session open by
29+
/// keeping a GET request open. The default value is set to 2 minutes.
30+
/// </summary>
31+
public TimeSpan IdleTimeout { get; set; } = TimeSpan.FromMinutes(2);
32+
33+
/// <summary>
34+
/// Used for testing the <see cref="IdleTimeout"/>.
35+
/// </summary>
36+
public TimeProvider TimeProvider { get; set; } = TimeProvider.System;
2437
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
using Microsoft.Extensions.Hosting;
2+
using Microsoft.Extensions.Logging;
3+
using Microsoft.Extensions.Options;
4+
using ModelContextProtocol.Protocol.Transport;
5+
6+
namespace ModelContextProtocol.AspNetCore;
7+
8+
internal sealed partial class IdleTrackingBackgroundService(
9+
StreamableHttpHandler handler,
10+
IOptions<HttpServerTransportOptions> options,
11+
ILogger<IdleTrackingBackgroundService> logger) : BackgroundService
12+
{
13+
// The compiler will complain about the parameter being unused otherwise despite the source generator.
14+
private ILogger _logger = logger;
15+
16+
// We can make this configurable once we properly harden the MCP server. In the meantime, anyone running
17+
// this should be taking a cattle not pets approach to their servers and be able to launch more processes
18+
// to handle more than 10,000 idle sessions at a time.
19+
private const int MaxIdleSessionCount = 10_000;
20+
21+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
22+
{
23+
var timeProvider = options.Value.TimeProvider;
24+
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5), timeProvider);
25+
26+
try
27+
{
28+
while (!stoppingToken.IsCancellationRequested && await timer.WaitForNextTickAsync(stoppingToken))
29+
{
30+
var idleActivityCutoff = timeProvider.GetTimestamp() - options.Value.IdleTimeout.Ticks;
31+
32+
var idleCount = 0;
33+
foreach (var (_, session) in handler.Sessions)
34+
{
35+
if (session.IsActive || session.SessionClosed.IsCancellationRequested)
36+
{
37+
// There's a request currently active or the session is already being closed.
38+
continue;
39+
}
40+
41+
idleCount++;
42+
if (idleCount == MaxIdleSessionCount)
43+
{
44+
// Emit critical log at most once every 5 seconds the idle count it exceeded,
45+
//since the IdleTimeout will no longer be respected.
46+
LogMaxSessionIdleCountExceeded();
47+
}
48+
else if (idleCount < MaxIdleSessionCount && session.LastActivityTicks > idleActivityCutoff)
49+
{
50+
continue;
51+
}
52+
53+
if (handler.Sessions.TryRemove(session.Id, out var removedSession))
54+
{
55+
LogSessionIdle(removedSession.Id);
56+
57+
// Don't slow down the idle tracking loop. DisposeSessionAsync logs. We only await during graceful shutdown.
58+
_ = DisposeSessionAsync(removedSession);
59+
}
60+
}
61+
}
62+
}
63+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
64+
{
65+
}
66+
finally
67+
{
68+
if (stoppingToken.IsCancellationRequested)
69+
{
70+
List<Task> disposeSessionTasks = [];
71+
72+
foreach (var (sessionKey, _) in handler.Sessions)
73+
{
74+
if (handler.Sessions.TryRemove(sessionKey, out var session))
75+
{
76+
disposeSessionTasks.Add(DisposeSessionAsync(session));
77+
}
78+
}
79+
80+
await Task.WhenAll(disposeSessionTasks);
81+
}
82+
}
83+
}
84+
85+
private async Task DisposeSessionAsync(HttpMcpSession<StreamableHttpServerTransport> session)
86+
{
87+
try
88+
{
89+
await session.DisposeAsync();
90+
}
91+
catch (Exception ex)
92+
{
93+
LogSessionDisposeError(session.Id, ex);
94+
}
95+
}
96+
97+
[LoggerMessage(Level = LogLevel.Information, Message = "Closing idle session {sessionId}.")]
98+
private partial void LogSessionIdle(string sessionId);
99+
100+
[LoggerMessage(Level = LogLevel.Critical, Message = "Exceeded static maximum of 10,000 idle connections. Now clearing all inactive connections regardless of timeout.")]
101+
private partial void LogMaxSessionIdleCountExceeded();
102+
103+
[LoggerMessage(Level = LogLevel.Error, Message = "Error disposing the IMcpServer for session {sessionId}.")]
104+
private partial void LogSessionDisposeError(string sessionId, Exception ex);
105+
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
using Microsoft.AspNetCore.Routing;
1+
using Microsoft.AspNetCore.Http;
2+
using Microsoft.AspNetCore.Http.Metadata;
3+
using Microsoft.AspNetCore.Routing;
24
using Microsoft.Extensions.DependencyInjection;
35
using ModelContextProtocol.AspNetCore;
6+
using ModelContextProtocol.Protocol.Messages;
47
using System.Diagnostics.CodeAnalysis;
58

69
namespace Microsoft.AspNetCore.Builder;
@@ -11,21 +14,42 @@ namespace Microsoft.AspNetCore.Builder;
1114
public static class McpEndpointRouteBuilderExtensions
1215
{
1316
/// <summary>
14-
/// Sets up endpoints for handling MCP HTTP Streaming transport.
15-
/// See <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">the protocol specification</see> for details about the Streamable HTTP transport.
17+
/// Sets up endpoints for handling MCP Streamable HTTP transport.
18+
/// See <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">the 2025-03-26 protocol specification</see> for details about the Streamable HTTP transport.
19+
/// Also maps legacy SSE endpoints for backward compatibility at the path "/sse" and "/message". <see href="https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">the 2024-11-05 protocol specification</see> for details about the HTTP with SSE transport.
1620
/// </summary>
1721
/// <param name="endpoints">The web application to attach MCP HTTP endpoints.</param>
1822
/// <param name="pattern">The route pattern prefix to map to.</param>
1923
/// <returns>Returns a builder for configuring additional endpoint conventions like authorization policies.</returns>
2024
public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpoints, [StringSyntax("Route")] string pattern = "")
2125
{
22-
var handler = endpoints.ServiceProvider.GetService<StreamableHttpHandler>() ??
26+
var streamableHttpHandler = endpoints.ServiceProvider.GetService<StreamableHttpHandler>() ??
2327
throw new InvalidOperationException("You must call WithHttpTransport(). Unable to find required services. Call builder.Services.AddMcpServer().WithHttpTransport() in application startup code.");
2428

25-
var routeGroup = endpoints.MapGroup(pattern);
26-
routeGroup.MapGet("", handler.HandleRequestAsync);
27-
routeGroup.MapGet("/sse", handler.HandleRequestAsync);
28-
routeGroup.MapPost("/message", handler.HandleRequestAsync);
29-
return routeGroup;
29+
var mcpGroup = endpoints.MapGroup(pattern);
30+
var streamableHttpGroup = mcpGroup.MapGroup("")
31+
.WithDisplayName(b => $"MCP Streamable HTTP | {b.DisplayName}")
32+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status404NotFound, typeof(JsonRpcError), contentTypes: ["application/json"]));
33+
34+
streamableHttpGroup.MapPost("", streamableHttpHandler.HandlePostRequestAsync)
35+
.WithMetadata(new AcceptsMetadata(["application/json"]))
36+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]))
37+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));
38+
streamableHttpGroup.MapGet("", streamableHttpHandler.HandleGetRequestAsync)
39+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
40+
streamableHttpGroup.MapDelete("", streamableHttpHandler.HandleDeleteRequestAsync);
41+
42+
// Map legacy HTTP with SSE endpoints.
43+
var sseHandler = endpoints.ServiceProvider.GetRequiredService<SseHandler>();
44+
var sseGroup = mcpGroup.MapGroup("")
45+
.WithDisplayName(b => $"MCP HTTP with SSE | {b.DisplayName}");
46+
47+
sseGroup.MapGet("/sse", sseHandler.HandleSseRequestAsync)
48+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
49+
sseGroup.MapPost("/message", sseHandler.HandleMessageRequestAsync)
50+
.WithMetadata(new AcceptsMetadata(["application/json"]))
51+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));
52+
53+
return mcpGroup;
3054
}
3155
}

0 commit comments

Comments
 (0)