diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index ff29e584bf8..3f96f142cd5 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -66,6 +66,7 @@ import { liveRouter } from './live' import { ownershipVideoRouter } from './ownership' import { rateVideoRouter } from './rate' import { watchingRouter } from './watching' +import { LiveManager } from '@server/lib/live-manager' const auditLogger = auditLoggerFactory('videos') const videosRouter = express.Router() @@ -416,26 +417,46 @@ async function getVideo (req: express.Request, res: express.Response) { } async function viewVideo (req: express.Request, res: express.Response) { - const videoInstance = res.locals.onlyImmutableVideo + const immutableVideoAttrs = res.locals.onlyImmutableVideo const ip = req.ip - const exists = await Redis.Instance.doesVideoIPViewExist(ip, videoInstance.uuid) + const exists = await Redis.Instance.doesVideoIPViewExist(ip, immutableVideoAttrs.uuid) if (exists) { - logger.debug('View for ip %s and video %s already exists.', ip, videoInstance.uuid) - return res.status(204).end() + logger.debug('View for ip %s and video %s already exists.', ip, immutableVideoAttrs.uuid) + return res.sendStatus(204) } - await Promise.all([ - Redis.Instance.addVideoView(videoInstance.id), - Redis.Instance.setIPVideoView(ip, videoInstance.uuid) - ]) + const video = await VideoModel.load(immutableVideoAttrs.id) - const serverActor = await getServerActor() - await sendView(serverActor, videoInstance, undefined) + const promises: Promise[] = [ + Redis.Instance.setIPVideoView(ip, video.uuid, video.isLive) + ] - Hooks.runAction('action:api.video.viewed', { video: videoInstance, ip }) + let federateView = true - return res.status(204).end() + // Increment our live manager + if (video.isLive && video.isOwned()) { + LiveManager.Instance.addViewTo(video.id) + + // Views of our local live will be sent by our live manager + federateView = false + } + + // Increment our video views cache counter + if (!video.isLive) { + promises.push(Redis.Instance.addVideoView(video.id)) + } + + if (federateView) { + const serverActor = await getServerActor() + promises.push(sendView(serverActor, video, undefined)) + } + + await Promise.all(promises) + + Hooks.runAction('action:api.video.viewed', { video, ip }) + + return res.sendStatus(204) } async function getVideoDescription (req: express.Request, res: express.Response) { diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index e712f02a8a8..a93fe3c516f 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -316,7 +316,11 @@ const CONSTRAINTS_FIELDS = { } } -let VIDEO_VIEW_LIFETIME = 60000 * 60 // 1 hour +let VIEW_LIFETIME = { + VIDEO: 60000 * 60, // 1 hour + LIVE: 60000 * 5 // 5 minutes +} + let CONTACT_FORM_LIFETIME = 60000 * 60 // 1 hour const VIDEO_TRANSCODING_FPS: VideoTranscodingFPS = { @@ -726,7 +730,8 @@ if (isTestInstance() === true) { REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 - VIDEO_VIEW_LIFETIME = 1000 // 1 second + VIEW_LIFETIME.VIDEO = 1000 // 1 second + VIEW_LIFETIME.LIVE = 1000 * 5 // 5 second CONTACT_FORM_LIFETIME = 1000 // 1 second JOB_ATTEMPTS['email'] = 1 @@ -838,7 +843,7 @@ export { JOB_COMPLETED_LIFETIME, HTTP_SIGNATURE, VIDEO_IMPORT_STATES, - VIDEO_VIEW_LIFETIME, + VIEW_LIFETIME, CONTACT_FORM_LIFETIME, VIDEO_PLAYLIST_PRIVACIES, PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME, diff --git a/server/lib/activitypub/process/process-view.ts b/server/lib/activitypub/process/process-view.ts index cc26180af12..efceb21a207 100644 --- a/server/lib/activitypub/process/process-view.ts +++ b/server/lib/activitypub/process/process-view.ts @@ -4,6 +4,7 @@ import { Redis } from '../../redis' import { ActivityCreate, ActivityView, ViewObject } from '../../../../shared/models/activitypub' import { APProcessorOptions } from '../../../types/activitypub-processor.model' import { MActorSignature } from '../../../types/models' +import { LiveManager } from '@server/lib/live-manager' async function processViewActivity (options: APProcessorOptions) { const { activity, byActor } = options @@ -19,19 +20,27 @@ export { // --------------------------------------------------------------------------- async function processCreateView (activity: ActivityView | ActivityCreate, byActor: MActorSignature) { - const videoObject = activity.type === 'View' ? activity.object : (activity.object as ViewObject).object + const videoObject = activity.type === 'View' + ? activity.object + : (activity.object as ViewObject).object const options = { videoObject, - fetchType: 'only-immutable-attributes' as 'only-immutable-attributes', + fetchType: 'only-video' as 'only-video', allowRefresh: false as false } const { video } = await getOrCreateVideoAndAccountAndChannel(options) - await Redis.Instance.addVideoView(video.id) - if (video.isOwned()) { - // Don't resend the activity to the sender + // Our live manager will increment the counter and send the view to followers + if (video.isLive) { + LiveManager.Instance.addViewTo(video.id) + return + } + + await Redis.Instance.addVideoView(video.id) + + // Forward the view but don't resend the activity to the sender const exceptions = [ byActor ] await forwardVideoRelatedActivity(activity, undefined, exceptions, video) } diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 55c7a4ccbc8..599aabf80ea 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -83,6 +83,8 @@ async function saveLive (video: MVideo, live: MVideoLive) { await live.destroy() video.isLive = false + // Reinit views + video.views = 0 video.state = VideoState.TO_TRANSCODE video.duration = duration diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index ef9377e434d..4a1081a4f77 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -13,7 +13,7 @@ import { } from '@server/helpers/ffmpeg-utils' import { logger } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' +import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants' import { UserModel } from '@server/models/account/user' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' @@ -61,6 +61,8 @@ class LiveManager { private readonly transSessions = new Map() private readonly videoSessions = new Map() + // Values are Date().getTime() + private readonly watchersPerVideo = new Map() private readonly segmentsSha256 = new Map>() private readonly livesPerUser = new Map() @@ -115,6 +117,8 @@ class LiveManager { this.stop() } }) + + setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } run () { @@ -131,6 +135,10 @@ class LiveManager { this.rtmpServer = undefined } + isRunning () { + return !!this.rtmpServer + } + getSegmentsSha256 (videoUUID: string) { return this.segmentsSha256.get(videoUUID) } @@ -150,6 +158,19 @@ class LiveManager { return currentLives.reduce((sum, obj) => sum + obj.size, 0) } + addViewTo (videoId: number) { + if (this.videoSessions.has(videoId) === false) return + + let watchers = this.watchersPerVideo.get(videoId) + + if (!watchers) { + watchers = [] + this.watchersPerVideo.set(videoId, watchers) + } + + watchers.push(new Date().getTime()) + } + private getContext () { return context } @@ -331,6 +352,7 @@ class LiveManager { logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) this.transSessions.delete(sessionId) + this.watchersPerVideo.delete(videoLive.videoId) Promise.all([ tsWatcher.close(), masterWatcher.close() ]) .catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) @@ -426,6 +448,32 @@ class LiveManager { return this.isAbleToUploadVideoWithCache(user.id) } + private async updateLiveViews () { + if (!this.isRunning()) return + + logger.info('Updating live video views.') + + for (const videoId of this.watchersPerVideo.keys()) { + const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE + + const watchers = this.watchersPerVideo.get(videoId) + + const numWatchers = watchers.length + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + video.views = numWatchers + await video.save() + + await federateVideoIfNeeded(video, false) + + // Only keep not expired watchers + const newWatchers = watchers.filter(w => w > notBefore) + this.watchersPerVideo.set(videoId, newWatchers) + + logger.debug('New live video views for %s is %d.', video.url, numWatchers) + } + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/lib/redis.ts b/server/lib/redis.ts index a075eee2d90..4325598b2f3 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -7,7 +7,7 @@ import { USER_EMAIL_VERIFY_LIFETIME, USER_PASSWORD_RESET_LIFETIME, USER_PASSWORD_CREATE_LIFETIME, - VIDEO_VIEW_LIFETIME, + VIEW_LIFETIME, WEBSERVER, TRACKER_RATE_LIMITS } from '../initializers/constants' @@ -118,8 +118,12 @@ class Redis { /* ************ Views per IP ************ */ - setIPVideoView (ip: string, videoUUID: string) { - return this.setValue(this.generateViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) + setIPVideoView (ip: string, videoUUID: string, isLive: boolean) { + const lifetime = isLive + ? VIEW_LIFETIME.LIVE + : VIEW_LIFETIME.VIDEO + + return this.setValue(this.generateViewKey(ip, videoUUID), '1', lifetime) } async doesVideoIPViewExist (ip: string, videoUUID: string) { diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts index b41b5fc2eda..2198114b4e3 100644 --- a/server/tests/api/live/live.ts +++ b/server/tests/api/live/live.ts @@ -28,9 +28,12 @@ import { testImage, updateCustomSubConfig, updateLive, + viewVideo, + wait, waitJobs, waitUntilLiveStarts } from '../../../../shared/extra-utils' +import { FfmpegCommand } from 'fluent-ffmpeg' const expect = chai.expect @@ -419,6 +422,80 @@ describe('Test live', function () { }) }) + describe('Live views', function () { + let liveVideoId: string + let command: FfmpegCommand + + async function countViews (expected: number) { + for (const server of servers) { + const res = await getVideo(server.url, liveVideoId) + const video: VideoDetails = res.body + + expect(video.views).to.equal(expected) + } + } + + before(async function () { + this.timeout(30000) + + const liveAttributes = { + name: 'live video', + channelId: servers[0].videoChannel.id, + privacy: VideoPrivacy.PUBLIC + } + + const res = await createLive(servers[0].url, servers[0].accessToken, liveAttributes) + liveVideoId = res.body.video.uuid + + command = await sendRTMPStreamInVideo(servers[0].url, servers[0].accessToken, liveVideoId) + await waitUntilLiveStarts(servers[0].url, servers[0].accessToken, liveVideoId) + await waitJobs(servers) + }) + + it('Should display no views for a live', async function () { + await countViews(0) + }) + + it('Should view a live twice and display 1 view', async function () { + this.timeout(30000) + + await viewVideo(servers[0].url, liveVideoId) + await viewVideo(servers[0].url, liveVideoId) + + await wait(5000) + + await waitJobs(servers) + + await countViews(1) + }) + + it('Should wait 5 seconds and display 0 views', async function () { + this.timeout(30000) + + await wait(5000) + await waitJobs(servers) + + await countViews(0) + }) + + it('Should view a live on a remote and on local and display 2 views', async function () { + this.timeout(30000) + + await viewVideo(servers[0].url, liveVideoId) + await viewVideo(servers[1].url, liveVideoId) + await viewVideo(servers[1].url, liveVideoId) + + await wait(5000) + await waitJobs(servers) + + await countViews(2) + }) + + after(async function () { + await stopFfmpeg(command) + }) + }) + describe('Live socket messages', function () { async function createLiveWrapper () {