Skip to content

Commit

Permalink
prevent multiple post-process triggering of upload-resumable
Browse files Browse the repository at this point in the history
  • Loading branch information
rigelk committed Jun 8, 2021
1 parent 17b064e commit 922b2b8
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 18 deletions.
18 changes: 11 additions & 7 deletions server/controllers/api/videos/upload.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as express from 'express'
import { move } from 'fs-extra'
import { getLowercaseExtension } from '@server/helpers/core-utils'
import { deleteResumableUploadMetaFile, getResumableUploadPath } from '@server/helpers/upload'
import { getResumableUploadPath, scheduleDeleteResumableUploadMetaFile } from '@server/helpers/upload'
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
import { addOptimizeOrMergeAudioJob, buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
Expand Down Expand Up @@ -35,6 +35,7 @@ import {
import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update'
import { VideoModel } from '../../../models/video/video'
import { VideoFileModel } from '../../../models/video/video-file'
import { Redis } from '@server/lib/redis'

const lTags = loggerTagsFactory('api', 'video')
const auditLogger = auditLoggerFactory('videos')
Expand Down Expand Up @@ -112,18 +113,21 @@ export async function addVideoLegacy (req: express.Request, res: express.Respons
const videoInfo: VideoCreate = req.body
const files = req.files

return addVideo({ res, videoPhysicalFile, videoInfo, files })
const response = addVideo({ res, videoPhysicalFile, videoInfo, files })
return res.json(response)
}

export async function addVideoResumable (_req: express.Request, res: express.Response) {
export async function addVideoResumable (req: express.Request, res: express.Response) {
const videoPhysicalFile = res.locals.videoFileResumable
const videoInfo = videoPhysicalFile.metadata
const files = { previewfile: videoInfo.previewfile }

// Don't need the meta file anymore
await deleteResumableUploadMetaFile(videoPhysicalFile.path)
scheduleDeleteResumableUploadMetaFile(videoPhysicalFile.path)

return addVideo({ res, videoPhysicalFile, videoInfo, files })
const response = await addVideo({ res, videoPhysicalFile, videoInfo, files })
await Redis.Instance.setUploadSession(req.query.upload_id, response.video)
return res.json(response)
}

async function addVideo (options: {
Expand Down Expand Up @@ -215,12 +219,12 @@ async function addVideo (options: {

Hooks.runAction('action:api.video.uploaded', { video: videoCreated })

return res.json({
return {
video: {
id: videoCreated.id,
uuid: videoCreated.uuid
}
})
}
}

async function buildNewFile (video: MVideo, videoPhysicalFile: express.VideoUploadFile) {
Expand Down
9 changes: 8 additions & 1 deletion server/helpers/upload.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { JobQueue } from '@server/lib/job-queue'
import { METAFILE_EXTNAME } from '@uploadx/core'
import { remove } from 'fs-extra'
import { join } from 'path'
Expand All @@ -13,9 +14,15 @@ function deleteResumableUploadMetaFile (filepath: string) {
return remove(filepath + METAFILE_EXTNAME)
}

function scheduleDeleteResumableUploadMetaFile (filepath: string) {
const payload = { filepath }
JobQueue.Instance.createJob({ type: 'delete-resumable-upload-meta-file', payload }, { delay: 900 * 1000 }) // executed in 15 min
}

// ---------------------------------------------------------------------------

export {
getResumableUploadPath,
deleteResumableUploadMetaFile
deleteResumableUploadMetaFile,
scheduleDeleteResumableUploadMetaFile
}
12 changes: 9 additions & 3 deletions server/initializers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
'videos-views': 1,
'activitypub-refresher': 1,
'video-redundancy': 1,
'video-live-ending': 1
'video-live-ending': 1,
'delete-resumable-upload-meta-file': 1
}
// Excluded keys are jobs that can be configured by admins
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
Expand All @@ -161,7 +162,8 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
'videos-views': 1,
'activitypub-refresher': 1,
'video-redundancy': 1,
'video-live-ending': 10
'video-live-ending': 10,
'delete-resumable-upload-meta-file': 5
}
const JOB_TTL: { [id in JobType]: number } = {
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
Expand All @@ -177,7 +179,8 @@ const JOB_TTL: { [id in JobType]: number } = {
'videos-views': undefined, // Unlimited
'activitypub-refresher': 60000 * 10, // 10 minutes
'video-redundancy': 1000 * 3600 * 3, // 3 hours
'video-live-ending': 1000 * 60 * 10 // 10 minutes
'video-live-ending': 1000 * 60 * 10, // 10 minutes
'delete-resumable-upload-meta-file': 60000 * 10 // 10 minutes
}
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
'videos-views': {
Expand Down Expand Up @@ -651,6 +654,8 @@ const RESUMABLE_UPLOAD_DIRECTORY = join(CONFIG.STORAGE.TMP_DIR, 'resumable-uploa
const HLS_STREAMING_PLAYLIST_DIRECTORY = join(CONFIG.STORAGE.STREAMING_PLAYLISTS_DIR, 'hls')
const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls')

const RESUMABLE_UPLOAD_SESSION_LIFETIME = 60000 * 60 * 16 // 16 hours

const VIDEO_LIVE = {
EXTENSION: '.ts',
CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes
Expand Down Expand Up @@ -824,6 +829,7 @@ export {
LAZY_STATIC_PATHS,
SEARCH_INDEX,
RESUMABLE_UPLOAD_DIRECTORY,
RESUMABLE_UPLOAD_SESSION_LIFETIME,
HLS_REDUNDANCY_DIRECTORY,
P2P_MEDIA_LOADER_PEER_VERSION,
ACTOR_IMAGES_SIZE,
Expand Down
17 changes: 17 additions & 0 deletions server/lib/job-queue/handlers/delete-resumable-upload-meta-file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import * as Bull from 'bull'
import { DeleteResumableUploadMetaFilePayload } from '@shared/models'
import { logger } from '../../../helpers/logger'
import { deleteResumableUploadMetaFile } from '@server/helpers/upload'

async function processDeleteResumableUploadMetaFile (job: Bull.Job) {
const payload = job.data as DeleteResumableUploadMetaFilePayload
logger.info('Processing deletion of meta file for resumable upload in job %d.', job.id)

await deleteResumableUploadMetaFile(payload.filepath)
}

// ---------------------------------------------------------------------------

export {
processDeleteResumableUploadMetaFile
}
11 changes: 8 additions & 3 deletions server/lib/job-queue/job-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
ActivitypubHttpFetcherPayload,
ActivitypubHttpUnicastPayload,
ActorKeysPayload,
DeleteResumableUploadMetaFilePayload,
EmailPayload,
JobState,
JobType,
Expand All @@ -34,6 +35,7 @@ import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViews } from './handlers/video-views'
import { processDeleteResumableUploadMetaFile } from './handlers/delete-resumable-upload-meta-file'

type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
Expand All @@ -49,7 +51,8 @@ type CreateJobArgument =
{ type: 'videos-views', payload: {} } |
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } |
{ type: 'actor-keys', payload: ActorKeysPayload } |
{ type: 'video-redundancy', payload: VideoRedundancyPayload }
{ type: 'video-redundancy', payload: VideoRedundancyPayload } |
{ type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload }

type CreateJobOptions = {
delay?: number
Expand All @@ -70,7 +73,8 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
'activitypub-refresher': refreshAPObject,
'video-live-ending': processVideoLiveEnding,
'actor-keys': processActorKeys,
'video-redundancy': processVideoRedundancy
'video-redundancy': processVideoRedundancy,
'delete-resumable-upload-meta-file': processDeleteResumableUploadMetaFile
}

const jobTypes: JobType[] = [
Expand All @@ -87,7 +91,8 @@ const jobTypes: JobType[] = [
'activitypub-refresher',
'video-redundancy',
'actor-keys',
'video-live-ending'
'video-live-ending',
'delete-resumable-upload-meta-file'
]

class JobQueue {
Expand Down
26 changes: 25 additions & 1 deletion server/lib/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {
USER_PASSWORD_CREATE_LIFETIME,
VIEW_LIFETIME,
WEBSERVER,
TRACKER_RATE_LIMITS
TRACKER_RATE_LIMITS,
RESUMABLE_UPLOAD_SESSION_LIFETIME
} from '../initializers/constants'
import { CONFIG } from '../initializers/config'

Expand Down Expand Up @@ -202,6 +203,29 @@ class Redis {
])
}

/* ************ Resumable uploads final responses ************ */

setUploadSession (uploadId: string, video?: { id: number, uuid: string }) {
return this.setObject(
uploadId,
video
? {
id: video.id.toString(),
uuid: video.uuid
}
: null,
RESUMABLE_UPLOAD_SESSION_LIFETIME
)
}

doesUploadSessionExist (uploadId: string) {
return this.exists(uploadId)
}

getUploadSession (uploadId: string) {
return this.getValue(uploadId)
}

/* ************ Keys generation ************ */

generateCachedRouteKey (req: express.Request) {
Expand Down
21 changes: 19 additions & 2 deletions server/middlewares/validators/videos/videos.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as express from 'express'
import { body, header, param, query, ValidationChain } from 'express-validator'
import { Redis } from '@server/lib/redis'
import { getResumableUploadPath } from '@server/helpers/upload'
import { isAbleToUploadVideo } from '@server/lib/user'
import { getServerActor } from '@server/models/application/application'
Expand Down Expand Up @@ -109,12 +110,28 @@ const videosAddLegacyValidator = getCommonVideoEditAttributes().concat([
const videosAddResumableValidator = [
async (req: express.Request, res: express.Response, next: express.NextFunction) => {
const user = res.locals.oauth.token.User

const body: express.CustomUploadXFile<express.UploadXFileMetadata> = req.body
const file = { ...body, duration: undefined, path: getResumableUploadPath(body.id), filename: body.metadata.filename }

const cleanup = () => deleteFileAndCatch(file.path)

const uploadId = req.query.upload_id
const sessionExists = await Redis.Instance.doesUploadSessionExist(uploadId)
if (sessionExists) {
const sessionResponse = await Redis.Instance.getUploadSession(uploadId)
if (!sessionResponse) {
res.setHeader('Retry-After', 300) // ask to retry after 5 min, knowing the upload_id is kept for up to 15 min after completion
res.fail({
status: HttpStatusCode.CONFLICT_409,
message: 'The upload is already being processed'
})
} else {
res.json({ video: sessionResponse })
}
return
} else {
await Redis.Instance.setUploadSession(uploadId)
}

if (!await doesVideoChannelOfAccountExist(file.metadata.channelId, user, res)) return cleanup()

try {
Expand Down
5 changes: 5 additions & 0 deletions shared/models/server/job.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type JobType =
| 'video-redundancy'
| 'video-live-ending'
| 'actor-keys'
| 'delete-resumable-upload-meta-file'

export interface Job {
id: number
Expand Down Expand Up @@ -137,3 +138,7 @@ export interface VideoLiveEndingPayload {
export interface ActorKeysPayload {
actorId: number
}

export interface DeleteResumableUploadMetaFilePayload {
filepath: string
}
2 changes: 1 addition & 1 deletion support/doc/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2039,7 +2039,7 @@ paths:
'404':
description: upload not found
'409':
description: chunk doesn't match range
description: chunk doesn't match range or upload being processed
'422':
description: video unreadable
'429':
Expand Down

0 comments on commit 922b2b8

Please sign in to comment.