Skip to content

Commit daa4afd

Browse files
Implement semaphores
Co-authored-by: Mark Wubben <[email protected]>
1 parent ec9eb5e commit daa4afd

File tree

6 files changed

+818
-11
lines changed

6 files changed

+818
-11
lines changed

Diff for: README.md

+66
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,69 @@ const reserved = await context.reserve(1, 2, 3);
8181
// `reserved` will be an array containing those values that could be reserved.
8282
// It could be empty.
8383
```
84+
85+
### Semaphores
86+
87+
You can create a [counting semaphore](https://www.guru99.com/semaphore-in-operating-system.html) within a shared context:
88+
89+
```js
90+
const initialValue = 3; // Must be a non-negative integer.
91+
const semaphore = context.createSemaphore('my-semaphore', initialValue);
92+
```
93+
94+
Within the same context, semaphores with the same ID must be created with the same initial value. Semaphores created with a different value are unusable. Their methods will reject with a `SemaphoreCreationError`.
95+
96+
Semaphores have two methods: `acquire()` and `acquireNow()`. Use `acquire()` to decrement the semaphore's value. If the semaphore's value would become negative, instead `acquire()` waits until the semaphore's value is high enough.
97+
98+
```js
99+
const semaphore = context.createSemaphore('my-semaphore', 3);
100+
const release = await semaphore.acquire();
101+
```
102+
103+
`acquire()` returns a function, `release()`, which increments the semaphore's value by the same amount as was acquired.
104+
105+
The semaphore is _managed_: if you don't call `release()`, it'll be run automatically when the test worker exits. Any pending `acquire()` calls will also be removed from the queue at this time.
106+
107+
`acquireNow()` works like `acquire()`, except that if the semaphore can't be decremented immediately, `acquireNow()` rejects with a `SemaphoreDownError` rather than wait.
108+
109+
Semaphores are _weighted_. `acquire()` and `acquireNow()` accept a non-negative integer amount, defaulting to `1`, by which to decrement or increment the value:
110+
111+
```js
112+
await semaphore.acquire(0);
113+
await semaphore.acquireNow(2);
114+
```
115+
116+
You can also pass an amount to `release()` to release just part of the acquisition at a time:
117+
118+
```js
119+
const release = await semaphore.acquire(3); // Decrements the semaphore by 3
120+
release(1); // Increments the semaphore by 1
121+
release(); // Increments the semaphore by the remaining 2
122+
```
123+
124+
`acquire()` calls resolve in FIFO order. If the current value is `1`, and a call tries to acquire `2`, subsequent `acquire()` calls have to wait, even if they want to acquire just `1`.
125+
126+
`acquireNow()` skips the queue and decrements immediately if possible.
127+
128+
#### Lower-level, unmanaged semaphores
129+
130+
You can create a lower-level, _unmanaged_ semaphore which doesn't have any auto-release behavior. Instead you need to increment the semaphore in code.
131+
132+
```js
133+
const initialValue = 3; // Must be a non-negative integer.
134+
const semaphore = context.createUnmanagedSemaphore('my-semaphore', initialValue);
135+
```
136+
137+
Unmanaged semaphores mustn't use the same ID as a managed semaphore, within the same context. Semaphores with the same ID must be created with the same initial value. Mismatched managed and unmanaged semaphores, or those created with different values are unusable. Their methods will reject with a `SemaphoreCreationError`.
138+
139+
Unmanaged semaphores have three methods. `down()` and `downNow()` decrement the value and `up()` increments:
140+
141+
```js
142+
await semaphore.down(0);
143+
await semaphore.downNow(2);
144+
await semaphore.up(); // `amount` defaults to 1
145+
```
146+
147+
Like the `acquire()` and `acquireNow()` methods of managed semaphores, `down()` waits for the semaphore's value to be at least the requested amount, while `downNow()` rejects with `SemaphoreDownError` if the value cannot be decremented immediately.
148+
149+
These unmanaged semaphores do not release the "acquired" amount when a test worker exits.

Diff for: source/index.ts

+206-6
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import path from 'path';
2-
import {registerSharedWorker} from 'ava/plugin';
2+
import {registerSharedWorker, SharedWorker} from 'ava/plugin';
33
import never from 'never';
44

5-
import {Data, MessageType} from './types';
5+
import {Data, MessageType, SemaphoreCreationFailed} from './types';
6+
7+
type ReceivedMessage = SharedWorker.Plugin.Experimental.ReceivedMessage<Data>;
68

79
const protocol = registerSharedWorker<Data>({
810
filename: path.join(__dirname, 'worker.js'),
@@ -17,7 +19,7 @@ export class Lock {
1719
}
1820

1921
async acquire(): Promise<() => void> {
20-
// Allow reserve() to be called before the shared worker is availabe.
22+
// Allow acquire() to be called before the shared worker is availabe.
2123
await protocol.available;
2224

2325
const message = protocol.publish({
@@ -35,7 +37,6 @@ export class Lock {
3537
}
3638
}
3739

38-
/* c8 ignore next 2 */
3940
// The above loop will never actually break if the lock is not acquired.
4041
return never();
4142
}
@@ -63,7 +64,6 @@ export class Lock {
6364
}
6465
}
6566

66-
/* c8 ignore next 2 */
6767
// The above loop will never actually break if the lock is not acquired.
6868
return never();
6969
}
@@ -79,13 +79,214 @@ export class LockAcquisitionError extends Error {
7979
}
8080
}
8181

82+
export class ManagedSemaphore {
83+
readonly #context: SharedContext;
84+
85+
constructor(
86+
context: SharedContext,
87+
public readonly id: string,
88+
public readonly initialValue: number
89+
) {
90+
if (initialValue < 0 || !Number.isSafeInteger(initialValue)) {
91+
throw new RangeError('initialValue must be a non-negative safe integer');
92+
}
93+
94+
this.#context = context;
95+
}
96+
97+
async acquire(amount = 1) {
98+
if (amount < 0 || !Number.isSafeInteger(amount)) {
99+
throw new RangeError('amount must be a non-negative safe integer');
100+
}
101+
102+
// Allow acquire() to be called before the shared worker is availabe.
103+
await protocol.available;
104+
105+
const reply = await downSemaphore(this, this.#context.id, amount, true);
106+
return (release = amount) => {
107+
if (release < 0 || !Number.isSafeInteger(release) || release > amount) {
108+
throw new RangeError('Amount to release must be a non-negative safe integer and <= remaining amount');
109+
}
110+
111+
amount -= release;
112+
reply.reply({
113+
type: MessageType.SEMAPHORE_RELEASE,
114+
amount: release
115+
});
116+
};
117+
}
118+
119+
async acquireNow(amount = 1) {
120+
if (amount < 0 || !Number.isSafeInteger(amount)) {
121+
throw new RangeError('amount must be a non-negative safe integer');
122+
}
123+
124+
// Down immediately, which will fail if the protocol is not available.
125+
// "Now" should not mean "wait until we're ready."
126+
127+
const reply = await downSemaphore(this, this.#context.id, amount, false);
128+
return (release = amount) => {
129+
if (release < 0 || release > amount) {
130+
throw new RangeError('Amount to release must be >= 0 and <= remaining amount');
131+
}
132+
133+
amount -= release;
134+
reply.reply({
135+
type: MessageType.SEMAPHORE_RELEASE,
136+
amount: release
137+
});
138+
};
139+
}
140+
}
141+
142+
export class UnmanagedSemaphore {
143+
readonly #context: SharedContext;
144+
145+
constructor(
146+
context: SharedContext,
147+
public readonly id: string,
148+
public readonly initialValue: number
149+
) {
150+
if (initialValue < 0 || !Number.isSafeInteger(initialValue)) {
151+
throw new RangeError('initialValue must be a non-negative safe integer');
152+
}
153+
154+
this.#context = context;
155+
}
156+
157+
async down(amount = 1) {
158+
if (amount < 0 || !Number.isSafeInteger(amount)) {
159+
throw new RangeError('amount must be a non-negative safe integer');
160+
}
161+
162+
// Allow down() to be called before the shared worker is availabe.
163+
await protocol.available;
164+
165+
await downSemaphore(this, this.#context.id, amount, true);
166+
}
167+
168+
async downNow(amount = 1) {
169+
if (amount < 0 || !Number.isSafeInteger(amount)) {
170+
throw new RangeError('amount must be a non-negative safe integer');
171+
}
172+
173+
// Down immediately, which will fail if the protocol is not available.
174+
// "Now" should not mean "wait until we're ready."
175+
176+
await downSemaphore(this, this.#context.id, amount, false);
177+
}
178+
179+
async up(amount = 1) {
180+
if (amount < 0 || !Number.isSafeInteger(amount)) {
181+
throw new RangeError('amount must be a non-negative safe integer');
182+
}
183+
184+
// Allow up() to be called before the shared worker is availabe.
185+
await protocol.available;
186+
187+
const {id, initialValue} = this;
188+
const message = protocol.publish({
189+
type: MessageType.SEMAPHORE_UP,
190+
contextId: this.#context.id,
191+
semaphore: {managed: false, id, initialValue},
192+
amount
193+
});
194+
195+
for await (const reply of message.replies()) {
196+
if (reply.data.type === MessageType.SEMAPHORE_SUCCEEDED) {
197+
return;
198+
}
199+
200+
if (reply.data.type === MessageType.SEMAPHORE_CREATION_FAILED) {
201+
throw new SemaphoreCreationError(this, reply.data);
202+
}
203+
}
204+
205+
// The above loop will never actually break if the resources are not acquired.
206+
return never();
207+
}
208+
}
209+
210+
type Semaphore = ManagedSemaphore | UnmanagedSemaphore;
211+
212+
async function downSemaphore(semaphore: Semaphore, contextId: string, amount: number, wait: boolean): Promise<ReceivedMessage> {
213+
const {id, initialValue} = semaphore;
214+
const message = protocol.publish({
215+
type: MessageType.SEMAPHORE_DOWN,
216+
contextId,
217+
semaphore: {managed: semaphore instanceof ManagedSemaphore, id, initialValue},
218+
amount,
219+
wait
220+
});
221+
222+
for await (const reply of message.replies()) {
223+
if (reply.data.type === MessageType.SEMAPHORE_SUCCEEDED) {
224+
return reply;
225+
}
226+
227+
if (reply.data.type === MessageType.SEMAPHORE_FAILED) {
228+
throw new SemaphoreDownError(id, amount);
229+
}
230+
231+
if (reply.data.type === MessageType.SEMAPHORE_CREATION_FAILED) {
232+
throw new SemaphoreCreationError(semaphore, reply.data);
233+
}
234+
}
235+
236+
// The above loop will never actually break if the resources are not acquired.
237+
return never();
238+
}
239+
240+
export class SemaphoreDownError extends Error {
241+
get name() {
242+
return 'SemaphoreDownError';
243+
}
244+
245+
constructor(public readonly semaphoreId: string, public readonly amount: number) {
246+
super(`Could not immediately decrement with ${amount}`);
247+
}
248+
}
249+
250+
export class SemaphoreCreationError extends Error {
251+
readonly semaphoreId: string;
252+
253+
get name() {
254+
return 'SemaphoreCreationError';
255+
}
256+
257+
constructor(semaphore: Semaphore, {initialValue, managed}: SemaphoreCreationFailed) {
258+
const initialValueSuffix = `initial value ${semaphore.initialValue} (got ${initialValue})`;
259+
if (semaphore instanceof ManagedSemaphore) {
260+
if (managed) {
261+
super(`Failed to create semaphore: expected ${initialValueSuffix}`);
262+
} else {
263+
super(`Failed to create semaphore: expected unmanaged and ${initialValueSuffix}`);
264+
}
265+
} else if (managed) {
266+
super(`Failed to create unmanaged semaphore: expected managed and ${initialValueSuffix}`);
267+
} else {
268+
super(`Failed to create unmanaged semaphore: expected ${initialValueSuffix}`);
269+
}
270+
271+
this.semaphoreId = semaphore.id;
272+
}
273+
}
274+
82275
export class SharedContext {
83276
constructor(public readonly id: string) {}
84277

85278
createLock(id: string): Lock {
86279
return new Lock(this, id);
87280
}
88281

282+
createSemaphore(id: string, initialValue: number): ManagedSemaphore {
283+
return new ManagedSemaphore(this, id, initialValue);
284+
}
285+
286+
createUnmanagedSemaphore(id: string, initialValue: number): UnmanagedSemaphore {
287+
return new UnmanagedSemaphore(this, id, initialValue);
288+
}
289+
89290
async reserve<T extends bigint | number | string>(...values: T[]): Promise<T[]> {
90291
// Allow reserve() to be called before the shared worker is availabe.
91292
await protocol.available;
@@ -102,7 +303,6 @@ export class SharedContext {
102303
}
103304
}
104305

105-
/* c8 ignore next 2 */
106306
// The above loop will never actually break if the lock is not acquired.
107307
return never();
108308
}

Diff for: source/types.d.ts

+44-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ export const enum MessageType {
55
LOCK_RELEASE = 13,
66
RESERVE = 20,
77
RESERVED_INDEXES = 21,
8+
SEMAPHORE_CREATION_FAILED = 30,
9+
SEMAPHORE_DOWN = 31,
10+
SEMAPHORE_FAILED = 32,
11+
SEMAPHORE_RELEASE = 33,
12+
SEMAPHORE_SUCCEEDED = 34,
13+
SEMAPHORE_UP = 35,
814
}
915

1016
export type Lock = {
@@ -33,6 +39,43 @@ export type ReservedIndexes = {
3339
indexes: number[];
3440
};
3541

42+
type SemaphoreSetup = {
43+
id: string;
44+
initialValue: number;
45+
managed: boolean;
46+
};
47+
48+
export type SemaphoreDown = {
49+
type: MessageType.SEMAPHORE_DOWN;
50+
contextId: string;
51+
semaphore: SemaphoreSetup;
52+
amount: number;
53+
wait: boolean;
54+
};
55+
56+
export type SemaphoreUp = {
57+
type: MessageType.SEMAPHORE_UP;
58+
contextId: string;
59+
semaphore: SemaphoreSetup;
60+
amount: number;
61+
};
62+
63+
export type SemaphoreRelease = {
64+
type: MessageType.SEMAPHORE_RELEASE;
65+
amount: number;
66+
};
67+
68+
export type SemaphoreResult = {
69+
type: MessageType.SEMAPHORE_SUCCEEDED | MessageType.SEMAPHORE_FAILED;
70+
};
71+
72+
export type SemaphoreCreationFailed = {
73+
type: MessageType.SEMAPHORE_CREATION_FAILED;
74+
initialValue: number;
75+
managed: boolean;
76+
};
77+
3678
export type Data =
3779
Lock | Locked | LockRelease |
38-
Reservation | ReservedIndexes;
80+
Reservation | ReservedIndexes |
81+
SemaphoreDown | SemaphoreResult | SemaphoreUp | SemaphoreRelease | SemaphoreCreationFailed;

0 commit comments

Comments
 (0)