Skip to content

Commit bc809e4

Browse files
victorgesthomshutt
andauthored
api: Create PUT /stream/pull API for idempotent pull stream (#2024)
* api/schema: Make lat/lon required fields * api: Create separate PUT /stream/pull endpoint * api: Filter by userId and non-deleted on queries scary * api: Update stream fields on PUT /pull API * api: Wait until stream is active after trigger * api: Return 200 when the stream already existed * api: Allow querying by creatorId/pull.source * api: Allow deduping streams by creatorId * api/test: Fix asset indexes test * api/test: Add tests for new API * api: Fix response not to omit default fields We had an object with a couple of undefined fields which then didn't get the default added by addDefaultFields Fix it by re-reading the obj from the DB, which will not only get the actually serialized version (undefined is omited) but get the actual final state in the db (maybe someone wrote at the same time 🤷) * api: Fix indexes Remove 'name' from /pull potential keys. It doesn't have an index and we won't need it anyway for now, so let's KISS. Also moved index: directive to db-schema --------- Co-authored-by: Thom Shutt <[email protected]>
1 parent b611d60 commit bc809e4

File tree

6 files changed

+366
-56
lines changed

6 files changed

+366
-56
lines changed

packages/api/src/controllers/asset.ts

+1
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ const fieldsMap = {
609609
createdAt: { val: `asset.data->'createdAt'`, type: "int" },
610610
updatedAt: { val: `asset.data->'status'->'updatedAt'`, type: "int" },
611611
userId: `asset.data->>'userId'`,
612+
creatorId: `stream.data->'creatorId'->>'value'`,
612613
playbackId: `asset.data->>'playbackId'`,
613614
playbackRecordingId: `asset.data->>'playbackRecordingId'`,
614615
phase: `asset.data->'status'->>'phase'`,

packages/api/src/controllers/stream.test.ts

+145
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ let mockUser: User;
3333
let mockAdminUser: User;
3434
let mockNonAdminUser: User;
3535
let postMockStream: Stream;
36+
let postMockPullStream: Stream;
3637
// jest.setTimeout(70000)
3738

3839
beforeAll(async () => {
@@ -59,6 +60,12 @@ beforeAll(async () => {
5960
renditions: ["random_prefix_bbb_160p"],
6061
},
6162
];
63+
postMockPullStream = {
64+
...postMockStream,
65+
pull: {
66+
source: "https://playback.space/video+7bbb3wee.flv",
67+
},
68+
};
6269

6370
mockUser = {
6471
@@ -478,6 +485,144 @@ describe("controllers/stream", () => {
478485
});
479486
});
480487

488+
describe("pull stream idempotent creation", () => {
489+
beforeEach(async () => {
490+
// TODO: Remove this once experiment is done
491+
await client.post("/experiment", {
492+
name: "stream-pull-source",
493+
audienceUserIds: [adminUser.id],
494+
});
495+
});
496+
497+
it("should require a pull configuration", async () => {
498+
let res = await client.put("/stream/pull", postMockStream);
499+
expect(res.status).toBe(400);
500+
const errors = await res.json();
501+
expect(errors).toMatchObject({
502+
errors: [
503+
expect.stringContaining("stream pull configuration is required"),
504+
],
505+
});
506+
507+
res = await client.put("/stream/pull", {
508+
...postMockStream,
509+
pull: {}, // an empty object is missing the 'source' field
510+
});
511+
expect(res.status).toBe(422);
512+
});
513+
514+
it("should create a stream if a pull config is present", async () => {
515+
const now = Date.now();
516+
const res = await client.put("/stream/pull", postMockPullStream);
517+
expect(res.status).toBe(201);
518+
const stream = await res.json();
519+
expect(stream.id).toBeDefined();
520+
expect(stream.kind).toBe("stream");
521+
expect(stream.name).toBe("test_stream");
522+
expect(stream.createdAt).toBeGreaterThanOrEqual(now);
523+
const document = await db.stream.get(stream.id);
524+
expect(server.db.stream.addDefaultFields(document)).toEqual(stream);
525+
});
526+
527+
it("should update a stream if it has the same pull source", async () => {
528+
let res = await client.put("/stream/pull", postMockPullStream);
529+
expect(res.status).toBe(201);
530+
const stream = await res.json();
531+
532+
const now = Date.now();
533+
res = await client.put("/stream/pull", {
534+
...postMockPullStream,
535+
name: "updated_stream",
536+
profiles: [],
537+
});
538+
expect(res.status).toBe(200);
539+
const updatedStream = await res.json();
540+
expect(updatedStream.id).toBe(stream.id);
541+
expect(updatedStream.name).toBe("updated_stream");
542+
expect(updatedStream.profiles).toEqual([]);
543+
544+
const document = await db.stream.get(stream.id);
545+
expect(db.stream.addDefaultFields(document)).toEqual(updatedStream);
546+
});
547+
548+
it("should fail to dedup streams by a random key", async () => {
549+
let res = await client.put(
550+
"/stream/pull?key=invalid",
551+
postMockPullStream
552+
);
553+
expect(res.status).toBe(400);
554+
const errors = await res.json();
555+
expect(errors).toMatchObject({
556+
errors: [expect.stringContaining("key must be one of")],
557+
});
558+
});
559+
560+
it("should fail to dedup streams by creatorId if not provided", async () => {
561+
let res = await client.put(
562+
"/stream/pull?key=creatorId",
563+
postMockPullStream
564+
);
565+
expect(res.status).toBe(400);
566+
const errors = await res.json();
567+
expect(errors).toMatchObject({
568+
errors: [expect.stringContaining("must be present in the payload")],
569+
});
570+
});
571+
572+
it("should dedup streams by creatorId if requested", async () => {
573+
let res = await client.put("/stream/pull?key=creatorId", {
574+
...postMockPullStream,
575+
creatorId: "0xjest",
576+
});
577+
expect(res.status).toBe(201);
578+
const stream = await res.json();
579+
580+
res = await client.put("/stream/pull", {
581+
...postMockPullStream,
582+
creatorId: "0xjest",
583+
name: "updated_stream",
584+
profiles: [],
585+
});
586+
expect(res.status).toBe(200);
587+
const updatedStream = await res.json();
588+
expect(updatedStream.id).toBe(stream.id);
589+
expect(updatedStream.name).toBe("updated_stream");
590+
expect(updatedStream.profiles).toEqual([]);
591+
592+
const document = await db.stream.get(stream.id);
593+
expect(db.stream.addDefaultFields(document)).toEqual(updatedStream);
594+
});
595+
596+
it("should wait for stream to become active if requested", async () => {
597+
let responded = false;
598+
const resProm = client.put(
599+
"/stream/pull?waitActive=true",
600+
postMockPullStream
601+
);
602+
resProm.then(() => (responded = true));
603+
604+
// give some time for API to create object in DB
605+
await sleep(100);
606+
607+
const [streams] = await db.stream.find();
608+
expect(streams).toHaveLength(1);
609+
expect(streams[0].isActive).toBe(false);
610+
611+
// stream not active yet
612+
expect(responded).toBe(false);
613+
614+
// set stream active
615+
await db.stream.update(streams[0].id, { isActive: true });
616+
617+
const res = await resProm;
618+
expect(responded).toBe(true); // make sure this works
619+
expect(res.status).toBe(201);
620+
const stream = await res.json();
621+
expect(stream.id).toBe(streams[0].id);
622+
expect(stream.isActive).toBe(true);
623+
});
624+
});
625+
481626
it("should create a stream, delete it, and error when attempting additional delete or replace", async () => {
482627
const res = await client.post("/stream", { ...postMockStream });
483628
expect(res.status).toBe(201);

0 commit comments

Comments
 (0)