Skip to content

Commit

Permalink
api/vod: Inline storage info in the asset resource (#1014)
Browse files Browse the repository at this point in the history
* vod/asset: Create storageProviders field in asset

* api: Update asset object when exporting to IPFS

* api: Allow patching storage providers

* api/scheduler: Make it a switch

* api: Update asset with IPFS specs when done

* api: Create separate status field in asset

* api: Fix some of the asset schema mess

* api: Return full asset on patch

* api: Create taskIds map in status

* api: Save failed task ID as well

* api/asset: Disallow removing an asset from IPFS

* api/store: Create asset table to handle status compat

* api/asset: Call withPlaybackUrl from response middleware

This avoid the bug where we would call withPlaybackUrl
on an un-compated asset as well.

* api/asset: Create reconcileAssetStorage helper

* api/scheduler: Expose error message on failed asset status

* api/store: Create mergeAssetStatus helper

* www/asset: Use new status field in asset table

* api/asset/patch: Check input before anything else

* api/asset: Create admin API to migrate assets to new format

* www/docs: Update the VOD docs with the new fields

* api: Add some tests to asset schema migration logic

* api: Create tests for new asset logic

* api: Fix some bugs found by the tests

* api: Add some tests to the migrate-status API as well
  • Loading branch information
victorges authored May 26, 2022
1 parent ee0c219 commit c691933
Show file tree
Hide file tree
Showing 15 changed files with 753 additions and 124 deletions.
1 change: 1 addition & 0 deletions packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"go-livepeer:broadcaster": "bin/livepeer -broadcaster -datadir ./bin/broadcaster -orchAddr 127.0.0.1:3086 -rtmpAddr 0.0.0.0:3035 -httpAddr :3085 -cliAddr :3075 -v 6 -authWebhookUrl http://127.0.0.1:3004/api/stream/hook -orchWebhookUrl http://127.0.0.1:3004/api/orchestrator",
"go-livepeer:orchestrator": "bin/livepeer -orchestrator -datadir ./bin/orchestrator -transcoder -serviceAddr 127.0.0.1:3086 -cliAddr :3076 -v 6",
"test": "jest \"${PWD}/src\" -i --silent",
"test:watch": "jest \"${PWD}/src\" -i --silent --watch",
"test:build": "parcel build --no-autoinstall --no-minify --bundle-node-modules -t browser --out-dir ../dist-worker ../src/worker.js",
"coverage": "yarn run test --coverage",
"pkg": "pkg --out-path=bin --no-bytecode --public-packages \"*\" --public . --compress GZip ",
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/app-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ export default async function makeApp(params: CliArgs) {
return {
router: app,
webhookCannon,
taskScheduler,
store,
db,
queue,
Expand Down
320 changes: 320 additions & 0 deletions packages/api/src/controllers/asset.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
import serverPromise, { TestServer } from "../test-server";
import { TestClient, clearDatabase, setupUsers } from "../test-helpers";
import { v4 as uuid } from "uuid";
import { Asset, User } from "../schema/types";
import { db } from "../store";
import { WithID } from "../store/types";
import Table from "../store/table";
import schema from "../schema/schema.json";

// repeat the type here so we don't need to export it from store/asset-table.ts
type DBAsset =
| Omit<Asset, "status"> & {
id: string;
updatedAt?: Asset["status"]["updatedAt"];
status?: Asset["status"] | Asset["status"]["phase"];
};

let server: TestServer;
let mockAdminUserInput: User;
let mockNonAdminUserInput: User;
// db.asset migrates the objects on read, gotta go raw
let rawAssetTable: Table<DBAsset>;

// jest.setTimeout(70000)

beforeAll(async () => {
server = await serverPromise;

mockAdminUserInput = {
email: "user_admin@gmail.com",
password: "x".repeat(64),
};

mockNonAdminUserInput = {
email: "user_non_admin@gmail.com",
password: "y".repeat(64),
};

rawAssetTable = new Table<DBAsset>({
db,
schema: schema.components.schemas["asset"],
});
});

afterEach(async () => {
await clearDatabase(server);
});

describe("controllers/asset", () => {
let client: TestClient;
let adminUser: User;
let adminApiKey: string;
let nonAdminUser: User;
let nonAdminToken: string;

beforeEach(async () => {
await db.objectStore.create({
id: "mock_vod_store",
url: "http://user:password@localhost:8080/us-east-1/vod",
});
({ client, adminUser, adminApiKey, nonAdminUser, nonAdminToken } =
await setupUsers(server, mockAdminUserInput, mockNonAdminUserInput));
client.jwtAuth = nonAdminToken;
});

describe("assets status schema migration", () => {
it("should create assets in the new format", async () => {
const res = await client.post("/asset/request-upload", { name: "zoo" });
expect(res.status).toBe(200);
const { asset } = await res.json();
const expected = {
id: expect.any(String),
name: "zoo",
status: { phase: "waiting", updatedAt: expect.any(Number) },
};
expect(asset).toMatchObject(expected);
const dbAsset = await rawAssetTable.get(asset.id);
expect(dbAsset).toMatchObject(expected);
});

it("should support assets in the old format in database", async () => {
const asset = await rawAssetTable.create({
id: uuid(),
name: "test2",
createdAt: Date.now(),
updatedAt: Date.now(),
status: "ready",
userId: nonAdminUser.id,
});

let { updatedAt, ...expected } = asset;
expected = {
...expected,
downloadUrl: "https://test/asset/video",
status: { phase: "ready", updatedAt: asset.updatedAt },
};

const res = await client.get(`/asset/${asset.id}`);
expect(res.status).toBe(200);
expect(res.json()).resolves.toEqual(expected);
});

it("should disallow non-admins from calling migrate API", async () => {
const res = await client.post("/asset/migrate-status");
expect(res.status).toBe(403);
});

it("should migrate assets to the new format", async () => {
client.jwtAuth = null;
client.apiKey = adminApiKey;
const asset = await rawAssetTable.create({
id: uuid(),
name: "test2",
createdAt: Date.now(),
updatedAt: Date.now(),
status: "ready",
userId: nonAdminUser.id,
});

let { updatedAt, ...expected } = asset;
expected = {
...expected,
status: { phase: "ready", updatedAt: asset.updatedAt },
};

expect(rawAssetTable.get(asset.id)).resolves.not.toEqual(expected);

const res = await client.post("/asset/migrate-status");
expect(res.status).toBe(200);

expect(rawAssetTable.get(asset.id)).resolves.toEqual(expected);
});
});

describe("asset inline storage", () => {
let asset: WithID<Asset>;

beforeEach(async () => {
asset = await db.asset.create({
id: uuid(),
name: "test-storage",
createdAt: Date.now(),
status: {
phase: "ready",
updatedAt: Date.now(),
},
userId: nonAdminUser.id,
});
});

it("should allow editing asset name", async () => {
const res = await client.patch(`/asset/${asset.id}`, { name: "zoo" });
expect(res.status).toBe(200);
const body = await res.json();
expect(body).toMatchObject({ ...asset, name: "zoo" });
});

it("should start export task when adding IPFS storage", async () => {
let res = await client.patch(`/asset/${asset.id}`, {
storage: { ipfs: { nftMetadata: { a: "b" } } },
});
expect(res.status).toBe(200);
const patched = await res.json();
expect(patched).toMatchObject({
...asset,
storage: { ipfs: { nftMetadata: { a: "b" } } },
status: {
...asset.status,
updatedAt: expect.any(Number),
storage: {
ipfs: {
taskIds: {
pending: expect.any(String),
},
},
},
},
});

const taskId = patched.status.storage.ipfs.taskIds.pending;
res = await client.get("/task/" + taskId);
expect(res.status).toBe(200);
expect(res.json()).resolves.toMatchObject({
id: taskId,
type: "export",
inputAssetId: asset.id,
params: {
export: {
ipfs: {
nftMetadata: { a: "b" },
},
},
},
});
});

it("should update asset storage when manually exporting to IPFS", async () => {
let res = await client.post(`/asset/${asset.id}/export`, {
ipfs: { nftMetadata: { a: "b" } },
});
expect(res.status).toBe(201);
const { task } = await res.json();
expect(task).toMatchObject({
id: expect.any(String),
type: "export",
inputAssetId: asset.id,
});

res = await client.get(`/asset/${asset.id}`);
expect(res.status).toBe(200);
expect(res.json()).resolves.toMatchObject({
...asset,
storage: {
ipfs: { nftMetadata: { a: "b" } },
},
status: {
...asset.status,
updatedAt: expect.any(Number),
storage: {
ipfs: {
taskIds: {
pending: task.id,
},
},
},
},
});
});

it("should update asset status when task finishes", async () => {
let res = await client.patch(`/asset/${asset.id}`, {
storage: { ipfs: {} },
});
expect(res.status).toBe(200);
const patched = await res.json();
const taskId = patched.status.storage.ipfs.taskIds.pending;
await server.taskScheduler.processTaskEvent({
id: uuid(),
type: "task_result",
timestamp: Date.now(),
task: {
id: taskId,
type: "export",
snapshot: await db.task.get(taskId),
},
error: null,
output: {
export: {
ipfs: {
videoFileCid: "QmX",
nftMetadataCid: "QmY",
},
},
},
});

res = await client.get(`/asset/${asset.id}`);
expect(res.status).toBe(200);
expect(res.json()).resolves.toEqual({
...patched,
status: {
phase: "ready",
updatedAt: expect.any(Number),
storage: {
ipfs: {
taskIds: {
last: taskId,
},
data: {
videoFileCid: "QmX",
nftMetadataCid: "QmY",
},
},
},
},
});
});

it("should update asset status when task fails", async () => {
let res = await client.patch(`/asset/${asset.id}`, {
storage: { ipfs: {} },
});
expect(res.status).toBe(200);
const patched = await res.json();
const taskId = patched.status.storage.ipfs.taskIds.pending;
await server.taskScheduler.processTaskEvent({
id: uuid(),
type: "task_result",
timestamp: Date.now(),
task: {
id: taskId,
type: "export",
snapshot: await db.task.get(taskId),
},
error: {
message: "failed!",
unretriable: true,
},
output: null,
});

res = await client.get(`/asset/${asset.id}`);
expect(res.status).toBe(200);
expect(res.json()).resolves.toEqual({
...patched,
status: {
phase: "ready",
updatedAt: expect.any(Number),
storage: {
ipfs: {
taskIds: {
failed: taskId,
},
},
},
},
});
});
});
});
Loading

0 comments on commit c691933

Please sign in to comment.