diff --git a/packages/api/package.json b/packages/api/package.json index b8d1f1f76b..2e9998aeee 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -35,6 +35,7 @@ "go-livepeer:broadcaster": "bin/livepeer -broadcaster -datadir ./bin/broadcaster -orchAddr 127.0.0.1:3086 -rtmpAddr 0.0.0.0:3035 -httpAddr :3085 -cliAddr :3075 -v 6 -authWebhookUrl http://127.0.0.1:3004/api/stream/hook -orchWebhookUrl http://127.0.0.1:3004/api/orchestrator", "go-livepeer:orchestrator": "bin/livepeer -orchestrator -datadir ./bin/orchestrator -transcoder -serviceAddr 127.0.0.1:3086 -cliAddr :3076 -v 6", "test": "jest \"${PWD}/src\" -i --silent", + "test:watch": "jest \"${PWD}/src\" -i --silent --watch", "test:build": "parcel build --no-autoinstall --no-minify --bundle-node-modules -t browser --out-dir ../dist-worker ../src/worker.js", "coverage": "yarn run test --coverage", "pkg": "pkg --out-path=bin --no-bytecode --public-packages \"*\" --public . --compress GZip ", diff --git a/packages/api/src/app-router.ts b/packages/api/src/app-router.ts index 438c995ddf..5d1dbf934a 100644 --- a/packages/api/src/app-router.ts +++ b/packages/api/src/app-router.ts @@ -252,6 +252,7 @@ export default async function makeApp(params: CliArgs) { return { router: app, webhookCannon, + taskScheduler, store, db, queue, diff --git a/packages/api/src/controllers/asset.test.ts b/packages/api/src/controllers/asset.test.ts new file mode 100644 index 0000000000..a9b38b1d27 --- /dev/null +++ b/packages/api/src/controllers/asset.test.ts @@ -0,0 +1,320 @@ +import serverPromise, { TestServer } from "../test-server"; +import { TestClient, clearDatabase, setupUsers } from "../test-helpers"; +import { v4 as uuid } from "uuid"; +import { Asset, User } from "../schema/types"; +import { db } from "../store"; +import { WithID } from "../store/types"; +import Table from "../store/table"; +import schema from "../schema/schema.json"; + +// repeat the type here so we don't need to export it from store/asset-table.ts +type DBAsset = + | Omit & { + id: string; + updatedAt?: Asset["status"]["updatedAt"]; + status?: Asset["status"] | Asset["status"]["phase"]; + }; + +let server: TestServer; +let mockAdminUserInput: User; +let mockNonAdminUserInput: User; +// db.asset migrates the objects on read, gotta go raw +let rawAssetTable: Table; + +// jest.setTimeout(70000) + +beforeAll(async () => { + server = await serverPromise; + + mockAdminUserInput = { + email: "user_admin@gmail.com", + password: "x".repeat(64), + }; + + mockNonAdminUserInput = { + email: "user_non_admin@gmail.com", + password: "y".repeat(64), + }; + + rawAssetTable = new Table({ + db, + schema: schema.components.schemas["asset"], + }); +}); + +afterEach(async () => { + await clearDatabase(server); +}); + +describe("controllers/asset", () => { + let client: TestClient; + let adminUser: User; + let adminApiKey: string; + let nonAdminUser: User; + let nonAdminToken: string; + + beforeEach(async () => { + await db.objectStore.create({ + id: "mock_vod_store", + url: "http://user:password@localhost:8080/us-east-1/vod", + }); + ({ client, adminUser, adminApiKey, nonAdminUser, nonAdminToken } = + await setupUsers(server, mockAdminUserInput, mockNonAdminUserInput)); + client.jwtAuth = nonAdminToken; + }); + + describe("assets status schema migration", () => { + it("should create assets in the new format", async () => { + const res = await client.post("/asset/request-upload", { name: "zoo" }); + expect(res.status).toBe(200); + const { asset } = await res.json(); + const expected = { + 
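+        // shared between the API response and the raw DB row assertions below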
id: expect.any(String), + name: "zoo", + status: { phase: "waiting", updatedAt: expect.any(Number) }, + }; + expect(asset).toMatchObject(expected); + const dbAsset = await rawAssetTable.get(asset.id); + expect(dbAsset).toMatchObject(expected); + }); + + it("should support assets in the old format in database", async () => { + const asset = await rawAssetTable.create({ + id: uuid(), + name: "test2", + createdAt: Date.now(), + updatedAt: Date.now(), + status: "ready", + userId: nonAdminUser.id, + }); + + let { updatedAt, ...expected } = asset; + expected = { + ...expected, + downloadUrl: "https://test/asset/video", + status: { phase: "ready", updatedAt: asset.updatedAt }, + }; + + const res = await client.get(`/asset/${asset.id}`); + expect(res.status).toBe(200); + expect(res.json()).resolves.toEqual(expected); + }); + + it("should disallow non-admins from calling migrate API", async () => { + const res = await client.post("/asset/migrate-status"); + expect(res.status).toBe(403); + }); + + it("should migrate assets to the new format", async () => { + client.jwtAuth = null; + client.apiKey = adminApiKey; + const asset = await rawAssetTable.create({ + id: uuid(), + name: "test2", + createdAt: Date.now(), + updatedAt: Date.now(), + status: "ready", + userId: nonAdminUser.id, + }); + + let { updatedAt, ...expected } = asset; + expected = { + ...expected, + status: { phase: "ready", updatedAt: asset.updatedAt }, + }; + + expect(rawAssetTable.get(asset.id)).resolves.not.toEqual(expected); + + const res = await client.post("/asset/migrate-status"); + expect(res.status).toBe(200); + + expect(rawAssetTable.get(asset.id)).resolves.toEqual(expected); + }); + }); + + describe("asset inline storage", () => { + let asset: WithID; + + beforeEach(async () => { + asset = await db.asset.create({ + id: uuid(), + name: "test-storage", + createdAt: Date.now(), + status: { + phase: "ready", + updatedAt: Date.now(), + }, + userId: nonAdminUser.id, + }); + }); + + it("should allow editing asset name", async () => { + const res = await client.patch(`/asset/${asset.id}`, { name: "zoo" }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body).toMatchObject({ ...asset, name: "zoo" }); + }); + + it("should start export task when adding IPFS storage", async () => { + let res = await client.patch(`/asset/${asset.id}`, { + storage: { ipfs: { nftMetadata: { a: "b" } } }, + }); + expect(res.status).toBe(200); + const patched = await res.json(); + expect(patched).toMatchObject({ + ...asset, + storage: { ipfs: { nftMetadata: { a: "b" } } }, + status: { + ...asset.status, + updatedAt: expect.any(Number), + storage: { + ipfs: { + taskIds: { + pending: expect.any(String), + }, + }, + }, + }, + }); + + const taskId = patched.status.storage.ipfs.taskIds.pending; + res = await client.get("/task/" + taskId); + expect(res.status).toBe(200); + expect(res.json()).resolves.toMatchObject({ + id: taskId, + type: "export", + inputAssetId: asset.id, + params: { + export: { + ipfs: { + nftMetadata: { a: "b" }, + }, + }, + }, + }); + }); + + it("should update asset storage when manually exporting to IPFS", async () => { + let res = await client.post(`/asset/${asset.id}/export`, { + ipfs: { nftMetadata: { a: "b" } }, + }); + expect(res.status).toBe(201); + const { task } = await res.json(); + expect(task).toMatchObject({ + id: expect.any(String), + type: "export", + inputAssetId: asset.id, + }); + + res = await client.get(`/asset/${asset.id}`); + expect(res.status).toBe(200); + 
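+      // the asset fetched back from the API should now reference the export
+      // task as pending in its storage status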
expect(res.json()).resolves.toMatchObject({ + ...asset, + storage: { + ipfs: { nftMetadata: { a: "b" } }, + }, + status: { + ...asset.status, + updatedAt: expect.any(Number), + storage: { + ipfs: { + taskIds: { + pending: task.id, + }, + }, + }, + }, + }); + }); + + it("should update asset status when task finishes", async () => { + let res = await client.patch(`/asset/${asset.id}`, { + storage: { ipfs: {} }, + }); + expect(res.status).toBe(200); + const patched = await res.json(); + const taskId = patched.status.storage.ipfs.taskIds.pending; + await server.taskScheduler.processTaskEvent({ + id: uuid(), + type: "task_result", + timestamp: Date.now(), + task: { + id: taskId, + type: "export", + snapshot: await db.task.get(taskId), + }, + error: null, + output: { + export: { + ipfs: { + videoFileCid: "QmX", + nftMetadataCid: "QmY", + }, + }, + }, + }); + + res = await client.get(`/asset/${asset.id}`); + expect(res.status).toBe(200); + expect(res.json()).resolves.toEqual({ + ...patched, + status: { + phase: "ready", + updatedAt: expect.any(Number), + storage: { + ipfs: { + taskIds: { + last: taskId, + }, + data: { + videoFileCid: "QmX", + nftMetadataCid: "QmY", + }, + }, + }, + }, + }); + }); + + it("should update asset status when task fails", async () => { + let res = await client.patch(`/asset/${asset.id}`, { + storage: { ipfs: {} }, + }); + expect(res.status).toBe(200); + const patched = await res.json(); + const taskId = patched.status.storage.ipfs.taskIds.pending; + await server.taskScheduler.processTaskEvent({ + id: uuid(), + type: "task_result", + timestamp: Date.now(), + task: { + id: taskId, + type: "export", + snapshot: await db.task.get(taskId), + }, + error: { + message: "failed!", + unretriable: true, + }, + output: null, + }); + + res = await client.get(`/asset/${asset.id}`); + expect(res.status).toBe(200); + expect(res.json()).resolves.toEqual({ + ...patched, + status: { + phase: "ready", + updatedAt: expect.any(Number), + storage: { + ipfs: { + taskIds: { + failed: taskId, + }, + }, + }, + }, + }); + }); + }); +}); diff --git a/packages/api/src/controllers/asset.ts b/packages/api/src/controllers/asset.ts index 9391a553f4..316e4f97ce 100644 --- a/packages/api/src/controllers/asset.ts +++ b/packages/api/src/controllers/asset.ts @@ -1,6 +1,6 @@ import { authorizer } from "../middleware"; import { validatePost } from "../middleware"; -import { RequestHandler, Router } from "express"; +import { Request, RequestHandler, Router } from "express"; import jwt, { JwtPayload } from "jsonwebtoken"; import { v4 as uuid } from "uuid"; import mung from "express-mung"; @@ -20,11 +20,18 @@ import { UnprocessableEntityError, NotFoundError, BadRequestError, + InternalServerError, } from "../store/errors"; import httpProxy from "http-proxy"; import { generateStreamKey } from "./generate-stream-key"; -import { Asset, NewAssetPayload } from "../schema/types"; +import { + Asset, + ExportTaskParams, + NewAssetPayload, + Task, +} from "../schema/types"; import { WithID } from "../store/types"; +import { mergeAssetStatus } from "../store/asset-table"; const app = Router(); @@ -80,7 +87,10 @@ async function validateAssetPayload( playbackId, userId, createdAt, - status: "waiting", + status: { + phase: "waiting", + updatedAt: createdAt, + }, name: payload.name, meta: payload.meta, objectStoreId: payload.objectStoreId || defaultObjectStoreId, @@ -88,7 +98,7 @@ async function validateAssetPayload( } function withPlaybackUrls(asset: WithID, ingest: string): WithID { - if (asset.status !== "ready") { + if 
(asset.status.phase !== "ready") { return asset; } if (asset.playbackRecordingId) { @@ -105,6 +115,40 @@ function withPlaybackUrls(asset: WithID, ingest: string): WithID { }; } +async function reconcileAssetStorage( + { taskScheduler }: Request, + asset: WithID, + newStorage: Asset["storage"], + task?: WithID +): Promise<{ storage: Asset["storage"]; status: Asset["status"] }> { + let { storage, status } = asset; + const ipfsParamsEq = + JSON.stringify(newStorage?.ipfs) === JSON.stringify(storage?.ipfs); + if (!ipfsParamsEq) { + if (!newStorage.ipfs) { + throw new BadRequestError("Cannot remove asset from IPFS"); + } + if (!task) { + task = await taskScheduler.scheduleTask( + "export", + { export: { ipfs: newStorage.ipfs } }, + asset + ); + } + storage = { ...storage, ipfs: newStorage.ipfs }; + status = mergeAssetStatus(status, { + storage: { + ipfs: { + taskIds: { + pending: task.id, + }, + }, + }, + }); + } + return { storage, status }; +} + async function genUploadUrl( playbackId: string, objectStoreId: string, @@ -149,21 +193,31 @@ function parseUploadUrl( } app.use( - mung.json(function cleanWriteOnlyResponses( + mung.jsonAsync(async function cleanWriteOnlyResponses( data: WithID[] | WithID | { asset: WithID }, req ) { - if (req.user.admin) { - return data; + const ingests = await req.getIngest(); + if (!ingests.length) { + throw new InternalServerError("Ingest not configured"); } + const ingest = ingests[0].base; + const toExternalAsset = (a: WithID) => + req.user.admin + ? withPlaybackUrls(a, ingest) + : db.asset.cleanWriteOnlyResponse(withPlaybackUrls(a, ingest)); + if (Array.isArray(data)) { - return db.asset.cleanWriteOnlyResponses(data); + return data.map(toExternalAsset); } if ("id" in data) { - return db.asset.cleanWriteOnlyResponse(data); + return toExternalAsset(data); } if ("asset" in data) { - return { ...data, asset: db.asset.cleanWriteOnlyResponse(data.asset) }; + return { + ...data, + asset: toExternalAsset(data.asset), + }; } return data; }) @@ -174,7 +228,7 @@ const fieldsMap: FieldsMap = { name: { val: `asset.data->>'name'`, type: "full-text" }, objectStoreId: `asset.data->>'objectStoreId'`, createdAt: { val: `asset.data->'createdAt'`, type: "int" }, - updatedAt: { val: `asset.data->'updatedAt'`, type: "int" }, + updatedAt: { val: `asset.data->'status'->'updatedAt'`, type: "int" }, userId: `asset.data->>'userId'`, playbackId: `asset.data->>'playbackId'`, "user.email": { val: `users.data->>'email'`, type: "full-text" }, @@ -182,20 +236,15 @@ const fieldsMap: FieldsMap = { }; app.get("/", authorizer({}), async (req, res) => { - let { limit, cursor, all, event, allUsers, order, filters, count } = - toStringValues(req.query); + let { limit, cursor, all, allUsers, order, filters, count } = toStringValues( + req.query + ); if (isNaN(parseInt(limit))) { limit = undefined; } if (!order) { order = "updatedAt-true,createdAt-true"; } - const ingests = await req.getIngest(); - if (!ingests.length) { - res.status(501); - return res.json({ errors: ["Ingest not configured"] }); - } - const ingest = ingests[0].base; if (req.user.admin && allUsers && allUsers !== "false") { const query = parseFilters(fieldsMap, filters); @@ -220,7 +269,7 @@ app.get("/", authorizer({}), async (req, res) => { res.set("X-Total-Count", c); } return { - ...withPlaybackUrls(data, ingest), + ...data, user: db.user.cleanWriteOnlyResponse(usersdata), }; }, @@ -256,7 +305,7 @@ app.get("/", authorizer({}), async (req, res) => { if (count) { res.set("X-Total-Count", c); } - return withPlaybackUrls(data, ingest); 
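+        // playback URLs are now attached by the cleanWriteOnlyResponses
+        // middleware registered above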
+ return data; }, }); res.status(200); @@ -269,13 +318,6 @@ app.get("/", authorizer({}), async (req, res) => { }); app.get("/:id", authorizer({ allowCorsApiKey: true }), async (req, res) => { - const ingests = await req.getIngest(); - if (!ingests.length) { - res.status(501); - return res.json({ errors: ["Ingest not configured"] }); - } - - const ingest = ingests[0].base; const asset = await db.asset.get(req.params.id); if (!asset) { throw new NotFoundError(`Asset not found`); @@ -287,7 +329,7 @@ app.get("/:id", authorizer({ allowCorsApiKey: true }), async (req, res) => { ); } - res.json(withPlaybackUrls(asset, ingest)); + res.json(asset); }); app.post( @@ -300,20 +342,23 @@ app.post( if (!asset) { throw new NotFoundError(`Asset not found with id ${assetId}`); } - if (asset.status !== "ready") { + if (asset.status.phase !== "ready") { res.status(412); return res.json({ errors: ["asset is not ready to be exported"] }); } if (req.user.id !== asset.userId) { throw new ForbiddenError(`User can only export their own assets`); } + const params = req.body as ExportTaskParams; const task = await req.taskScheduler.scheduleTask( "export", - { - export: req.body, - }, + { export: params }, asset ); + if ("ipfs" in params && !params.ipfs?.pinata) { + const updates = await reconcileAssetStorage(req, asset, params, task); + await db.asset.update(assetId, updates); + } res.status(201); res.json({ task }); @@ -341,7 +386,7 @@ app.post( }); } - asset = await db.asset.create(asset); + asset = (await db.asset.create(asset)) as WithID; const task = await req.taskScheduler.scheduleTask( "import", @@ -397,7 +442,7 @@ const transcodeAssetHandler: RequestHandler = async (req, res) => { } ); outputAsset.sourceAssetId = inputAsset.id; - outputAsset = await db.asset.create(outputAsset); + outputAsset = (await db.asset.create(outputAsset)) as WithID; const task = await req.taskScheduler.scheduleTask( "transcode", @@ -484,7 +529,7 @@ app.put("/upload/:url", async (req, res) => { throw new NotFoundError(`asset not found`); } let asset = assets[0][0]; - if (asset.status !== "waiting") { + if (asset.status.phase !== "waiting") { throw new UnprocessableEntityError(`asset has already been uploaded`); } @@ -533,32 +578,66 @@ app.delete("/:id", authorizer({}), async (req, res) => { res.end(); }); -// TODO: Delete this API as well? app.patch( "/:id", - authorizer({ anyAdmin: true }), - validatePost("asset"), + authorizer({}), + validatePost("asset-patch-payload"), async (req, res) => { + // these are the only updateable fields + let { name, storage } = req.body as Asset; + if (storage?.ipfs?.pinata) { + throw new BadRequestError( + "Custom pinata not allowed in asset storage. Call export API explicitly instead" + ); + } + // update a specific asset - const asset = await db.asset.get(req.body.id); + const { id } = req.params; + const asset = await db.asset.get(id); if (!asset) { throw new NotFoundError(`asset not found`); } - const { id, playbackId, userId, createdAt, objectStoreId } = asset; - await db.asset.update(req.body.id, { - ...req.body, - // these fields are not updateable - id, - playbackId, - userId, - createdAt, - updatedAt: Date.now(), - objectStoreId, + const storageUpdates = await reconcileAssetStorage(req, asset, storage); + await db.asset.update(id, { name, ...storageUpdates }); + const updated = await db.asset.get(id, { useReplica: false }); + res.status(200).json(updated); + } +); + +// TODO: Call this in production until there are no assets left in old format. 
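+// It is paginated (limit/cursor query params), so it can be called repeatedly
+// until the returned count is 0.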
+// Then remove compatibility code and this API. +app.post( + "/migrate-status", + authorizer({ anyAdmin: true }), + async (req, res) => { + let { limit, cursor } = toStringValues(req.query); + if (isNaN(parseInt(limit))) { + limit = "100"; + } + + const query = [sql`asset.data->'status'->>'phase' IS NULL`]; + const fields = " asset.id as id, asset.data as data"; + const [toUpdate, nextCursor] = await db.asset.find(query, { + limit, + cursor, + fields, }); - res.status(200); - res.json({ id: req.body.id }); + for (const asset of toUpdate) { + // the db.asset will actually already return the asset transformed to the + // updated format. All we need to do is re-save it as returned here. + await db.asset.replace(asset); + } + + if (toUpdate.length > 0 && nextCursor) { + res.links({ next: makeNextHREF(req, nextCursor) }); + } + return res.status(200).json({ + count: toUpdate.length, + nextCursor, + updated: toUpdate, + }); } ); diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 0eee2f9d5c..125c12bb04 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -74,7 +74,14 @@ export default async function makeApp(params: CliArgs) { throw e; // process.exit(1) }); - const { db, queue, router, store, webhookCannon: webhook } = appRoute; + const { + db, + queue, + router, + store, + webhookCannon: webhook, + taskScheduler, + } = appRoute; const app = express(); const isSilentTest = @@ -125,6 +132,7 @@ export default async function makeApp(params: CliArgs) { store, db, webhook, + taskScheduler, queue, }; } diff --git a/packages/api/src/schema/schema.yaml b/packages/api/src/schema/schema.yaml index a565530c38..23eaa4e1ab 100644 --- a/packages/api/src/schema/schema.yaml +++ b/packages/api/src/schema/schema.yaml @@ -629,6 +629,15 @@ components: description: Timestamp (in milliseconds) at which the stream started. example: 1587667174725 + asset-patch-payload: + type: object + additionalProperties: false + properties: + name: + $ref: "#/components/schemas/asset/properties/name" + storage: + $ref: "#/components/schemas/asset/properties/storage" + stream-patch-payload: type: object additionalProperties: false @@ -906,6 +915,60 @@ components: description: Object store ID where the asset is stored writeOnly: true example: 09F8B46C-61A0-4254-9875-F71F4C605BC7 + storage: + additionalProperties: false + properties: + ipfs: + $ref: "#/components/schemas/export-task-params/oneOf/1/properties/ipfs" + status: + type: object + additionalProperties: false + readOnly: true + required: [phase, updatedAt] + description: Status of the asset + properties: + phase: + type: string + description: Phase of the asset + enum: + - waiting + - ready + - failed + updatedAt: + type: number + description: + Timestamp (in milliseconds) at which the asset was last updated + example: 1587667174725 + errorMessage: + type: string + description: Error message if the asset creation failed. + storage: + type: object + additionalProperties: false + properties: + ipfs: + additionalProperties: false + required: [taskIds] + properties: + taskIds: + type: object + additionalProperties: false + properties: + pending: + type: string + description: + ID of any currently running task that is exporting + this asset to IPFS. + last: + type: string + description: + ID of the last task to run successfully, that + created the currently saved data. + failed: + type: string + description: ID of the last task to fail execution. 
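+                # Output of the last task to run successfully, e.g. the IPFS
+                # CIDs of the exported files.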
+ data: + $ref: "#/components/schemas/task/properties/output/properties/export/properties/ipfs" name: type: string description: @@ -923,11 +986,6 @@ components: "description": "This is a video of my awesome life", "tags": ["awesome", "life", "video"], } - updatedAt: - type: number - description: - Timestamp (in milliseconds) at which the asset was last updated - example: 1587667174725 createdAt: type: number readOnly: true @@ -1037,13 +1095,6 @@ components: type: number description: Bit depth of the track - only for audio tracks example: 16 - status: - type: string - description: Status of the asset - enum: - - waiting - - ready - - failed sourceAssetId: type: string index: true @@ -1183,6 +1234,7 @@ components: status: type: object additionalProperties: false + readOnly: true description: Status of the task properties: phase: diff --git a/packages/api/src/store/asset-table.ts b/packages/api/src/store/asset-table.ts new file mode 100644 index 0000000000..61f40c946a --- /dev/null +++ b/packages/api/src/store/asset-table.ts @@ -0,0 +1,95 @@ +import { QueryResult } from "pg"; +import { SQLStatement } from "sql-template-strings"; +import { Asset } from "../schema/types"; +import Table from "./table"; +import { + FindOptions, + FindQuery, + GetOptions, + UpdateOptions, + WithID, +} from "./types"; + +// Ideally this type should never be used outside of this file. It's only here +// to fix some backward incompatible change we made on the asset.status field. +type DBAsset = + | Omit & { + id: string; + + // These are deprecated fields from when we didn't have the top-level + // status object in the asset resources. + updatedAt?: Asset["status"]["updatedAt"]; + status?: Asset["status"] | Asset["status"]["phase"]; + }; + +// Receives an asset from database and returns it in the new status schema. +// +// TODO: Update existing objects in DB with the new schema to remove this +// compatibility code. +const assetStatusCompat = (asset: DBAsset): WithID => + !asset || typeof asset.status === "object" + ? (asset as WithID) + : { + ...{ ...asset, updatedAt: undefined }, + status: { + phase: asset.status, + updatedAt: asset.updatedAt, + }, + }; + +export const mergeAssetStatus = ( + s1: Asset["status"], + s2: Partial, + updatedAt: number = Date.now() +): Asset["status"] => ({ + ...s1, + ...s2, + updatedAt, + storage: { + ...s1?.storage, + ...s2?.storage, + ipfs: { + ...s1?.storage?.ipfs, + ...s2?.storage?.ipfs, + taskIds: { + ...s1?.storage?.ipfs?.taskIds, + ...s2?.storage?.ipfs?.taskIds, + }, + // data is not mergeable, just keep the result of the spread above (s2>s1) + }, + }, +}); + +// This is mostly a compatibility helper which transforms assets read from the +// database into the new schema (with status object instead of status string). +// +// Any methods not overridden here will return the raw DBAsset objects and that +// is by design. If you need to use any method that is not here and need to +// access the status as an object, first create the corresponding override which +// transforms the returned objects here. 
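+//
+// For example, a legacy row stored as (illustrative values only)
+//   { id: "abc", status: "ready", updatedAt: 1644546541218 }
+// is returned by the overridden methods below as
+//   { id: "abc", status: { phase: "ready", updatedAt: 1644546541218 } }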
+export default class AssetTable extends Table { + async get(id: string, opts?: GetOptions): Promise> { + const asset = await super.get(id, opts); + return assetStatusCompat(asset); + } + + async find( + query?: FindQuery | SQLStatement[], + opts?: FindOptions + ): Promise<[WithID[], string]> { + const [assets, cursor] = await super.find(query, opts); + return [assets?.map(assetStatusCompat), cursor]; + } + + create(doc: WithID): Promise> { + return super.create(doc) as Promise>; + } + + update( + query: string | SQLStatement[], + doc: Partial>, + opts?: UpdateOptions + ): Promise> { + return super.update(query, doc, opts); + } +} diff --git a/packages/api/src/store/db.ts b/packages/api/src/store/db.ts index 05847e0d05..8ddf2d24bc 100644 --- a/packages/api/src/store/db.ts +++ b/packages/api/src/store/db.ts @@ -9,10 +9,7 @@ import { ObjectStore, ApiToken, User, - Webhook, PasswordResetToken, - MultistreamTarget, - Asset, Task, Usage, Region, @@ -27,6 +24,7 @@ import { QueryOptions, WithID } from "./types"; import MultistreamTargetTable from "./multistream-table"; import WebhookTable from "./webhook-table"; import { CdnUsageTable } from "./cdn-usage-table"; +import AssetTable from "./asset-table"; // Should be configurable, perhaps? const CONNECT_TIMEOUT = 5000; @@ -64,7 +62,7 @@ export class DB { stream: StreamTable; objectStore: Table; multistreamTarget: MultistreamTargetTable; - asset: Table; + asset: AssetTable; task: Table; apiToken: Table; user: Table; @@ -147,7 +145,7 @@ export class DB { db: this, schema: schemas["api-token"], }); - this.asset = makeTable({ + this.asset = new AssetTable({ db: this, schema: schemas["asset"], }); diff --git a/packages/api/src/task/scheduler.ts b/packages/api/src/task/scheduler.ts index e8dce3e8c6..d352c7adb9 100644 --- a/packages/api/src/task/scheduler.ts +++ b/packages/api/src/task/scheduler.ts @@ -5,6 +5,7 @@ import Queue from "../store/queue"; import { Asset, Task } from "../schema/types"; import { v4 as uuid } from "uuid"; import { WithID } from "../store/types"; +import { mergeAssetStatus } from "../store/asset-table"; export default class TaskScheduler { queue: Queue; running: boolean; @@ -68,42 +69,67 @@ export default class TaskScheduler { return true; } - if (event.task.type === "import") { - const assetSpec = event.output?.import?.assetSpec; - if (!assetSpec) { - const error = "bad task output: missing assetSpec"; - console.error( - `task event process error: err=${error} taskId=${event.task.id}` - ); - await this.failTask(task, error, event.output); - return true; - } - await db.asset.update(task.outputAssetId, { - size: assetSpec.size, - hash: assetSpec.hash, - videoSpec: assetSpec.videoSpec, - playbackRecordingId: assetSpec.playbackRecordingId, - status: "ready", - updatedAt: Date.now(), - }); - } else if (event.task.type === "transcode") { - const assetSpec = event.output?.transcode?.asset?.assetSpec; - if (!assetSpec) { - const error = "bad task output: missing assetSpec"; - console.error( - `task event process error: err=${error} taskId=${event.task.id}` - ); - await this.failTask(task, error, event.output); - return true; - } - await db.asset.update(task.outputAssetId, { - size: assetSpec.size, - hash: assetSpec.hash, - videoSpec: assetSpec.videoSpec, - playbackRecordingId: assetSpec.playbackRecordingId, - status: "ready", - updatedAt: Date.now(), - }); + switch (event.task.type) { + case "import": + let assetSpec = event.output?.import?.assetSpec; + if (!assetSpec) { + const error = "bad task output: missing assetSpec"; + 
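+          // the worker returned no assetSpec, so fail the task (and its
+          // output asset) instead of marking it ready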
console.error( + `task event process error: err=${error} taskId=${event.task.id}` + ); + await this.failTask(task, error, event.output); + return true; + } + await db.asset.update(task.outputAssetId, { + size: assetSpec.size, + hash: assetSpec.hash, + videoSpec: assetSpec.videoSpec, + playbackRecordingId: assetSpec.playbackRecordingId, + status: { + phase: "ready", + updatedAt: Date.now(), + }, + }); + break; + case "transcode": + assetSpec = event.output?.transcode?.asset?.assetSpec; + if (!assetSpec) { + const error = "bad task output: missing assetSpec"; + console.error( + `task event process error: err=${error} taskId=${event.task.id}` + ); + await this.failTask(task, error, event.output); + return true; + } + await db.asset.update(task.outputAssetId, { + size: assetSpec.size, + hash: assetSpec.hash, + videoSpec: assetSpec.videoSpec, + playbackRecordingId: assetSpec.playbackRecordingId, + status: { + phase: "ready", + updatedAt: Date.now(), + }, + }); + break; + case "export": + const inputAsset = await db.asset.get(task.inputAssetId); + if (inputAsset.status.storage?.ipfs?.taskIds?.pending === task.id) { + await db.asset.update(inputAsset.id, { + status: mergeAssetStatus(inputAsset.status, { + storage: { + ipfs: { + taskIds: { + pending: undefined, + last: task.id, + }, + data: event.output.export.ipfs, + }, + }, + }), + }); + } + break; } await db.task.update(task.id, { status: { @@ -116,19 +142,36 @@ export default class TaskScheduler { } private async failTask(task: Task, error: string, output?: Task["output"]) { + const status = { + phase: "failed", + updatedAt: Date.now(), + errorMessage: error, + } as const; await db.task.update(task.id, { output, - status: { - phase: "failed", - updatedAt: Date.now(), - errorMessage: error, - }, + status, }); if (task.outputAssetId) { - await db.asset.update(task.outputAssetId, { - status: "failed", - updatedAt: Date.now(), - }); + await db.asset.update(task.outputAssetId, { status }); + } + switch (task.type) { + case "export": + const inputAsset = await db.asset.get(task.inputAssetId); + if (inputAsset.status?.storage?.ipfs?.taskIds?.pending === task.id) { + await db.asset.update(inputAsset.id, { + status: mergeAssetStatus(inputAsset.status, { + storage: { + ipfs: { + taskIds: { + pending: undefined, + failed: task.id, + }, + }, + }, + }), + }); + } + break; } } diff --git a/packages/api/src/test-server.ts b/packages/api/src/test-server.ts index 54dec14e2b..f43a40ebf1 100644 --- a/packages/api/src/test-server.ts +++ b/packages/api/src/test-server.ts @@ -39,6 +39,7 @@ params.sendgridTemplateId = sendgridTemplateId; params.sendgridApiKey = sendgridApiKey; params.postgresUrl = `postgresql://postgres@127.0.0.1/${testId}`; params.recordObjectStoreId = "mock_store"; +params.vodObjectStoreId = "mock_vod_store"; params.ingest = '[{"ingest": "rtmp://test/live","playback": "https://test/hls","base": "https://test"}]'; params.amqpUrl = `amqp://localhost:5672/${testId}`; @@ -66,6 +67,7 @@ async function setupServer() { db: server.db, queue: server.queue, webhook: server.webhook, + taskScheduler: server.taskScheduler, }; } diff --git a/packages/api/src/webhooks/cannon.ts b/packages/api/src/webhooks/cannon.ts index 6aad126b2b..82ff0383f9 100644 --- a/packages/api/src/webhooks/cannon.ts +++ b/packages/api/src/webhooks/cannon.ts @@ -512,12 +512,13 @@ export default class WebhookCannon { const id = uuid(); const playbackId = await generateUniquePlaybackId(this.store, sessionId); + const createdAt = Date.now(); const asset = await this.db.asset.create({ id, 
playbackId, userId, - createdAt: Date.now(), - status: "waiting", + createdAt, + status: { phase: "waiting", updatedAt: createdAt }, name: `live-to-vod-${sessionId}`, objectStoreId: this.vodObjectStoreId, }); diff --git a/packages/www/components/Dashboard/AssetsTable/index.tsx b/packages/www/components/Dashboard/AssetsTable/index.tsx index 130ea0e66c..73f65b9cac 100644 --- a/packages/www/components/Dashboard/AssetsTable/index.tsx +++ b/packages/www/components/Dashboard/AssetsTable/index.tsx @@ -160,7 +160,11 @@ const AssetsTable = ({ fallback: , }, updatedAt: { - date: asset.updatedAt ? new Date(asset.updatedAt) : null, + date: + asset.status.updatedAt && + asset.status.updatedAt !== asset.createdAt + ? new Date(asset.status.updatedAt) + : null, fallback: , }, downloadUrl: { diff --git a/packages/www/docs/api-reference/vod/export.mdx b/packages/www/docs/api-reference/vod/export.mdx index bab206ac5a..df749168b9 100644 --- a/packages/www/docs/api-reference/vod/export.mdx +++ b/packages/www/docs/api-reference/vod/export.mdx @@ -50,10 +50,10 @@ curl --location --request POST 'https://livepeer.com/api/asset/$ASSET_ID/export' description: "Will be added to the `pinata_secret_api_key` header.", }, { - parameter: ipfs.erc1155Metadata, + parameter: ipfs.nftMetadata, type: object, description: - "Additional data to add to the automatic ERC-1155 default export. Will be deep merged with the default metadata regularly exported.", + "Additional data to add to the NFT metadata also exported to IPFS. Will be deep merged with the default metadata regularly exported.", }, { parameter: custom, diff --git a/packages/www/docs/api-reference/vod/list.mdx b/packages/www/docs/api-reference/vod/list.mdx index 4c5901c978..597f5a53fe 100644 --- a/packages/www/docs/api-reference/vod/list.mdx +++ b/packages/www/docs/api-reference/vod/list.mdx @@ -59,10 +59,12 @@ curl --location --request GET 'https://livepeer.com/api/asset/$ASSET_ID' \ ], "name": "Example name", "size": 52615193, - "status": "ready", + "status": { + "phase": "ready", + "updatedAt": 1644546541218 + }, "userId": "$USER_ID", "createdAt": 1644546528636, - "updatedAt": 1644546541218, "videoSpec": { "format": "mp4", "tracks": [ diff --git a/packages/www/docs/api-reference/vod/upload.mdx b/packages/www/docs/api-reference/vod/upload.mdx index 171d3b6247..b296267380 100644 --- a/packages/www/docs/api-reference/vod/upload.mdx +++ b/packages/www/docs/api-reference/vod/upload.mdx @@ -49,15 +49,38 @@ curl --location --request POST 'https://livepeer.com/api/asset/request-upload' \ "url": "https://origin.livepeer.com/api/asset/upload/aHR0cHM6Ly9zdG9yYWdlLmdvb2dsZWFwaXMuY29tL2xwLW55Yy12b2QtbW9uc3Rlci9kaXJlY3RVcGxvYWQvNjI3OHQyZWlrY2JqZXUxNC9zb3VyY2U%2F...", "asset": { "id": "$ASSET_ID", + "name": "Example name", "playbackId": "$PLAYBACK_ID", "userId": "$USER_ID", "createdAt": 1644595454248, - "status": "waiting", - "name": "Example name" + "status": { + "phase": "waiting", + "updatedAt": 1644595454248 + } + }, + "task": { + "id": "$TASK_ID", + "type": "import", + "outputAssetId": "$ASSET_ID", + "params": { + "import": { + "uploadedObjectKey": "directUpload/$PLAYBACK_ID/source" + } + }, + "createdAt": 1652813126031, + "status": { + "phase": "pending", + "updatedAt": 1652813126031 + } } } ``` +You can use the returned `task` object to monitor the progress of processing the +asset after the contents have been uploaded. Check the +[Retrieve a Task](/docs/api-reference/vod/list-tasks#retrieve-a-task) API for +more information. 
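+
+For example, you could poll the task until it reaches a terminal phase. A
+minimal sketch, assuming your API key is in `$API_KEY` and the task ID from
+the response above is in `$TASK_ID`:
+
+```bash
+curl --location --request GET 'https://livepeer.com/api/task/$TASK_ID' \
+--header 'Authorization: Bearer $API_KEY'
+```
+
+While the import is in progress the task's `status.phase` will be `pending`;
+if it fails, the phase becomes `failed` and `status.errorMessage` describes
+the error.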
+
## Step 2: Upload the contents

Now upload (`PUT`) a video file to the exact URL in the `url` field of the