Skip to content

Commit a5d5866

Browse files
authored
Revert "Implement locking stream pull to avoid multiple catalyst pulling the … (#2108)" (#2114)
This reverts commit 55d9116.
1 parent 55aee64 commit a5d5866

File tree

4 files changed

+1
-127
lines changed

4 files changed

+1
-127
lines changed

packages/api/src/controllers/stream.test.ts

-83
Original file line numberDiff line numberDiff line change
@@ -524,77 +524,6 @@ describe("controllers/stream", () => {
524524
expect(server.db.stream.addDefaultFields(document)).toEqual(stream);
525525
});
526526

527-
it("should lock pull for a new stream", async () => {
528-
// Create stream pull
529-
const now = Date.now();
530-
const res = await client.put("/stream/pull", postMockPullStream);
531-
expect(res.status).toBe(201);
532-
const stream = await res.json();
533-
534-
// Request pull lock
535-
const resLockPull = await client.post(`/stream/${stream.id}/lockPull`);
536-
expect(resLockPull.status).toBe(204);
537-
538-
// Check that the pullLockedAt is marked with the correct date
539-
const res2 = await client.get(`/stream/${stream.id}`);
540-
expect(res2.status).toBe(200);
541-
const stream2 = await res2.json();
542-
expect(stream2.pullLockedAt).toBeGreaterThan(now);
543-
});
544-
545-
it("should not lock pull for an active stream", async () => {
546-
// Create stream pull
547-
const res = await client.put("/stream/pull", postMockPullStream);
548-
expect(res.status).toBe(201);
549-
const stream = await res.json();
550-
551-
// Mark stream as active
552-
await db.stream.update(stream.id, { isActive: true });
553-
554-
// Requesting pull lock should fail, because the stream is active (so it should be replicated instead of being pulled)
555-
const reslockPull = await client.post(`/stream/${stream.id}/lockPull`);
556-
expect(reslockPull.status).toBe(423);
557-
});
558-
559-
it("should not lock pull for already locked pull", async () => {
560-
// Create stream pull
561-
const res = await client.put("/stream/pull", postMockPullStream);
562-
expect(res.status).toBe(201);
563-
const stream = await res.json();
564-
565-
// Request pull lock by many processes at the same time, only one should acquire a lock
566-
const promises = [];
567-
for (let i = 0; i < 10; i++) {
568-
promises.push(client.post(`/stream/${stream.id}/lockPull`));
569-
}
570-
const resPulls = await Promise.all(promises);
571-
expect(resPulls.filter((r) => r.status === 204).length).toBe(1);
572-
expect(resPulls.filter((r) => r.status === 423).length).toBe(9);
573-
});
574-
575-
it("should lock pull for already locked pull if lease has expired", async () => {
576-
// Create stream pull
577-
const res = await client.put("/stream/pull", postMockPullStream);
578-
expect(res.status).toBe(201);
579-
const stream = await res.json();
580-
581-
// Request pull lock
582-
const resLockPull = await client.post(`/stream/${stream.id}/lockPull`, {
583-
leaseTimeout: 1,
584-
});
585-
expect(resLockPull.status).toBe(204);
586-
587-
// Wait until lease has expired
588-
await sleep(1);
589-
590-
// Request pull lock should succeed, because the lock lease has expired (so we assume the stream is not being pulled at the moment)
591-
const resLockPull2 = await client.post(
592-
`/stream/${stream.id}/lockPull`,
593-
{ leaseTimeout: 1 }
594-
);
595-
expect(resLockPull2.status).toBe(204);
596-
});
597-
598527
it("should update a stream if it has the same pull source", async () => {
599528
let res = await client.put("/stream/pull", postMockPullStream);
600529
expect(res.status).toBe(201);
@@ -831,18 +760,6 @@ describe("controllers/stream", () => {
831760
expect(updatedStream.lastSeen).toBeGreaterThan(timeBeforeBump);
832761
});
833762

834-
it("start pull should update lastPullAt", async () => {
835-
const stream = await createAndActivateStream();
836-
const timeBeforeBump = Date.now();
837-
expect(stream.lastSeen).toBeLessThan(timeBeforeBump);
838-
839-
const res = await client.post(`/stream/${stream.id}/heartbeat`);
840-
841-
expect(res.status).toBe(204);
842-
const updatedStream = await server.db.stream.get(stream.id);
843-
expect(updatedStream.lastSeen).toBeGreaterThan(timeBeforeBump);
844-
});
845-
846763
it("should allow changing the mist host as well", async () => {
847764
const stream = await createAndActivateStream();
848765

packages/api/src/controllers/stream.ts

-35
Original file line numberDiff line numberDiff line change
@@ -1086,41 +1086,6 @@ app.put(
10861086
}
10871087
);
10881088

1089-
app.post("/:id/lockPull", authorizer({ anyAdmin: true }), async (req, res) => {
1090-
const { id } = req.params;
1091-
let { leaseTimeout } = req.body;
1092-
if (!leaseTimeout) {
1093-
// Sets the default lock lease to 60s
1094-
leaseTimeout = 60 * 1000;
1095-
}
1096-
logger.info(`got /lockPull for stream=${id}`);
1097-
1098-
const stream = await db.stream.get(id, { useReplica: false });
1099-
if (!stream || (stream.deleted && !req.user.admin)) {
1100-
res.status(404);
1101-
return res.json({ errors: ["not found"] });
1102-
}
1103-
if (stream.isActive) {
1104-
return res.status(423).end();
1105-
}
1106-
1107-
const updateRes = await db.stream.update(
1108-
[
1109-
sql`id = ${stream.id}`,
1110-
sql`COALESCE((data->>'pullLockedAt')::bigint,0) < ${
1111-
Date.now() - leaseTimeout
1112-
}`,
1113-
],
1114-
{ pullLockedAt: Date.now() },
1115-
{ throwIfEmpty: false }
1116-
);
1117-
1118-
if (updateRes.rowCount > 0) {
1119-
res.status(204).end();
1120-
}
1121-
res.status(423).end();
1122-
});
1123-
11241089
function terminateDelay(stream: DBStream) {
11251090
if (!stream.lastTerminatedAt) {
11261091
return 0;

packages/api/src/schema/db-schema.yaml

-3
Original file line numberDiff line numberDiff line change
@@ -702,9 +702,6 @@ components:
702702
properties:
703703
source:
704704
index: true
705-
pullLockedAt:
706-
type: number
707-
example: 1587667174725
708705
playbackId:
709706
unique: true
710707
mistHost:

packages/api/src/store/stream-table.ts

+1-6
Original file line numberDiff line numberDiff line change
@@ -408,12 +408,7 @@ export default class StreamTable extends Table<DBStream> {
408408
}
409409
}
410410

411-
const adminOnlyFields = [
412-
"mistHost",
413-
"broadcasterHost",
414-
"createdByTokenId",
415-
"pullLockedAt",
416-
];
411+
const adminOnlyFields = ["mistHost", "broadcasterHost", "createdByTokenId"];
417412

418413
const privateFields = [
419414
"recordObjectStoreId",

0 commit comments

Comments
 (0)