Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api/vod: Inline storage info in the asset resource #1014

Merged
merged 24 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f0c503b
vod/asset: Create storageProviders field in asset
victorges Apr 20, 2022
31add1b
api: Update asset object when exporting to IPFS
victorges Apr 20, 2022
da29bbf
api: Allow patching storage providers
victorges Apr 20, 2022
43de9c2
api/scheduler: Make it a switch
victorges Apr 20, 2022
83067f0
api: Update asset with IPFS specs when done
victorges Apr 20, 2022
9d7c913
api: Create separate status field in asset
victorges Apr 21, 2022
2553399
api: Fix some of the asset schema mess
victorges Apr 23, 2022
2476b77
api: Return full asset on patch
victorges Apr 24, 2022
161354c
api: Create taskIds map in status
victorges Apr 25, 2022
0d068bd
api: Save failed task ID as well
victorges Apr 25, 2022
3f4abc9
api/asset: Disallow removing an asset from IPFS
victorges May 16, 2022
e5b0ec2
api/store: Create asset table to handle status compat
victorges May 16, 2022
979d0cd
api/asset: Call withPlaybackUrl from response middleware
victorges May 16, 2022
30f7a8a
api/asset: Create reconcileAssetStorage helper
victorges May 17, 2022
fe897af
api/scheduler: Expose error message on failed asset status
victorges May 17, 2022
0a4d871
api/store: Create mergeAssetStatus helper
victorges May 17, 2022
83a9640
www/asset: Use new status field in asset table
victorges May 17, 2022
a07d6f6
api/asset/patch: Check input before anything else
victorges May 17, 2022
479aa33
api/asset: Create admin API to migrate assets to new format
victorges May 17, 2022
8af6016
www/docs: Update the VOD docs with the new fields
victorges May 17, 2022
7629dbf
api: Add some tests to asset schema migration logic
victorges May 24, 2022
06353a3
api: Create tests for new asset logic
victorges May 25, 2022
f161d18
api: Fix some bugs found by the tests
victorges May 25, 2022
6d12ce8
api: Add some tests to the migrate-status API as well
victorges May 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"go-livepeer:broadcaster": "bin/livepeer -broadcaster -datadir ./bin/broadcaster -orchAddr 127.0.0.1:3086 -rtmpAddr 0.0.0.0:3035 -httpAddr :3085 -cliAddr :3075 -v 6 -authWebhookUrl http://127.0.0.1:3004/api/stream/hook -orchWebhookUrl http://127.0.0.1:3004/api/orchestrator",
"go-livepeer:orchestrator": "bin/livepeer -orchestrator -datadir ./bin/orchestrator -transcoder -serviceAddr 127.0.0.1:3086 -cliAddr :3076 -v 6",
"test": "jest \"${PWD}/src\" -i --silent",
"test:watch": "jest \"${PWD}/src\" -i --silent --watch",
"test:build": "parcel build --no-autoinstall --no-minify --bundle-node-modules -t browser --out-dir ../dist-worker ../src/worker.js",
"coverage": "yarn run test --coverage",
"pkg": "pkg --out-path=bin --no-bytecode --public-packages \"*\" --public . --compress GZip ",
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/app-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ export default async function makeApp(params: CliArgs) {
return {
router: app,
webhookCannon,
taskScheduler,
store,
db,
queue,
Expand Down
320 changes: 320 additions & 0 deletions packages/api/src/controllers/asset.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
import serverPromise, { TestServer } from "../test-server";
import { TestClient, clearDatabase, setupUsers } from "../test-helpers";
import { v4 as uuid } from "uuid";
import { Asset, User } from "../schema/types";
import { db } from "../store";
import { WithID } from "../store/types";
import Table from "../store/table";
import schema from "../schema/schema.json";

// repeat the type here so we don't need to export it from store/asset-table.ts
type DBAsset =
| Omit<Asset, "status"> & {
id: string;
updatedAt?: Asset["status"]["updatedAt"];
status?: Asset["status"] | Asset["status"]["phase"];
};

let server: TestServer;
let mockAdminUserInput: User;
let mockNonAdminUserInput: User;
// db.asset migrates the objects on read, gotta go raw
let rawAssetTable: Table<DBAsset>;

// jest.setTimeout(70000)

beforeAll(async () => {
server = await serverPromise;

mockAdminUserInput = {
email: "user_admin@gmail.com",
password: "x".repeat(64),
};

mockNonAdminUserInput = {
email: "user_non_admin@gmail.com",
password: "y".repeat(64),
};

rawAssetTable = new Table<DBAsset>({
db,
schema: schema.components.schemas["asset"],
});
});

afterEach(async () => {
await clearDatabase(server);
});

describe("controllers/asset", () => {
let client: TestClient;
let adminUser: User;
let adminApiKey: string;
let nonAdminUser: User;
let nonAdminToken: string;

beforeEach(async () => {
await db.objectStore.create({
id: "mock_vod_store",
url: "http://user:password@localhost:8080/us-east-1/vod",
});
({ client, adminUser, adminApiKey, nonAdminUser, nonAdminToken } =
await setupUsers(server, mockAdminUserInput, mockNonAdminUserInput));
client.jwtAuth = nonAdminToken;
});

describe("assets status schema migration", () => {
it("should create assets in the new format", async () => {
const res = await client.post("/asset/request-upload", { name: "zoo" });
expect(res.status).toBe(200);
const { asset } = await res.json();
const expected = {
id: expect.any(String),
name: "zoo",
status: { phase: "waiting", updatedAt: expect.any(Number) },
};
expect(asset).toMatchObject(expected);
const dbAsset = await rawAssetTable.get(asset.id);
expect(dbAsset).toMatchObject(expected);
});

it("should support assets in the old format in database", async () => {
const asset = await rawAssetTable.create({
id: uuid(),
name: "test2",
createdAt: Date.now(),
updatedAt: Date.now(),
status: "ready",
userId: nonAdminUser.id,
});

let { updatedAt, ...expected } = asset;
expected = {
...expected,
downloadUrl: "https://test/asset/video",
status: { phase: "ready", updatedAt: asset.updatedAt },
};

const res = await client.get(`/asset/${asset.id}`);
expect(res.status).toBe(200);
expect(res.json()).resolves.toEqual(expected);
});

it("should disallow non-admins from calling migrate API", async () => {
const res = await client.post("/asset/migrate-status");
expect(res.status).toBe(403);
});

it("should migrate assets to the new format", async () => {
client.jwtAuth = null;
client.apiKey = adminApiKey;
const asset = await rawAssetTable.create({
id: uuid(),
name: "test2",
createdAt: Date.now(),
updatedAt: Date.now(),
status: "ready",
userId: nonAdminUser.id,
});

let { updatedAt, ...expected } = asset;
expected = {
...expected,
status: { phase: "ready", updatedAt: asset.updatedAt },
};

expect(rawAssetTable.get(asset.id)).resolves.not.toEqual(expected);

const res = await client.post("/asset/migrate-status");
expect(res.status).toBe(200);

expect(rawAssetTable.get(asset.id)).resolves.toEqual(expected);
});
});

describe("asset inline storage", () => {
let asset: WithID<Asset>;

beforeEach(async () => {
asset = await db.asset.create({
id: uuid(),
name: "test-storage",
createdAt: Date.now(),
status: {
phase: "ready",
updatedAt: Date.now(),
},
userId: nonAdminUser.id,
});
});

it("should allow editing asset name", async () => {
const res = await client.patch(`/asset/${asset.id}`, { name: "zoo" });
expect(res.status).toBe(200);
const body = await res.json();
expect(body).toMatchObject({ ...asset, name: "zoo" });
});

it("should start export task when adding IPFS storage", async () => {
let res = await client.patch(`/asset/${asset.id}`, {
storage: { ipfs: { nftMetadata: { a: "b" } } },
});
expect(res.status).toBe(200);
const patched = await res.json();
expect(patched).toMatchObject({
...asset,
storage: { ipfs: { nftMetadata: { a: "b" } } },
status: {
...asset.status,
updatedAt: expect.any(Number),
storage: {
ipfs: {
taskIds: {
pending: expect.any(String),
},
},
},
},
});

const taskId = patched.status.storage.ipfs.taskIds.pending;
res = await client.get("/task/" + taskId);
expect(res.status).toBe(200);
expect(res.json()).resolves.toMatchObject({
id: taskId,
type: "export",
inputAssetId: asset.id,
params: {
export: {
ipfs: {
nftMetadata: { a: "b" },
},
},
},
});
});

it("should update asset storage when manually exporting to IPFS", async () => {
let res = await client.post(`/asset/${asset.id}/export`, {
ipfs: { nftMetadata: { a: "b" } },
});
expect(res.status).toBe(201);
const { task } = await res.json();
expect(task).toMatchObject({
id: expect.any(String),
type: "export",
inputAssetId: asset.id,
});

res = await client.get(`/asset/${asset.id}`);
expect(res.status).toBe(200);
expect(res.json()).resolves.toMatchObject({
...asset,
storage: {
ipfs: { nftMetadata: { a: "b" } },
},
status: {
...asset.status,
updatedAt: expect.any(Number),
storage: {
ipfs: {
taskIds: {
pending: task.id,
},
},
},
},
});
});

it("should update asset status when task finishes", async () => {
let res = await client.patch(`/asset/${asset.id}`, {
storage: { ipfs: {} },
});
expect(res.status).toBe(200);
const patched = await res.json();
const taskId = patched.status.storage.ipfs.taskIds.pending;
await server.taskScheduler.processTaskEvent({
id: uuid(),
type: "task_result",
timestamp: Date.now(),
task: {
id: taskId,
type: "export",
snapshot: await db.task.get(taskId),
},
error: null,
output: {
export: {
ipfs: {
videoFileCid: "QmX",
nftMetadataCid: "QmY",
},
},
},
});

res = await client.get(`/asset/${asset.id}`);
expect(res.status).toBe(200);
expect(res.json()).resolves.toEqual({
...patched,
status: {
phase: "ready",
updatedAt: expect.any(Number),
storage: {
ipfs: {
taskIds: {
last: taskId,
},
data: {
videoFileCid: "QmX",
nftMetadataCid: "QmY",
},
},
},
},
});
});

it("should update asset status when task fails", async () => {
let res = await client.patch(`/asset/${asset.id}`, {
storage: { ipfs: {} },
});
expect(res.status).toBe(200);
const patched = await res.json();
const taskId = patched.status.storage.ipfs.taskIds.pending;
await server.taskScheduler.processTaskEvent({
id: uuid(),
type: "task_result",
timestamp: Date.now(),
task: {
id: taskId,
type: "export",
snapshot: await db.task.get(taskId),
},
error: {
message: "failed!",
unretriable: true,
},
output: null,
});

res = await client.get(`/asset/${asset.id}`);
expect(res.status).toBe(200);
expect(res.json()).resolves.toEqual({
...patched,
status: {
phase: "ready",
updatedAt: expect.any(Number),
storage: {
ipfs: {
taskIds: {
failed: taskId,
},
},
},
},
});
});
});
});
Loading