Skip to content

Commit 4317a2d

Browse files
committed
wip
1 parent 54ae882 commit 4317a2d

File tree

5 files changed

+45
-441
lines changed

5 files changed

+45
-441
lines changed

packages/ai/ai/src/McpSchema.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,9 @@ export class InvalidParams extends Schema.TaggedError<InvalidParams>()("InvalidP
266266
export class InternalError extends Schema.TaggedError<InternalError>()("InternalError", {
267267
...McpError.fields,
268268
code: Schema.tag(INTERNAL_ERROR_CODE)
269-
}) {}
269+
}) {
270+
static readonly notImplemented = new InternalError({ message: "Not implemented" })
271+
}
270272

271273
// =============================================================================
272274
// Ping
@@ -1049,7 +1051,8 @@ export class SetLevel extends Rpc.make("logging/setLevel", {
10491051
* severe) to the client as notifications/message.
10501052
*/
10511053
level: LoggingLevel
1052-
}
1054+
},
1055+
error: McpError
10531056
}) {}
10541057

10551058
export class LoggingMessageNotification extends Rpc.make("notifications/message", {

packages/ai/ai/src/McpServer.ts

Lines changed: 35 additions & 267 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,14 @@
22
* @since 1.0.0
33
*/
44
import * as Headers from "@effect/platform/Headers"
5-
import { RpcMessage } from "@effect/rpc"
65
import type * as Rpc from "@effect/rpc/Rpc"
7-
import type * as RpcGroup from "@effect/rpc/RpcGroup"
8-
import { RequestId } from "@effect/rpc/RpcMessage"
96
import * as RpcServer from "@effect/rpc/RpcServer"
10-
import type { NonEmptyReadonlyArray } from "effect/Array"
11-
import * as Cause from "effect/Cause"
127
import * as Context from "effect/Context"
138
import * as Effect from "effect/Effect"
14-
import * as Either from "effect/Either"
15-
import * as Exit from "effect/Exit"
169
import * as Layer from "effect/Layer"
17-
import type { ParseError } from "effect/ParseResult"
18-
import { TreeFormatter } from "effect/ParseResult"
19-
import * as Schema from "effect/Schema"
20-
import * as Scope from "effect/Scope"
21-
import type { ClientRequestRpcs, FromServerEncoded, Implementation, ServerCapabilities } from "./McpSchema.js"
22-
import { ClientRpcs, INTERNAL_ERROR_CODE, INVALID_PARAMS_ERROR_CODE, METHOD_NOT_FOUND_ERROR_CODE } from "./McpSchema.js"
23-
import * as McpTransport from "./McpTransport.js"
24-
25-
// TODO:
26-
// - Finish error types
27-
// - Investigate encoded errors in transport.send
10+
import { McpSchema } from "./index.js"
11+
import type { Implementation, ServerCapabilities } from "./McpSchema.js"
12+
import { ClientRpcs } from "./McpSchema.js"
2813

2914
export class McpServer extends Context.Tag("@effect/ai/McpServer")<
3015
McpServer,
@@ -54,245 +39,39 @@ export const SUPPORTED_PROTOCOL_VERSIONS = [
5439
* @since 1.0.0
5540
* @category Constructors
5641
*/
57-
export const makeRpc = Effect.gen(function*() {
42+
export const make = Effect.gen(function*() {
5843
const protocol = yield* RpcServer.Protocol
5944
const handlers = yield* Layer.build(ClientRpcHandlers)
6045

6146
const patchedProtocol = RpcServer.Protocol.of({
6247
...protocol,
6348
run(f) {
6449
return protocol.run((clientId, request) => {
65-
if (request._tag === "Request" && request.id === "") {
50+
if (request._tag === "Request" && request.tag.startsWith("notifications/")) {
6651
if (request.tag === "notifications/cancelled") {
6752
return f(clientId, {
6853
_tag: "Interrupt",
6954
requestId: String((request.payload as any).requestId)
7055
})
7156
}
72-
// TODO: notification handlers
57+
const handler = handlers.unsafeMap.get(request.tag) as Rpc.Handler<string>
58+
return handler
59+
? handler.handler(request.payload, Headers.fromInput(request.headers)) as Effect.Effect<void>
60+
: Effect.void
7361
}
7462
return f(clientId, request)
7563
})
7664
}
7765
})
7866

79-
yield* RpcServer.make(ClientRpcs, {
67+
return yield* RpcServer.make(ClientRpcs, {
8068
spanPrefix: "McpServer",
8169
disableFatalDefects: true
8270
}).pipe(
8371
Effect.provideService(RpcServer.Protocol, patchedProtocol),
84-
Effect.provide(handlers),
85-
Effect.forkScoped,
86-
Effect.interruptible
72+
Effect.provide(handlers)
8773
)
88-
89-
return {} as const
90-
})
91-
92-
/**
93-
* @since 1.0.0
94-
* @category Mcp Server
95-
*/
96-
export const make = Effect.gen(function*() {
97-
const transport = yield* McpTransport.McpTransport
98-
const context = yield* Effect.context<never>()
99-
const scope = yield* Scope.make()
100-
101-
type Rpcs = RpcGroup.Rpcs<typeof ClientRequestRpcs>
102-
103-
type Schemas = {
104-
readonly decode: (u: unknown) => Effect.Effect<Rpc.Payload<Rpcs>, ParseError>
105-
readonly encodeChunk: (u: ReadonlyArray<unknown>) => Effect.Effect<NonEmptyReadonlyArray<unknown>, ParseError>
106-
readonly encodeSuccess: (u: unknown) => Effect.Effect<unknown, ParseError>
107-
readonly encodeFailure: (u: unknown) => Effect.Effect<unknown, ParseError>
108-
readonly context: Context.Context<never>
109-
}
110-
111-
type Client = {
112-
readonly id: number
113-
readonly schemas: Map<RequestId, Schemas>
114-
}
115-
const clients = new Map<number, Client>()
116-
117-
const schemasCache = new WeakMap<any, Schemas>()
118-
const getRequestSchemas = (rpc: Rpc.AnyWithProps) => {
119-
let metadata = schemasCache.get(rpc)
120-
if (!metadata) {
121-
const entry = context.unsafeMap.get(rpc.key) as Rpc.Handler<Rpcs["_tag"]>
122-
metadata = {
123-
decode: Schema.decodeUnknown(rpc.payloadSchema as any),
124-
encodeChunk: Schema.encodeUnknown(Schema.Array(Schema.Any)) as any,
125-
encodeSuccess: Schema.encodeUnknown(rpc.successSchema) as any,
126-
encodeFailure: Schema.encodeUnknown(rpc.errorSchema as any) as any,
127-
context: entry.context
128-
}
129-
schemasCache.set(rpc, metadata)
130-
}
131-
return metadata
132-
}
133-
134-
const handleEncode = <A, R>(
135-
client: Client,
136-
requestId: RequestId,
137-
encode: Effect.Effect<A, ParseError, R>,
138-
onSuccess: (value: A) => FromServerEncoded | ReadonlyArray<FromServerEncoded>
139-
) =>
140-
Effect.catchAllCause(
141-
Effect.flatMap(encode, (value) => transport.send(client.id, onSuccess(value))),
142-
(cause) => {
143-
client.schemas.delete(requestId)
144-
const message = Cause.squash(Cause.map(cause, TreeFormatter.formatErrorSync))
145-
return transport.send(client.id, {
146-
_tag: "Failure",
147-
id: Number(requestId),
148-
error: {
149-
code: INTERNAL_ERROR_CODE,
150-
message: `Failed to encode response\n${message}`
151-
}
152-
})
153-
}
154-
)
155-
156-
const server = yield* RpcServer.makeNoSerialization(ClientRpcs, {
157-
disableClientAcks: true,
158-
disableSpanPropagation: true,
159-
disableTracing: true,
160-
onFromServer(response) {
161-
const client = clients.get(response.clientId)
162-
if (!client) return Effect.void
163-
switch (response._tag) {
164-
case "Chunk": {
165-
const metadata = client.schemas.get(response.requestId)
166-
if (!metadata) return Effect.void
167-
return handleEncode(
168-
client,
169-
response.requestId,
170-
Effect.provide(metadata.encodeChunk(response.values), metadata.context),
171-
(results) =>
172-
results.map((result) => ({
173-
_tag: "Success",
174-
id: Number(response.requestId),
175-
result: result as any
176-
}))
177-
)
178-
}
179-
case "Exit": {
180-
const metadata = client.schemas.get(response.requestId)
181-
if (!metadata) return Effect.void
182-
client.schemas.delete(response.requestId)
183-
return handleEncode(
184-
client,
185-
response.requestId,
186-
Exit.match(response.exit, {
187-
onFailure: (cause): Effect.Effect<Either.Either<unknown, unknown>, ParseError> =>
188-
metadata.encodeFailure(Cause.squash(cause)).pipe(
189-
Effect.map((error) => Either.left(error))
190-
),
191-
onSuccess: (value): Effect.Effect<Either.Either<unknown, unknown>, ParseError> =>
192-
metadata.encodeSuccess(value).pipe(
193-
Effect.map((value) => Either.right(value))
194-
)
195-
}),
196-
Either.match({
197-
onLeft: (error) => ({
198-
_tag: "Failure",
199-
id: Number(response.requestId),
200-
error: error as any
201-
}),
202-
onRight: (result) => ({
203-
_tag: "Success",
204-
id: Number(response.requestId),
205-
result: result as any
206-
})
207-
})
208-
)
209-
}
210-
case "ClientEnd": {
211-
clients.delete(response.clientId)
212-
return transport.end(response.clientId)
213-
}
214-
case "Defect": {
215-
// TODO: defects do not contain the requestId (?)
216-
return Effect.void
217-
}
218-
}
219-
}
220-
}).pipe(Scope.extend(scope))
221-
222-
yield* transport.run(
223-
Effect.fnUntraced(function*(clientId, request) {
224-
let client = clients.get(clientId)
225-
if (!client) {
226-
client = {
227-
id: clientId,
228-
schemas: new Map()
229-
}
230-
clients.set(clientId, client)
231-
}
232-
233-
const rpc = ClientRpcs.requests.get(request.method)
234-
if (!rpc) {
235-
// For unknown notifications, ignore the request
236-
if (request._tag === "Notification") return
237-
// For unknown requests, respond with an error
238-
return yield* transport.send(clientId, {
239-
_tag: "Failure",
240-
id: request.id,
241-
error: {
242-
code: METHOD_NOT_FOUND_ERROR_CODE,
243-
message: "Method not found",
244-
data: { method: request.method }
245-
}
246-
})
247-
}
248-
249-
switch (request._tag) {
250-
case "Request": {
251-
const requestId = RequestId(typeof request.id === "string" ? request.id : BigInt(request.id))
252-
const metadata = getRequestSchemas(rpc)
253-
return yield* Effect.matchEffect(
254-
Effect.provide(metadata.decode(request.payload), metadata.context),
255-
{
256-
onFailure: (_error) => {
257-
return transport.send(clientId, {
258-
_tag: "Failure",
259-
id: request.id,
260-
error: {
261-
code: INVALID_PARAMS_ERROR_CODE,
262-
message: `Invalid parameters for method: "${request.method}"`,
263-
data: { params: request.payload }
264-
}
265-
})
266-
},
267-
onSuccess: (payload) => {
268-
client.schemas.set(requestId, metadata)
269-
return server.write(client.id, {
270-
_tag: "Request",
271-
id: requestId,
272-
tag: request.method,
273-
payload,
274-
headers: Headers.empty,
275-
sampled: false,
276-
spanId: "",
277-
traceId: ""
278-
})
279-
}
280-
}
281-
)
282-
}
283-
case "Notification": {
284-
return Effect.void
285-
}
286-
}
287-
})
288-
).pipe(
289-
Effect.interruptible,
290-
Effect.tapErrorCause((cause) => Effect.sync(() => console.error("BUG: McpServer protocol crashed", cause))),
291-
Effect.onExit((exit) => Scope.close(scope, exit))
292-
)
293-
294-
return {} as const
295-
})
74+
}).pipe(Effect.scoped)
29675

29776
const ClientRpcHandlers = ClientRpcs.toLayer(
29877
Effect.gen(function*() {
@@ -301,27 +80,28 @@ const ClientRpcHandlers = ClientRpcs.toLayer(
30180
return {
30281
// Requests
30382
ping: () => Effect.succeed({}),
304-
initialize: Effect.fnUntraced(function*(params) {
83+
initialize(params) {
84+
console.error("MCP Server initialized with params:", params)
30585
const requestedVersion = params.protocolVersion
306-
return {
86+
return Effect.succeed({
30787
capabilities,
30888
serverInfo,
30989
protocolVersion: SUPPORTED_PROTOCOL_VERSIONS.includes(requestedVersion)
31090
? requestedVersion
31191
: LATEST_PROTOCOL_VERSION
312-
}
313-
}),
314-
"completion/complete": () => Effect.dieMessage("Not implemented"),
315-
"logging/setLevel": () => Effect.dieMessage("Not implemented"),
316-
"prompts/get": () => Effect.dieMessage("Not implemented"),
317-
"prompts/list": () => Effect.dieMessage("Not implemented"),
318-
"resources/list": () => Effect.dieMessage("Not implemented"),
319-
"resources/read": () => Effect.dieMessage("Not implemented"),
320-
"resources/subscribe": () => Effect.dieMessage("Not implemented"),
321-
"resources/unsubscribe": () => Effect.dieMessage("Not implemented"),
322-
"resources/templates/list": () => Effect.dieMessage("Not implemented"),
323-
"tools/call": () => Effect.dieMessage("Not implemented"),
324-
"tools/list": () => Effect.dieMessage("Not implemented"),
92+
})
93+
},
94+
"completion/complete": () => McpSchema.InternalError.notImplemented,
95+
"logging/setLevel": () => McpSchema.InternalError.notImplemented,
96+
"prompts/get": () => McpSchema.InternalError.notImplemented,
97+
"prompts/list": () => McpSchema.InternalError.notImplemented,
98+
"resources/list": () => McpSchema.InternalError.notImplemented,
99+
"resources/read": () => McpSchema.InternalError.notImplemented,
100+
"resources/subscribe": () => McpSchema.InternalError.notImplemented,
101+
"resources/unsubscribe": () => McpSchema.InternalError.notImplemented,
102+
"resources/templates/list": () => McpSchema.InternalError.notImplemented,
103+
"tools/call": () => McpSchema.InternalError.notImplemented,
104+
"tools/list": () => McpSchema.InternalError.notImplemented,
325105

326106
// Notifications
327107
"notifications/cancelled": (_) => Effect.void,
@@ -335,22 +115,10 @@ const ClientRpcHandlers = ClientRpcs.toLayer(
335115
export const layer = (
336116
serverInfo: Context.Tag.Service<McpServerImplementation>,
337117
options?: Context.Tag.Service<McpServerOptions>
338-
) => Layer.mergeAll(Layer.succeed(McpServerImplementation, serverInfo), Layer.succeed(McpServerOptions, options ?? {}))
339-
340-
// Usage
341-
//
342-
// const MainLayer = layer({
343-
// name: "Demo Server",
344-
// version: "1.0.0"
345-
// }, {
346-
// capabilities: {
347-
// logging: {},
348-
// prompts: {}
349-
// }
350-
// }).pipe(
351-
// Layer.provide(McpTransport.layerTransportStdio())
352-
// )
353-
354-
// Layer.launch(MainLayer).pipe(
355-
// Effect.runPromise
356-
// )
118+
) =>
119+
Layer.scopedDiscard(Effect.forkScoped(make)).pipe(
120+
Layer.provide([
121+
Layer.succeed(McpServerImplementation, serverInfo),
122+
Layer.succeed(McpServerOptions, options ?? {})
123+
])
124+
)

0 commit comments

Comments
 (0)