Skip to content

Commit 3559496

Browse files
committed
Replace subscriptionConfigurationHash with RpcRequest in RpcSubscriptionsPlan
1 parent 06617d9 commit 3559496

File tree

11 files changed

+58
-124
lines changed

11 files changed

+58
-124
lines changed

.changeset/silly-wombats-switch.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@solana/rpc-subscriptions-spec': patch
3+
'@solana/rpc-subscriptions-api': patch
4+
'@solana/rpc-subscriptions': patch
5+
---
6+
7+
Replace subscriptionConfigurationHash with RpcRequest in RpcSubscriptionPlan

packages/rpc-subscriptions-api/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
],
7373
"dependencies": {
7474
"@solana/addresses": "workspace:*",
75-
"@solana/fast-stable-stringify": "workspace:*",
7675
"@solana/keys": "workspace:*",
7776
"@solana/rpc-types": "workspace:*",
7877
"@solana/rpc-subscriptions-spec": "workspace:*",

packages/rpc-subscriptions-api/src/index.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import fastStableStringify from '@solana/fast-stable-stringify';
21
import {
32
createRpcSubscriptionsApi,
43
executeRpcPubSubSubscriptionPlan,
@@ -56,21 +55,15 @@ function createSolanaRpcSubscriptionsApi_INTERNAL<TApi extends RpcSubscriptionsA
5655
allowedNumericKeyPaths: getAllowedNumericKeypaths(),
5756
});
5857
return createRpcSubscriptionsApi<TApi>({
59-
getSubscriptionConfigurationHash(request) {
60-
return fastStableStringify([request.methodName, request.params]);
61-
},
6258
planExecutor({ request, ...rest }) {
63-
const transformedRequest = requestTransformer(request);
6459
return executeRpcPubSubSubscriptionPlan({
6560
...rest,
6661
responseTransformer,
67-
subscribeRequest: {
68-
...transformedRequest,
69-
methodName: transformedRequest.methodName.replace(/Notifications$/, 'Subscribe'),
70-
},
71-
unsubscribeMethodName: transformedRequest.methodName.replace(/Notifications$/, 'Unsubscribe'),
62+
subscribeRequest: { ...request, methodName: request.methodName.replace(/Notifications$/, 'Subscribe') },
63+
unsubscribeMethodName: request.methodName.replace(/Notifications$/, 'Unsubscribe'),
7264
});
7365
},
66+
requestTransformer,
7467
});
7568
}
7669

packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-api-test.ts

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ describe('createRpcSubscriptionsApi', () => {
1010
it('calls the plan executor with the expected params', () => {
1111
const mockPlanExecutor = jest.fn().mockResolvedValue({
1212
executeSubscriptionPlan: jest.fn(),
13-
subscriptionConfigurationHash: 'MOCK_HASH',
13+
request: { methodName: 'foo', params: [] },
1414
} as RpcSubscriptionsPlan<unknown>);
1515
const api = createRpcSubscriptionsApi({ planExecutor: mockPlanExecutor });
1616
const expectedParams = [1, 'hi', 3];
@@ -28,48 +28,19 @@ describe('createRpcSubscriptionsApi', () => {
2828
});
2929
});
3030
});
31-
describe('subscriptionConfigurationHash', () => {
32-
it('does not call the hash creator before it is accessed', () => {
33-
const mockGetSubscriptionConfigurationHash = jest.fn();
34-
const api = createRpcSubscriptionsApi({
35-
getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash,
36-
planExecutor: jest.fn(),
37-
});
38-
api.foo('hi');
39-
expect(mockGetSubscriptionConfigurationHash).not.toHaveBeenCalled();
40-
});
41-
it('calls the hash creator when it is accessed', () => {
42-
const mockGetSubscriptionConfigurationHash = jest.fn();
43-
const api = createRpcSubscriptionsApi({
44-
getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash,
45-
planExecutor: jest.fn(),
46-
});
47-
const result = api.foo('hi');
48-
result.subscriptionConfigurationHash;
49-
expect(mockGetSubscriptionConfigurationHash).toHaveBeenCalledWith({
50-
methodName: 'foo',
51-
params: ['hi'],
52-
});
53-
});
54-
it('memoizes the result of the hash creator', () => {
55-
const mockGetSubscriptionConfigurationHash = jest.fn();
56-
const api = createRpcSubscriptionsApi({
57-
getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash,
58-
planExecutor: jest.fn(),
59-
});
31+
describe('rpcRequest', () => {
32+
it('provides the initial request object by default', () => {
33+
const api = createRpcSubscriptionsApi({ planExecutor: jest.fn() });
6034
const result = api.foo('hi');
61-
result.subscriptionConfigurationHash;
62-
result.subscriptionConfigurationHash;
63-
expect(mockGetSubscriptionConfigurationHash).toHaveBeenCalledTimes(1);
35+
expect(result.request).toEqual({ methodName: 'foo', params: ['hi'] });
6436
});
65-
it('returns the result of the hash creator', () => {
66-
const mockGetSubscriptionConfigurationHash = jest.fn().mockReturnValue('MOCK_HASH');
37+
it('provides the transformed request object when a request transformer is provided', () => {
6738
const api = createRpcSubscriptionsApi({
68-
getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash,
6939
planExecutor: jest.fn(),
40+
requestTransformer: jest.fn().mockReturnValue({ methodName: 'bar', params: [1, 2, 3] }),
7041
});
7142
const result = api.foo('hi');
72-
expect(result.subscriptionConfigurationHash).toBe('MOCK_HASH');
43+
expect(result.request).toEqual({ methodName: 'bar', params: [1, 2, 3] });
7344
});
7445
});
7546
});

packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { Callable, RpcRequest } from '@solana/rpc-spec-types';
1+
import { Callable, RpcRequest, RpcRequestTransformer } from '@solana/rpc-spec-types';
22
import { DataPublisher } from '@solana/subscribable';
33

44
import { RpcSubscriptionsChannel } from './rpc-subscriptions-channel';
55
import { RpcSubscriptionsTransportDataEvents } from './rpc-subscriptions-transport';
66

77
export type RpcSubscriptionsApiConfig<TApiMethods extends RpcSubscriptionsApiMethods> = Readonly<{
8-
getSubscriptionConfigurationHash?: (request: RpcRequest) => string | undefined;
98
planExecutor: RpcSubscriptionsPlanExecutor<ReturnType<TApiMethods[keyof TApiMethods]>>;
9+
requestTransformer?: RpcRequestTransformer;
1010
}>;
1111

1212
type RpcSubscriptionsPlanExecutor<TNotification> = (
@@ -28,10 +28,11 @@ export type RpcSubscriptionsPlan<TNotification> = Readonly<{
2828
}>,
2929
) => Promise<DataPublisher<RpcSubscriptionsTransportDataEvents<TNotification>>>;
3030
/**
31-
* This hash uniquely identifies the configuration of a subscription. It is typically used by
32-
* consumers of this API to deduplicate multiple subscriptions for the same notification.
31+
* This request is used to uniquely identify the subscription.
32+
* It typically comes from the method name and parameters of the subscription call,
33+
* after potentially being transformed by the RPC Subscriptions API.
3334
*/
34-
subscriptionConfigurationHash: string | undefined;
35+
request: RpcRequest;
3536
}>;
3637

3738
export type RpcSubscriptionsApi<TRpcSubscriptionMethods> = {
@@ -50,8 +51,6 @@ export interface RpcSubscriptionsApiMethods {
5051
[methodName: string]: RpcSubscriptionsApiMethod;
5152
}
5253

53-
const UNINITIALIZED = Symbol();
54-
5554
export function createRpcSubscriptionsApi<TRpcSubscriptionsApiMethods extends RpcSubscriptionsApiMethods>(
5655
config: RpcSubscriptionsApiConfig<TRpcSubscriptionsApiMethods>,
5756
): RpcSubscriptionsApi<TRpcSubscriptionsApiMethods> {
@@ -74,18 +73,13 @@ export function createRpcSubscriptionsApi<TRpcSubscriptionsApiMethods extends Rp
7473
: never
7574
>
7675
): RpcSubscriptionsPlan<ReturnType<TRpcSubscriptionsApiMethods[TNotificationName]>> {
77-
let _cachedSubscriptionHash: string | typeof UNINITIALIZED | undefined = UNINITIALIZED;
78-
const request = { methodName, params };
76+
const rawRequest = { methodName, params };
77+
const request = config.requestTransformer ? config.requestTransformer(rawRequest) : rawRequest;
7978
return {
8079
executeSubscriptionPlan(planConfig) {
8180
return config.planExecutor({ ...planConfig, request });
8281
},
83-
get subscriptionConfigurationHash() {
84-
if (_cachedSubscriptionHash === UNINITIALIZED) {
85-
_cachedSubscriptionHash = config?.getSubscriptionConfigurationHash?.(request);
86-
}
87-
return _cachedSubscriptionHash;
88-
},
82+
request,
8983
};
9084
};
9185
},

packages/rpc-subscriptions/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,4 @@ Given an `RpcSubscriptionsChannel`, will return a new channel that sends a ping
5151

5252
### `getRpcSubscriptionsTransportWithSubscriptionCoalescing(transport)`
5353

54-
Given an `RpcSubscriptionsTransport`, will return a new transport that coalesces identical subscriptions into a single subscription request to the server. The determination of whether a subscription is the same as another is based on the `subscriptionConfigurationHash` returned by its `RpcSubscriptionsPlan`. The subscription will only be aborted once all subscribers abort, or there is an error.
54+
Given an `RpcSubscriptionsTransport`, will return a new transport that coalesces identical subscriptions into a single subscription request to the server. The determination of whether a subscription is the same as another is based on the `rpcRequest` returned by its `RpcSubscriptionsPlan`. The subscription will only be aborted once all subscribers abort, or there is an error.

packages/rpc-subscriptions/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
],
7373
"dependencies": {
7474
"@solana/errors": "workspace:*",
75+
"@solana/fast-stable-stringify": "workspace:*",
7576
"@solana/functional": "workspace:*",
7677
"@solana/promises": "workspace:*",
7778
"@solana/rpc-subscriptions-api": "workspace:*",

packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts

Lines changed: 19 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
2020
mockInnerTransport.mockResolvedValue(expectedDataPublisher);
2121
const config = {
2222
executeSubscriptionPlan: jest.fn(),
23+
request: { methodName: 'foo', params: [] },
2324
signal: new AbortController().signal,
24-
subscriptionConfigurationHash: 'MOCK_HASH',
2525
};
2626
const transportPromise = coalescedTransport(config);
2727
await expect(transportPromise).resolves.toBe(expectedDataPublisher);
2828
});
2929
it('passes the `executeSubscriptionPlan` config to the inner transport', () => {
3030
const config = {
3131
executeSubscriptionPlan: jest.fn(),
32+
request: { methodName: 'foo', params: [] },
3233
signal: new AbortController().signal,
33-
subscriptionConfigurationHash: 'MOCK_HASH',
3434
};
3535
coalescedTransport(config).catch(() => {});
3636
expect(mockInnerTransport).toHaveBeenCalledWith(
@@ -39,16 +39,16 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
3939
}),
4040
);
4141
});
42-
it('passes the `subscriptionConfigurationHash` config to the inner transport', () => {
42+
it('passes the `rpcRequest` config to the inner transport', () => {
4343
const config = {
4444
executeSubscriptionPlan: jest.fn(),
45+
request: { methodName: 'foo', params: [] },
4546
signal: new AbortController().signal,
46-
subscriptionConfigurationHash: 'MOCK_HASH',
4747
};
4848
coalescedTransport(config).catch(() => {});
4949
expect(mockInnerTransport).toHaveBeenCalledWith(
5050
expect.objectContaining({
51-
subscriptionConfigurationHash: 'MOCK_HASH',
51+
request: { methodName: 'foo', params: [] },
5252
}),
5353
);
5454
});
@@ -58,11 +58,11 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
5858
signal: new AbortController().signal,
5959
};
6060
coalescedTransport({
61-
subscriptionConfigurationHash: 'MOCK_HASH_A',
61+
request: { methodName: 'methodA', params: [] },
6262
...config,
6363
}).catch(() => {});
6464
coalescedTransport({
65-
subscriptionConfigurationHash: 'MOCK_HASH_B',
65+
request: { methodName: 'methodB', params: [] },
6666
...config,
6767
}).catch(() => {});
6868
expect(mockInnerTransport).toHaveBeenCalledTimes(2);
@@ -73,46 +73,15 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
7373
executeSubscriptionPlan: jest.fn(),
7474
signal: new AbortController().signal,
7575
};
76-
await coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_A' });
77-
await coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_B' });
78-
expect(mockInnerTransport).toHaveBeenCalledTimes(2);
79-
});
80-
it("calls the inner transport once per subscriber when both subscribers' hashes are `undefined`, in the same runloop", () => {
81-
const config = {
82-
executeSubscriptionPlan: jest.fn(),
83-
signal: new AbortController().signal,
84-
};
85-
coalescedTransport({
86-
subscriptionConfigurationHash: undefined,
87-
...config,
88-
}).catch(() => {});
89-
coalescedTransport({
90-
subscriptionConfigurationHash: undefined,
91-
...config,
92-
}).catch(() => {});
93-
expect(mockInnerTransport).toHaveBeenCalledTimes(2);
94-
});
95-
it("calls the inner transport once per subscriber when both subscribers' hashes are `undefined`, in different runloops", async () => {
96-
expect.assertions(1);
97-
const config = {
98-
executeSubscriptionPlan: jest.fn(),
99-
signal: new AbortController().signal,
100-
};
101-
await coalescedTransport({
102-
subscriptionConfigurationHash: undefined,
103-
...config,
104-
});
105-
await coalescedTransport({
106-
subscriptionConfigurationHash: undefined,
107-
...config,
108-
});
76+
await coalescedTransport({ ...config, request: { methodName: 'methodA', params: [] } });
77+
await coalescedTransport({ ...config, request: { methodName: 'methodB', params: [] } });
10978
expect(mockInnerTransport).toHaveBeenCalledTimes(2);
11079
});
11180
it('only calls the inner transport once, in the same runloop', () => {
11281
const config = {
11382
executeSubscriptionPlan: jest.fn(),
83+
request: { methodName: 'foo', params: [] },
11484
signal: new AbortController().signal,
115-
subscriptionConfigurationHash: 'MOCK_HASH',
11685
};
11786
coalescedTransport(config).catch(() => {});
11887
coalescedTransport(config).catch(() => {});
@@ -122,8 +91,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
12291
expect.assertions(1);
12392
const config = {
12493
executeSubscriptionPlan: jest.fn(),
94+
request: { methodName: 'foo', params: [] },
12595
signal: new AbortController().signal,
126-
subscriptionConfigurationHash: 'MOCK_HASH',
12796
};
12897
await coalescedTransport(config);
12998
await coalescedTransport(config);
@@ -133,8 +102,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
133102
expect.assertions(1);
134103
const config = {
135104
executeSubscriptionPlan: jest.fn(),
105+
request: { methodName: 'foo', params: [] },
136106
signal: new AbortController().signal,
137-
subscriptionConfigurationHash: 'MOCK_HASH',
138107
};
139108
const [publisherA, publisherB] = await Promise.all([coalescedTransport(config), coalescedTransport(config)]);
140109
expect(publisherA).toBe(publisherB);
@@ -143,8 +112,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
143112
expect.assertions(1);
144113
const config = {
145114
executeSubscriptionPlan: jest.fn(),
115+
request: { methodName: 'foo', params: [] },
146116
signal: new AbortController().signal,
147-
subscriptionConfigurationHash: 'MOCK_HASH',
148117
};
149118
const publisherA = await coalescedTransport(config);
150119
const publisherB = await coalescedTransport(config);
@@ -154,7 +123,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
154123
jest.useFakeTimers();
155124
const config = {
156125
executeSubscriptionPlan: jest.fn(),
157-
subscriptionConfigurationHash: 'MOCK_HASH',
126+
request: { methodName: 'foo', params: [] },
158127
};
159128
const abortControllerB = new AbortController();
160129
coalescedTransport({ ...config, signal: new AbortController().signal }).catch(() => {});
@@ -167,7 +136,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
167136
jest.useFakeTimers();
168137
const config = {
169138
executeSubscriptionPlan: jest.fn(),
170-
subscriptionConfigurationHash: 'MOCK_HASH',
139+
request: { methodName: 'foo', params: [] },
171140
};
172141
const abortControllerA = new AbortController();
173142
const abortControllerB = new AbortController();
@@ -182,7 +151,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
182151
expect.assertions(1);
183152
const config = {
184153
executeSubscriptionPlan: jest.fn(),
185-
subscriptionConfigurationHash: 'MOCK_HASH',
154+
request: { methodName: 'foo', params: [] },
186155
};
187156
const abortControllerA = new AbortController();
188157
const abortControllerB = new AbortController();
@@ -198,7 +167,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
198167
it('does not fire the inner abort signal if the subscriber count is non zero at the end of the runloop, despite having aborted all in the middle of it', () => {
199168
const config = {
200169
executeSubscriptionPlan: jest.fn(),
201-
subscriptionConfigurationHash: 'MOCK_HASH',
170+
request: { methodName: 'foo', params: [] },
202171
};
203172
const abortControllerA = new AbortController();
204173
coalescedTransport({ ...config, signal: abortControllerA.signal }).catch(() => {});
@@ -212,7 +181,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
212181
jest.useFakeTimers();
213182
const config = {
214183
executeSubscriptionPlan: jest.fn(),
215-
subscriptionConfigurationHash: 'MOCK_HASH',
184+
request: { methodName: 'foo', params: [] },
216185
};
217186
coalescedTransport({ ...config, signal: new AbortController().signal }).catch(() => {});
218187
await jest.runAllTimersAsync();
@@ -225,7 +194,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
225194
jest.useFakeTimers();
226195
const config = {
227196
executeSubscriptionPlan: jest.fn(),
228-
subscriptionConfigurationHash: 'MOCK_HASH',
197+
request: { methodName: 'foo', params: [] },
229198
};
230199
const abortControllerA = new AbortController();
231200
/**

0 commit comments

Comments
 (0)