Skip to content

Add demand control for uplink #1564

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/source/api/apollo-gateway.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,21 @@ Provide this field **only** if you are using managed federation.
</tr>


<tr>
<td>

###### `fallbackPollIntervalInMs` (managed mode only)

`number`
</td>
<td>

Specify this option as a fallback if Uplink fails to provide a polling interval. This will also take effect if `fallbackPollIntervalInMs` is greater than the Uplink defined interval.

</td>
</tr>


<tr>
<td>

Expand Down
3 changes: 2 additions & 1 deletion gateway-js/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ This CHANGELOG pertains only to Apollo Federation packages in the 2.x range. The

> The changes noted within this `vNEXT` section have not been released yet. New PRs and commits which introduce changes should include an entry in this `vNEXT` section as part of their development. When a release is being prepared, a new header will be (manually) created below and the appropriate changes within that release will be moved into the new section.

- _Nothing yet. Stay tuned!_
- Respect the `minDelaySeconds` returning from Uplink when polling and retrying to fetch the supergraph schema from Uplink [PR #1564](https://github.com/apollographql/federation/pull/1564)
- Remove the previously deprecated `experimental_pollInterval` config option and deprecate `pollIntervalInMs` in favour of `fallbackPollIntervalInMs` (for managed mode only). [PR #1564](https://github.com/apollographql/federation/pull/1564)

## v2.0.0-preview.2

Expand Down
2 changes: 1 addition & 1 deletion gateway-js/src/__generated__/graphqlTypes.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions gateway-js/src/__tests__/gateway/lifecycle-hooks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ describe('lifecycle hooks', () => {

const [firstCall, secondCall] = mockDidUpdate.mock.calls;

// Note that we've composing our usual test fixtures here
// Note that we've composing our usual test fixtures here
const expectedFirstId = createHash('sha256').update(getTestingSupergraphSdl()).digest('hex');
expect(firstCall[0]!.compositionId).toEqual(expectedFirstId);
// first call should have no second "previous" argument
expect(firstCall[1]).toBeUndefined();

// Note that this assertion is a tad fragile in that every time we modify
// the supergraph (even just formatting differences), this ID will change
// and this test will have to updated.
// and this test will have to updated.
expect(secondCall[0]!.compositionId).toEqual(
'3ca7f295b11b070d1e1b56a698cbfabb70cb2b5912a4ff0ecae2fb91e8709838',
);
Expand Down Expand Up @@ -183,7 +183,7 @@ describe('lifecycle hooks', () => {
);
});

it('registers schema change callbacks when experimental_pollInterval is set for unmanaged configs', async () => {
it('registers schema change callbacks when pollIntervalInMs is set for unmanaged configs', async () => {
const experimental_updateServiceDefinitions: Experimental_UpdateServiceDefinitions =
jest.fn(async (_config) => {
return { serviceDefinitions, isNewSchema: true };
Expand Down
11 changes: 0 additions & 11 deletions gateway-js/src/__tests__/integration/configuration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,15 +444,4 @@ describe('deprecation warnings', () => {
'The `schemaConfigDeliveryEndpoint` option is deprecated and will be removed in a future version of `@apollo/gateway`. Please migrate to the equivalent (array form) `uplinkEndpoints` configuration option.',
);
});

it('warns with `experimental_pollInterval` option set', async () => {
new ApolloGateway({
experimental_pollInterval: 10000,
logger,
});

expect(logger.warn).toHaveBeenCalledWith(
'The `experimental_pollInterval` option is deprecated and will be removed in a future version of `@apollo/gateway`. Please migrate to the equivalent `pollIntervalInMs` configuration option.',
);
});
});
17 changes: 11 additions & 6 deletions gateway-js/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export interface ServiceDefinitionUpdate {
export interface SupergraphSdlUpdate {
id: string;
supergraphSdl: string;
minDelaySeconds?: number;
}

export function isSupergraphSdlUpdate(
Expand Down Expand Up @@ -125,11 +126,6 @@ interface GatewayConfigBase {
// experimental observability callbacks
experimental_didResolveQueryPlan?: Experimental_DidResolveQueryPlanCallback;
experimental_didUpdateSupergraph?: Experimental_DidUpdateSupergraphCallback;
/**
* @deprecated use `pollIntervalInMs` instead
*/
experimental_pollInterval?: number;
pollIntervalInMs?: number;
experimental_approximateQueryPlanStoreMiB?: number;
experimental_autoFragmentization?: boolean;
fetcher?: typeof fetch;
Expand All @@ -150,6 +146,7 @@ export interface ServiceListGatewayConfig extends GatewayConfigBase {
| ((
service: ServiceEndpointDefinition,
) => Promise<HeadersInit> | HeadersInit);
pollIntervalInMs?: number;
}

export interface ManagedGatewayConfig extends GatewayConfigBase {
Expand All @@ -168,6 +165,11 @@ export interface ManagedGatewayConfig extends GatewayConfigBase {
*/
uplinkEndpoints?: string[];
uplinkMaxRetries?: number;
/**
* @deprecated use `fallbackPollIntervalInMs` instead
*/
pollIntervalInMs?: number;
fallbackPollIntervalInMs?: number;
}

// TODO(trevor:removeServiceList): migrate users to `supergraphSdl` function option
Expand All @@ -176,6 +178,7 @@ interface ManuallyManagedServiceDefsGatewayConfig extends GatewayConfigBase {
* @deprecated: use `supergraphSdl` instead (either as a `SupergraphSdlHook` or `SupergraphManager`)
*/
experimental_updateServiceDefinitions: Experimental_UpdateServiceDefinitions;
pollIntervalInMs?: number;
}

// TODO(trevor:removeServiceList): migrate users to `supergraphSdl` function option
Expand All @@ -185,6 +188,7 @@ interface ExperimentalManuallyManagedSupergraphSdlGatewayConfig
* @deprecated: use `supergraphSdl` instead (either as a `SupergraphSdlHook` or `SupergraphManager`)
*/
experimental_updateSupergraphSdl: Experimental_UpdateSupergraphSdl;
pollIntervalInMs?: number;
}

export function isManuallyManagedSupergraphSdlGatewayConfig(
Expand Down Expand Up @@ -238,7 +242,7 @@ type ManuallyManagedGatewayConfig =
| ManuallyManagedServiceDefsGatewayConfig
| ExperimentalManuallyManagedSupergraphSdlGatewayConfig
| ManuallyManagedSupergraphSdlGatewayConfig
// TODO(trevor:removeServiceList
// TODO(trevor:removeServiceList)
| ServiceListGatewayConfig;

// TODO(trevor:removeServiceList)
Expand Down Expand Up @@ -322,6 +326,7 @@ export function isManagedConfig(
return (
'schemaConfigDeliveryEndpoint' in config ||
'uplinkEndpoints' in config ||
'fallbackPollIntervalInMs' in config ||
(!isLocalConfig(config) &&
!isStaticSupergraphSdlConfig(config) &&
!isManuallyManagedConfig(config))
Expand Down
19 changes: 13 additions & 6 deletions gateway-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,12 @@ export class ApolloGateway implements GraphQLService {
this.experimental_didUpdateSupergraph =
config?.experimental_didUpdateSupergraph;

this.pollIntervalInMs =
config?.pollIntervalInMs ?? config?.experimental_pollInterval;
if (isManagedConfig(this.config)) {
this.pollIntervalInMs =
this.config.fallbackPollIntervalInMs ?? this.config.pollIntervalInMs;
} else if (isServiceListConfig(this.config)) {
this.pollIntervalInMs = this.config?.pollIntervalInMs;
}

this.issueConfigurationWarningsIfApplicable();

Expand Down Expand Up @@ -252,7 +256,7 @@ export class ApolloGateway implements GraphQLService {
'Polling Apollo services at a frequency of less than once per 10 ' +
'seconds (10000) is disallowed. Instead, the minimum allowed ' +
'pollInterval of 10000 will be used. Please reconfigure your ' +
'`pollIntervalInMs` accordingly. If this is problematic for ' +
'`fallbackPollIntervalInMs` accordingly. If this is problematic for ' +
'your team, please contact support.',
);
}
Expand Down Expand Up @@ -286,9 +290,11 @@ export class ApolloGateway implements GraphQLService {
);
}

if ('experimental_pollInterval' in this.config) {
if (isManagedConfig(this.config) && 'pollIntervalInMs' in this.config) {
this.logger.warn(
'The `experimental_pollInterval` option is deprecated and will be removed in a future version of `@apollo/gateway`. Please migrate to the equivalent `pollIntervalInMs` configuration option.',
'The `pollIntervalInMs` option is deprecated and will be removed in a future version of `@apollo/gateway`. ' +
'Please migrate to the equivalent `fallbackPollIntervalInMs` configuration option. ' +
'The poll interval is now defined by Uplink, this option will only be used if it is greater than the value defined by Uplink or as a fallback.',
);
}
}
Expand Down Expand Up @@ -406,7 +412,7 @@ export class ApolloGateway implements GraphQLService {
subgraphHealthCheck: this.config.serviceHealthCheck,
fetcher: this.fetcher,
logger: this.logger,
pollIntervalInMs: this.pollIntervalInMs ?? 10000,
fallbackPollIntervalInMs: this.pollIntervalInMs ?? 10000,
}),
);
}
Expand All @@ -424,6 +430,7 @@ export class ApolloGateway implements GraphQLService {
schema: this.schema!,
executor: this.executor,
};

}

private getUplinkEndpoints(config: ManagedGatewayConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ describe('loadSupergraphSdlFromStorage', () => {
compositionId: "originalId-1234",
maxRetries: 1,
roundRobinSeed: 0,
earliestFetchTime: null,
});

expect(result).toMatchObject({
Expand All @@ -88,6 +89,7 @@ describe('loadSupergraphSdlFromStorage', () => {
compositionId: "originalId-1234",
maxRetries: 1,
roundRobinSeed: 0,
earliestFetchTime: null,
}),
).rejects.toThrowError(
new UplinkFetcherError(
Expand Down Expand Up @@ -388,10 +390,49 @@ describe("loadSupergraphSdlFromUplinks", () => {
compositionId: "id-1234",
maxRetries: 5,
roundRobinSeed: 0,
earliestFetchTime: null,
});

expect(result).toBeNull();
expect(fetcher).toHaveBeenCalledTimes(1);
});

it("Waits the correct time before retrying", async () => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't use mocked timers here because there's an issue with jest and nock that me and Jeffery Burt couldn't figure out how to solve. Using real timers here as a workaround and not writing a test for the polling/math.max with pollIntervalInMs for this reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const timeoutSpy = jest.spyOn(global, 'setTimeout');

mockSupergraphSdlRequest('originalId-1234', mockCloudConfigUrl1).reply(500);
mockSupergraphSdlRequestIfAfter('originalId-1234', mockCloudConfigUrl2).reply(
200,
JSON.stringify({
data: {
routerConfig: {
__typename: 'RouterConfigResult',
id: 'originalId-1234',
supergraphSdl: getTestingSupergraphSdl()
},
},
}),
);
const fetcher = getDefaultFetcher();

await loadSupergraphSdlFromUplinks({
graphRef,
apiKey,
endpoints: [mockCloudConfigUrl1, mockCloudConfigUrl2],
errorReportingEndpoint: undefined,
fetcher: fetcher,
compositionId: "originalId-1234",
maxRetries: 1,
roundRobinSeed: 0,
earliestFetchTime: new Date(Date.now() + 1000),
});

// test if setTimeout was called with a value in range to deal with time jitter
const setTimeoutCall = timeoutSpy.mock.calls[1][1];
expect(setTimeoutCall).toBeLessThanOrEqual(1000);
expect(setTimeoutCall).toBeGreaterThanOrEqual(900);

timeoutSpy.mockRestore();
});
});

24 changes: 19 additions & 5 deletions gateway-js/src/supergraphManagers/UplinkFetcher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { SubgraphHealthCheckFunction, SupergraphSdlUpdateFunction } from '../..'
import { loadSupergraphSdlFromUplinks } from './loadSupergraphSdlFromStorage';

export interface UplinkFetcherOptions {
pollIntervalInMs: number;
fallbackPollIntervalInMs: number;
subgraphHealthCheck?: boolean;
graphRef: string;
apiKey: string;
Expand All @@ -31,6 +31,8 @@ export class UplinkFetcher implements SupergraphManager {
process.env.APOLLO_OUT_OF_BAND_REPORTER_ENDPOINT ?? undefined;
private compositionId?: string;
private fetchCount: number = 0;
private minDelayMs: number | null = null;
private earliestFetchTime: Date | null = null;

constructor(options: UplinkFetcherOptions) {
this.config = options;
Expand All @@ -46,7 +48,12 @@ export class UplinkFetcher implements SupergraphManager {

let initialSupergraphSdl: string | null = null;
try {
initialSupergraphSdl = await this.updateSupergraphSdl();
const result = await this.updateSupergraphSdl();
initialSupergraphSdl = result?.supergraphSdl || null;
if (result?.minDelaySeconds) {
this.minDelayMs = 1000 * result?.minDelaySeconds;
this.earliestFetchTime = new Date(Date.now() + this.minDelayMs);
}
} catch (e) {
this.logUpdateFailure(e);
throw e;
Expand Down Expand Up @@ -83,6 +90,7 @@ export class UplinkFetcher implements SupergraphManager {
compositionId: this.compositionId ?? null,
maxRetries: this.config.maxRetries,
roundRobinSeed: this.fetchCount++,
earliestFetchTime: this.earliestFetchTime,
});

if (!result) {
Expand All @@ -91,7 +99,8 @@ export class UplinkFetcher implements SupergraphManager {
this.compositionId = result.id;
// the healthCheck fn is only assigned if it's enabled in the config
await this.healthCheck?.(result.supergraphSdl);
return result.supergraphSdl;
const { supergraphSdl, minDelaySeconds } = result;
return { supergraphSdl, minDelaySeconds };
}
}

Expand All @@ -107,7 +116,12 @@ export class UplinkFetcher implements SupergraphManager {

this.state.pollingPromise = pollingPromise;
try {
const maybeNewSupergraphSdl = await this.updateSupergraphSdl();
const result = await this.updateSupergraphSdl();
const maybeNewSupergraphSdl = result?.supergraphSdl || null;
if (result?.minDelaySeconds) {
this.minDelayMs = 1000 * result?.minDelaySeconds;
this.earliestFetchTime = new Date(Date.now() + this.minDelayMs);
}
if (maybeNewSupergraphSdl) {
this.update?.(maybeNewSupergraphSdl);
}
Expand All @@ -118,7 +132,7 @@ export class UplinkFetcher implements SupergraphManager {
}

this.poll();
}, this.config.pollIntervalInMs);
}, this.minDelayMs ? Math.max(this.minDelayMs, this.config.fallbackPollIntervalInMs) : this.config.fallbackPollIntervalInMs);
}

private logUpdateFailure(e: any) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export const SUPERGRAPH_SDL_QUERY = /* GraphQL */`#graphql
... on RouterConfigResult {
id
supergraphSdl: supergraphSDL
minDelaySeconds
}
... on FetchError {
code
Expand Down Expand Up @@ -56,6 +57,7 @@ export async function loadSupergraphSdlFromUplinks({
compositionId,
maxRetries,
roundRobinSeed,
earliestFetchTime,
}: {
graphRef: string;
apiKey: string;
Expand All @@ -65,6 +67,7 @@ export async function loadSupergraphSdlFromUplinks({
compositionId: string | null;
maxRetries: number,
roundRobinSeed: number,
earliestFetchTime: Date | null
}) : Promise<SupergraphSdlUpdate | null> {
// This Promise resolves with either an updated supergraph or null if no change.
// This Promise can reject in the case that none of the retries are successful,
Expand All @@ -81,6 +84,10 @@ export async function loadSupergraphSdlFromUplinks({
}),
{
retries: maxRetries,
onRetry: async () => {
const delayMS = earliestFetchTime ? earliestFetchTime.getTime() - Date.now(): 0;
if (delayMS > 0) await new Promise(resolve => setTimeout(resolve, delayMS));
}
},
);

Expand Down Expand Up @@ -176,9 +183,10 @@ export async function loadSupergraphSdlFromStorage({
const {
id,
supergraphSdl,
minDelaySeconds,
// messages,
} = routerConfig;
return { id, supergraphSdl: supergraphSdl! };
return { id, supergraphSdl: supergraphSdl!, minDelaySeconds };
} else if (routerConfig.__typename === 'FetchError') {
// FetchError case
const { code, message } = routerConfig;
Expand Down