Skip to content

Commit 6ffb878

Browse files
committed
wip
1 parent 22b25a7 commit 6ffb878

File tree

3 files changed

+131
-74
lines changed

3 files changed

+131
-74
lines changed

packages/ai/ai/src/McpSchema.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,7 +1403,6 @@ export type SuccessEncoded<Group extends RpcGroup.Any> = RpcGroup.Rpcs<
14031403
> ? {
14041404
readonly _tag: "Success"
14051405
readonly id: string | number
1406-
readonly method: _Tag
14071406
readonly result: _Success["Encoded"]
14081407
}
14091408
: never
@@ -1420,7 +1419,6 @@ export type FailureEncoded<Group extends RpcGroup.Any> = RpcGroup.Rpcs<
14201419
> ? {
14211420
readonly _tag: "Failure"
14221421
readonly id: string | number
1423-
readonly method: _Tag
14241422
readonly error: _Error["Encoded"]
14251423
}
14261424
: never

packages/ai/ai/src/McpServer.ts

Lines changed: 128 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,29 @@
22
* @since 1.0.0
33
*/
44
import * as Headers from "@effect/platform/Headers"
5-
import * as Rpc from "@effect/rpc/Rpc"
5+
import type * as Rpc from "@effect/rpc/Rpc"
66
import type * as RpcGroup from "@effect/rpc/RpcGroup"
77
import { RequestId } from "@effect/rpc/RpcMessage"
88
import * as RpcServer from "@effect/rpc/RpcServer"
99
import type { NonEmptyReadonlyArray } from "effect/Array"
1010
import * as Cause from "effect/Cause"
1111
import * as Context from "effect/Context"
1212
import * as Effect from "effect/Effect"
13+
import * as Either from "effect/Either"
14+
import * as Exit from "effect/Exit"
1315
import * as Layer from "effect/Layer"
1416
import type { ParseError } from "effect/ParseResult"
1517
import { TreeFormatter } from "effect/ParseResult"
1618
import * as Schema from "effect/Schema"
1719
import * as Scope from "effect/Scope"
18-
import type { ClientRequestRpcs, Implementation, ServerCapabilities } from "./McpSchema.js"
19-
import { ClientRpcs } from "./McpSchema.js"
20+
import type { ClientRequestRpcs, FromServerEncoded, Implementation, ServerCapabilities } from "./McpSchema.js"
21+
import { ClientRpcs, InvalidParams, MethodNotFound } from "./McpSchema.js"
2022
import * as McpTransport from "./McpTransport.js"
2123

24+
// TODO:
25+
// - Finish error types
26+
// - Investigate encoded errors in transport.send
27+
2228
export class McpServer extends Context.Tag("@effect/ai/McpServer")<
2329
McpServer,
2430
{}
@@ -57,44 +63,51 @@ export const make = Effect.gen(function*() {
5763
type Schemas = {
5864
readonly decode: (u: unknown) => Effect.Effect<Rpc.Payload<Rpcs>, ParseError>
5965
readonly encodeChunk: (u: ReadonlyArray<unknown>) => Effect.Effect<NonEmptyReadonlyArray<unknown>, ParseError>
60-
readonly encodeExit: (u: unknown) => Effect.Effect<Schema.ExitEncoded<unknown, unknown, unknown>, ParseError>
66+
readonly encodeSuccess: (u: unknown) => Effect.Effect<unknown, ParseError>
67+
readonly encodeFailure: (u: unknown) => Effect.Effect<unknown, ParseError>
6168
readonly context: Context.Context<never>
6269
}
6370

71+
type Client = {
72+
readonly id: number
73+
readonly schemas: Map<RequestId, Schemas>
74+
}
75+
const clients = new Map<number, Client>()
76+
6477
const schemasCache = new WeakMap<any, Schemas>()
65-
const getSchemas = (rpc: Rpc.AnyWithProps) => {
66-
let schemas = schemasCache.get(rpc)
67-
if (!schemas) {
78+
const getRequestSchemas = (rpc: Rpc.AnyWithProps) => {
79+
let metadata = schemasCache.get(rpc)
80+
if (!metadata) {
6881
const entry = context.unsafeMap.get(rpc.key) as Rpc.Handler<Rpcs["_tag"]>
69-
schemas = {
82+
metadata = {
7083
decode: Schema.decodeUnknown(rpc.payloadSchema as any),
71-
encodeChunk: Schema.encodeUnknown(Schema.Any) as any,
72-
encodeExit: Schema.encodeUnknown(Rpc.exitSchema(rpc as any)) as any,
84+
encodeChunk: Schema.encodeUnknown(Schema.Array(Schema.Any)) as any,
85+
encodeSuccess: Schema.encodeUnknown(rpc.successSchema) as any,
86+
encodeFailure: Schema.encodeUnknown(rpc.errorSchema as any) as any,
7387
context: entry.context
7488
}
75-
schemasCache.set(rpc, schemas)
89+
schemasCache.set(rpc, metadata)
7690
}
77-
return schemas
91+
return metadata
7892
}
7993

80-
type Client = {
81-
readonly id: number
82-
readonly schemas: Map<RequestId, Schemas>
83-
}
84-
const clients = new Map<number, Client>()
85-
8694
const handleEncode = <A, R>(
8795
client: Client,
8896
requestId: RequestId,
89-
effect: Effect.Effect<A, ParseError, R>,
90-
onSuccess: (value: A) => any
97+
encode: Effect.Effect<A, ParseError, R>,
98+
onSuccess: (value: A) => FromServerEncoded | ReadonlyArray<FromServerEncoded>
9199
) =>
92100
Effect.catchAllCause(
93-
Effect.flatMap(effect, (value) => transport.send(client.id, onSuccess(value))),
101+
Effect.flatMap(encode, (value) => transport.send(client.id, onSuccess(value))),
94102
(cause) => {
95103
client.schemas.delete(requestId)
96104
const _defect = Cause.squash(Cause.map(cause, TreeFormatter.formatErrorSync))
97-
// TODO
105+
// TODO: send appropriate error message for failure to encode response
106+
// return transport.send(client.id, {
107+
// _tag: "Failure",
108+
// id: Number(requestId),
109+
// error: new
110+
// })
98111
return Effect.void
99112
}
100113
)
@@ -108,41 +121,49 @@ export const make = Effect.gen(function*() {
108121
if (!client) return Effect.void
109122
switch (response._tag) {
110123
case "Chunk": {
111-
const schemas = client.schemas.get(response.requestId)
112-
if (!schemas) return Effect.void
124+
const metadata = client.schemas.get(response.requestId)
125+
if (!metadata) return Effect.void
113126
return handleEncode(
114127
client,
115128
response.requestId,
116-
Effect.provide(schemas.encodeChunk(response.values), schemas.context),
129+
Effect.provide(metadata.encodeChunk(response.values), metadata.context),
117130
(results) =>
118131
results.map((result) => ({
119132
_tag: "Success",
120133
id: Number(response.requestId),
121-
method: "",
122-
result
134+
result: result as any
123135
}))
124136
)
125137
}
126138
case "Exit": {
127-
const schemas = client.schemas.get(response.requestId)
128-
if (!schemas) return Effect.void
139+
const metadata = client.schemas.get(response.requestId)
140+
if (!metadata) return Effect.void
129141
client.schemas.delete(response.requestId)
130142
return handleEncode(
131143
client,
132144
response.requestId,
133-
Effect.provide(schemas.encodeExit(response.exit), schemas.context),
134-
(exit) => {
135-
if (exit._tag === "Success") {
136-
return {
137-
_tag: "Success",
138-
id: Number(response.requestId),
139-
method: "",
140-
result: exit.value
141-
}
142-
}
143-
// TODO
144-
return Effect.void
145-
}
145+
Exit.match(response.exit, {
146+
onFailure: (cause): Effect.Effect<Either.Either<unknown, unknown>, ParseError> =>
147+
metadata.encodeFailure(Cause.squash(cause)).pipe(
148+
Effect.map((error) => Either.left(error))
149+
),
150+
onSuccess: (value): Effect.Effect<Either.Either<unknown, unknown>, ParseError> =>
151+
metadata.encodeSuccess(value).pipe(
152+
Effect.map((value) => Either.right(value))
153+
)
154+
}),
155+
Either.match({
156+
onLeft: (error) => ({
157+
_tag: "Failure",
158+
id: Number(response.requestId),
159+
error: error as any
160+
}),
161+
onRight: (result) => ({
162+
_tag: "Success",
163+
id: Number(response.requestId),
164+
result: result as any
165+
})
166+
})
146167
)
147168
}
148169
case "ClientEnd": {
@@ -170,23 +191,38 @@ export const make = Effect.gen(function*() {
170191

171192
const rpc = ClientRpcs.requests.get(request.method)
172193
if (!rpc) {
173-
// TODO: send error
174-
return Effect.void
194+
// For unknown notifications, ignore the request
195+
if (request._tag === "Notification") return
196+
// For unknown requests, respond with an error
197+
return yield* transport.send(clientId, {
198+
_tag: "Failure",
199+
id: request.id,
200+
error: new MethodNotFound({
201+
message: "Method not found",
202+
data: { method: request.method }
203+
})
204+
})
175205
}
176206

177207
switch (request._tag) {
178208
case "Request": {
179209
const requestId = RequestId(typeof request.id === "string" ? request.id : BigInt(request.id))
180-
const schemas = getSchemas(rpc)
210+
const metadata = getRequestSchemas(rpc)
181211
return yield* Effect.matchEffect(
182-
Effect.provide(schemas.decode(request.payload), schemas.context),
212+
Effect.provide(metadata.decode(request.payload), metadata.context),
183213
{
184214
onFailure: (_error) => {
185-
// TODO: send error
186-
return Effect.void
215+
return transport.send(clientId, {
216+
_tag: "Failure",
217+
id: request.id,
218+
error: new InvalidParams({
219+
message: `Invalid parameters for method: "${request.method}"`,
220+
data: { params: request.payload }
221+
})
222+
})
187223
},
188224
onSuccess: (payload) => {
189-
client.schemas.set(requestId, schemas)
225+
client.schemas.set(requestId, metadata)
190226
return server.write(client.id, {
191227
_tag: "Request",
192228
id: requestId,
@@ -215,42 +251,63 @@ export const make = Effect.gen(function*() {
215251
return {} as const
216252
})
217253

254+
const ClientRpcHandlers = ClientRpcs.toLayer(
255+
Effect.gen(function*() {
256+
const serverInfo = yield* McpServerImplementation
257+
const { capabilities = {} } = yield* McpServerOptions
258+
return {
259+
// Requests
260+
ping: () => Effect.succeed({}),
261+
initialize: Effect.fnUntraced(function*(params) {
262+
const requestedVersion = params.protocolVersion
263+
return {
264+
capabilities,
265+
serverInfo,
266+
protocolVersion: SUPPORTED_PROTOCOL_VERSIONS.includes(requestedVersion)
267+
? requestedVersion
268+
: LATEST_PROTOCOL_VERSION
269+
}
270+
}),
271+
"completion/complete": () => Effect.dieMessage("Not implemented"),
272+
"logging/setLevel": () => Effect.dieMessage("Not implemented"),
273+
"prompts/get": () => Effect.dieMessage("Not implemented"),
274+
"prompts/list": () => Effect.dieMessage("Not implemented"),
275+
"resources/list": () => Effect.dieMessage("Not implemented"),
276+
"resources/read": () => Effect.dieMessage("Not implemented"),
277+
"resources/subscribe": () => Effect.dieMessage("Not implemented"),
278+
"resources/unsubscribe": () => Effect.dieMessage("Not implemented"),
279+
"resources/templates/list": () => Effect.dieMessage("Not implemented"),
280+
"tools/call": () => Effect.dieMessage("Not implemented"),
281+
"tools/list": () => Effect.dieMessage("Not implemented"),
282+
283+
// Notifications
284+
"notifications/cancelled": () => Effect.dieMessage("Not implemented"),
285+
"notifications/initialized": () => Effect.dieMessage("Not implemented"),
286+
"notifications/progress": () => Effect.dieMessage("Not implemented"),
287+
"notifications/roots/list_changed": () => Effect.dieMessage("Not implemented")
288+
}
289+
})
290+
)
291+
218292
export const layer = (
219293
serverInfo: Context.Tag.Service<McpServerImplementation>,
220294
options?: Context.Tag.Service<McpServerOptions>
221295
) =>
222296
Layer.effect(McpServer, make).pipe(
223-
Layer.provide(ClientRpcs.toLayer(
224-
Effect.gen(function*() {
225-
const serverInfo = yield* McpServerImplementation
226-
const { capabilities = {} } = yield* McpServerOptions
227-
return {
228-
ping: () => Effect.succeed({}),
229-
initialize: Effect.fnUntraced(function*(params) {
230-
const requestedVersion = params.protocolVersion
231-
return {
232-
capabilities,
233-
serverInfo,
234-
protocolVersion: SUPPORTED_PROTOCOL_VERSIONS.includes(requestedVersion)
235-
? requestedVersion
236-
: LATEST_PROTOCOL_VERSION
237-
}
238-
}),
239-
"notifications/initialized": () => Effect.void
240-
// TODO: remove
241-
} as any
242-
})
243-
)),
297+
Layer.provide(ClientRpcHandlers),
244298
Layer.provideMerge(Layer.succeed(McpServerImplementation, serverInfo)),
245299
Layer.provideMerge(Layer.succeed(McpServerOptions, options ?? {}))
246300
)
247301

302+
// Usage
303+
248304
const MainLayer = layer({
249305
name: "Demo Server",
250306
version: "1.0.0"
251307
}, {
252308
capabilities: {
253-
logging: {}
309+
logging: {},
310+
prompts: {}
254311
}
255312
}).pipe(
256313
Layer.provide(McpTransport.layerTransportStdio())

packages/ai/ai/src/McpTransport.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import type {
1919
} from "./McpSchema.js"
2020

2121
/**
22+
* Represents a transport which can send and receive messages.
23+
*
2224
* @since 1.0.0
2325
* @category Transport
2426
*/
@@ -111,7 +113,7 @@ export const makeTransportStdio = Effect.fnUntraced(function*(options?: {
111113
case "Notification": {
112114
encoded = parser.encode({
113115
jsonrpc: JSON_RPC_VERSION,
114-
method: message._tag,
116+
method: message.method,
115117
params: message.payload
116118
})
117119
break

0 commit comments

Comments
 (0)