Skip to content

Commit a9385bd

Browse files
authored
subscriptions support in the query planner (#2389)
* Subscriptions support in the query planner
1 parent 1782766 commit a9385bd

File tree

13 files changed

+491
-50
lines changed

13 files changed

+491
-50
lines changed

.changeset/forty-hairs-flow.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@apollo/composition": minor
3+
"@apollo/query-planner": minor
4+
---
5+
6+
Addition of new query planner node types to enable federated subscriptions support
7+

.changeset/many-rats-allow.md

-11
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import { FEDERATION2_LINK_WITH_FULL_IMPORTS, FieldDefinition } from '@apollo/federation-internals';
2+
import gql from 'graphql-tag';
3+
import { composeServices } from '../compose';
4+
5+
describe('subscription composition tests', () => {
6+
it('type subscription appears in the supergraph', () => {
7+
const subgraphA = {
8+
name: 'subgraphA',
9+
typeDefs: gql`
10+
extend schema
11+
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
12+
type Query {
13+
me: User!
14+
}
15+
16+
type Subscription {
17+
onNewUser: User!
18+
}
19+
20+
type User {
21+
id: ID!
22+
name: String!
23+
}
24+
`,
25+
};
26+
27+
const subgraphB = {
28+
name: 'subgraphB',
29+
typeDefs:gql`
30+
extend schema
31+
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
32+
type Query {
33+
foo: Int
34+
}
35+
36+
type Subscription {
37+
bar: Int
38+
}
39+
`,
40+
};
41+
42+
const { errors, schema } = composeServices([subgraphA, subgraphB]);
43+
expect(errors).toBeUndefined();
44+
expect(schema).toBeDefined();
45+
const onNewUser = schema?.elementByCoordinate('Subscription.onNewUser') as FieldDefinition<any>;
46+
expect(onNewUser.appliedDirectives?.[0].toString()).toBe('@join__field(graph: SUBGRAPHA)');
47+
});
48+
49+
it.each([
50+
{ directive: '@shareable', errorMsg: 'Fields on root level subscription object cannot be marked as shareable'},
51+
])('directives that are incompatible with subscriptions wont compose', ({ directive, errorMsg }) => {
52+
const subgraphA = {
53+
name: 'subgraphA',
54+
typeDefs: gql`
55+
extend schema
56+
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
57+
type Query {
58+
me: User!
59+
}
60+
61+
type Subscription {
62+
onNewUser: User! ${directive}
63+
}
64+
65+
type User {
66+
id: ID!
67+
name: String!
68+
}
69+
`,
70+
};
71+
72+
const subgraphB = {
73+
name: 'subgraphB',
74+
typeDefs:gql`
75+
extend schema
76+
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
77+
type Query {
78+
foo: Int
79+
}
80+
81+
type Subscription {
82+
bar: Int
83+
}
84+
`,
85+
};
86+
87+
const { errors, schema } = composeServices([subgraphA, subgraphB]);
88+
expect(errors?.length).toBe(1);
89+
expect(errors?.[0].message).toBe(errorMsg);
90+
expect(schema).toBeUndefined();
91+
});
92+
93+
it('subscription name collisions across subgraphs should not compose', () => {
94+
const subgraphA = {
95+
name: 'subgraphA',
96+
typeDefs: gql`
97+
extend schema
98+
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
99+
type Query {
100+
me: User!
101+
}
102+
103+
type Subscription {
104+
onNewUser: User
105+
foo: Int!
106+
}
107+
108+
type User {
109+
id: ID!
110+
name: String!
111+
}
112+
`,
113+
};
114+
115+
const subgraphB = {
116+
name: 'subgraphB',
117+
typeDefs:gql`
118+
extend schema
119+
${FEDERATION2_LINK_WITH_FULL_IMPORTS}
120+
type Query {
121+
foo: Int
122+
}
123+
124+
type Subscription {
125+
foo: Int!
126+
}
127+
`,
128+
};
129+
130+
const { errors, schema } = composeServices([subgraphA, subgraphB]);
131+
expect(errors?.length).toBe(1);
132+
expect(errors?.[0].message).toBe('Non-shareable field "Subscription.foo" is resolved from multiple subgraphs: it is resolved from subgraphs "subgraphA" and "subgraphB" and defined as non-shareable in all of them');
133+
expect(schema).toBeUndefined();
134+
});
135+
});

composition-js/src/merging/merge.ts

+37-4
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,7 @@ class Merger {
791791
private mergeObject(sources: (ObjectType | undefined)[], dest: ObjectType) {
792792
const isEntity = this.hintOnInconsistentEntity(sources, dest);
793793
const isValueType = !isEntity && !dest.isRootType();
794+
const isSubscription = dest.isSubscriptionRootType();
794795

795796
this.addFieldsShallow(sources, dest);
796797
if (!dest.hasFields()) {
@@ -806,7 +807,15 @@ class Merger {
806807
const subgraphFields = sources.map(t => t?.field(destField.name));
807808
const mergeContext = this.validateOverride(subgraphFields, destField);
808809

809-
this.mergeField(subgraphFields, destField, mergeContext);
810+
if (isSubscription) {
811+
this.validateSubscriptionField(subgraphFields);
812+
}
813+
814+
this.mergeField({
815+
sources: subgraphFields,
816+
dest: destField,
817+
mergeContext,
818+
});
810819
this.validateFieldSharing(subgraphFields, destField, mergeContext);
811820
}
812821
}
@@ -1085,7 +1094,7 @@ class Merger {
10851094
const { subgraphsWithOverride, subgraphMap } = sources.map((source, idx) => {
10861095
if (!source) {
10871096
// While the subgraph may not have the field directly, it could have "stand-in" for that field
1088-
// through @interfaceObject, and it is those stand-ins that would be effectively overridden.
1097+
// through @interfaceObject, and it is those stand-ins that would be effectively overridden.
10891098
const interfaceObjectAbstractingFields = this.fieldsInSourceIfAbstractedByInterfaceObject(dest, idx);
10901099
if (interfaceObjectAbstractingFields.length > 0) {
10911100
return {
@@ -1251,7 +1260,15 @@ class Merger {
12511260
}).filter(isDefined);
12521261
}
12531262

1254-
private mergeField(sources: FieldOrUndefinedArray, dest: FieldDefinition<any>, mergeContext: FieldMergeContext = new FieldMergeContext(sources)) {
1263+
private mergeField({
1264+
sources,
1265+
dest,
1266+
mergeContext = new FieldMergeContext(sources),
1267+
}: {
1268+
sources: FieldOrUndefinedArray,
1269+
dest: FieldDefinition<any>,
1270+
mergeContext: FieldMergeContext,
1271+
}) {
12551272
if (sources.every((s, i) => s === undefined ? this.fieldsInSourceIfAbstractedByInterfaceObject(dest, i).every((f) => this.isExternal(i, f)) : this.isExternal(i, s))) {
12561273
const definingSubgraphs = sources.map((source, i) => {
12571274
if (source) {
@@ -1790,7 +1807,11 @@ class Merger {
17901807
}
17911808
const subgraphFields = sources.map(t => t?.field(destField.name));
17921809
const mergeContext = this.validateOverride(subgraphFields, destField);
1793-
this.mergeField(subgraphFields, destField, mergeContext);
1810+
this.mergeField({
1811+
sources: subgraphFields,
1812+
dest: destField,
1813+
mergeContext,
1814+
});
17941815
}
17951816
}
17961817

@@ -2805,4 +2826,16 @@ class Merger {
28052826
: err;
28062827
});
28072828
}
2829+
2830+
private validateSubscriptionField(sources: FieldOrUndefinedArray) {
2831+
// no subgraph marks field as @shareable
2832+
const fieldsWithShareable = sources.filter((src, idx) => src && src.appliedDirectivesOf(this.metadata(idx).shareableDirective()).length > 0);
2833+
if (fieldsWithShareable.length > 0) {
2834+
const nodes = sourceASTs(...fieldsWithShareable);
2835+
this.errors.push(ERRORS.INVALID_FIELD_SHARING.err(
2836+
`Fields on root level subscription object cannot be marked as shareable`,
2837+
{ nodes},
2838+
));
2839+
}
2840+
}
28082841
}

federation-integration-testsuite-js/src/matchers/toCallService.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { QueryPlan, PlanNode } from '@apollo/query-planner';
1+
import { QueryPlan, PlanNode, SubscriptionNode } from '@apollo/query-planner';
22
import { astSerializer, queryPlanSerializer } from '../snapshotSerializers';
33
import prettyFormat from 'pretty-format';
44

@@ -41,9 +41,9 @@ function toCallService(
4141
let pass = false;
4242
// let initialServiceCall = null;
4343
// recurse the node, find first match of service name, return
44-
function walkExecutionNode(node?: PlanNode) {
44+
function walkExecutionNode(node?: PlanNode | SubscriptionNode) {
4545
if (!node) return;
46-
if (node.kind === 'Fetch' && node.serviceName === service) {
46+
if ((node.kind === 'Fetch') && node.serviceName === service) {
4747
pass = true;
4848
// initialServiceCall = node;
4949
return;
@@ -56,6 +56,10 @@ function toCallService(
5656
case 'Sequence':
5757
node.nodes.forEach(walkExecutionNode);
5858
break;
59+
case 'Subscription':
60+
walkExecutionNode(node.primary);
61+
walkExecutionNode(node.rest);
62+
break;
5963
default:
6064
return;
6165
}

gateway-js/src/executeQueryPlan.ts

+4
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ export async function executeQueryPlan(
129129
requestContext.metrics && requestContext.metrics.captureTraces
130130
);
131131

132+
if (queryPlan.node?.kind === 'Subscription') {
133+
throw new Error('Execution of subscriptions not supported by gateway');
134+
}
135+
132136
if (queryPlan.node) {
133137
const traceNode = await executeNode(
134138
context,

internals-js/src/definitions.ts

+8
Original file line numberDiff line numberDiff line change
@@ -2091,6 +2091,14 @@ export class ObjectType extends FieldBasedType<ObjectType, ObjectTypeReferencer>
20912091
return schema.schemaDefinition.root('query')?.type === this;
20922092
}
20932093

2094+
/**
2095+
* Whether this type is the "subscription" root type of the schema (will return false if the type is detached).
2096+
*/
2097+
isSubscriptionRootType(): boolean {
2098+
const schema = this.schema();
2099+
return schema.schemaDefinition.root('subscription')?.type === this;
2100+
}
2101+
20942102
protected removeReferenceRecursive(ref: ObjectTypeReferencer): void {
20952103
// Note that the ref can also be a`SchemaDefinition`, but don't have anything to do then.
20962104
switch (ref.kind) {

query-planner-js/src/QueryPlan.ts

+11-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export type ResponsePath = (string | number)[];
1111

1212
export interface QueryPlan {
1313
kind: 'QueryPlan';
14-
node?: PlanNode;
14+
node?: PlanNode | SubscriptionNode;
1515
}
1616

1717
export type PlanNode = SequenceNode | ParallelNode | FetchNode | FlattenNode | DeferNode | ConditionNode;
@@ -26,6 +26,12 @@ export interface ParallelNode {
2626
nodes: PlanNode[];
2727
}
2828

29+
export interface SubscriptionNode {
30+
kind: 'Subscription';
31+
primary: FetchNode;
32+
rest?: PlanNode;
33+
}
34+
2935
export interface FetchNode {
3036
kind: 'Fetch';
3137
serviceName: string;
@@ -229,3 +235,7 @@ export const trimSelectionNodes = (
229235

230236
return remapped;
231237
};
238+
239+
export const isPlanNode = (node: PlanNode | SubscriptionNode | undefined): node is PlanNode => {
240+
return !!node && node.kind !== 'Subscription';
241+
}

query-planner-js/src/__tests__/buildPlan.interfaceObject.test.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import { operationFromDocument } from '@apollo/federation-internals';
1+
import { assert, operationFromDocument } from '@apollo/federation-internals';
22
import gql from 'graphql-tag';
3+
import { isPlanNode } from '../QueryPlan';
34
import { composeAndCreatePlanner, findFetchNodes } from "./testHelper";
45

56
describe('basic @key on interface/@interfaceObject handling', () => {
@@ -306,6 +307,7 @@ describe('basic @key on interface/@interfaceObject handling', () => {
306307
}
307308
`);
308309

310+
assert(isPlanNode(plan.node), 'buildQueryPlan should return QueryPlan');
309311
const rewrites = findFetchNodes('S2', plan.node)[0].inputRewrites;
310312
expect(rewrites).toBeDefined();
311313
expect(rewrites?.length).toBe(1);

0 commit comments

Comments
 (0)