From b68fdd126203f72182a40e150a8b842463e4d2a2 Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Sat, 30 Dec 2023 17:35:53 +0100 Subject: [PATCH 01/10] filtering system w/ realtime PiP test-example --- deps/avcpp | 2 +- package-lock.json | 9 +- package.json | 1 + src/binding/avcpp-frame.cc | 2 +- src/binding/avcpp-frame.h | 1 + src/binding/avcpp-nobind.cc | 76 +++++++++++++-- src/binding/avcpp-types.h | 1 + src/binding/constants | 2 + src/binding/gen_constants.sh | 1 + src/lib/AudioDecoder.ts | 3 +- src/lib/Filter.ts | 184 +++++++++++++++++++++++++++++++++++ src/lib/Stream.ts | 1 + src/lib/VideoDecoder.ts | 3 +- src/lib/VideoEncoder.ts | 4 +- test/overlay.test.ts | 90 ++++++++++++++++- 15 files changed, 366 insertions(+), 14 deletions(-) create mode 100644 src/lib/Filter.ts diff --git a/deps/avcpp b/deps/avcpp index d3d3051..1a495ff 160000 --- a/deps/avcpp +++ b/deps/avcpp @@ -1 +1 @@ -Subproject commit d3d30510179728228ed87a76c5011f2659b8a388 +Subproject commit 1a495ff75d68b5303ee8551b47f17c8b63c4d0e5 diff --git a/package-lock.json b/package-lock.json index b8b0434..e6b651d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -24,6 +24,7 @@ "eslint-plugin-mocha": "^10.2.0", "magickwand.js": "^1.0.0-beta.6", "mocha": "^10.2.0", + "readable-stream-clone": "^0.0.7", "ts-node": "^10.9.1", "tsconfig-paths": "^4.2.0", "typescript": "^5.3.3" @@ -2101,7 +2102,7 @@ }, "node_modules/nobind17": { "version": "1.1.1", - "resolved": "git+ssh://git@github.com/mmomtchev/nobind.git#d6988c7f9128d711a440ab7e6f59fe6df2caf14a", + "resolved": "git+ssh://git@github.com/mmomtchev/nobind.git#060a501710b85d252db7659ea2fa9c4be65f6022", "license": "ISC", "dependencies": { "node-addon-api": ">=5.0.0" @@ -2357,6 +2358,12 @@ "node": ">= 6" } }, + "node_modules/readable-stream-clone": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/readable-stream-clone/-/readable-stream-clone-0.0.7.tgz", + "integrity": "sha512-mdkQtdg5elliOR64xcHVMkghSHQGjpsdCFAs6fSe28wTaVypR0GywmK5ndKpOSQzzuNpSrzC12Rv9UEkDLX73A==", + "dev": true + }, "node_modules/readdirp": { "version": "3.6.0", "dev": true, diff --git a/package.json b/package.json index b0698a6..a16a405 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "eslint-plugin-mocha": "^10.2.0", "magickwand.js": "^1.0.0-beta.6", "mocha": "^10.2.0", + "readable-stream-clone": "^0.0.7", "ts-node": "^10.9.1", "tsconfig-paths": "^4.2.0", "typescript": "^5.3.3" diff --git a/src/binding/avcpp-frame.cc b/src/binding/avcpp-frame.cc index f623488..5aac60d 100644 --- a/src/binding/avcpp-frame.cc +++ b/src/binding/avcpp-frame.cc @@ -9,7 +9,7 @@ VideoFrame CreateVideoFrame(Nobind::Typemap::Buffer buffer, PixelFormat pixelFor return VideoFrame{buffer.first, buffer.second, pixelFormat, width, height}; } -VideoFrameBuffer CopyFrameToBuffer(av::VideoFrame &frame) { +VideoFrameBuffer CopyFrameToBuffer(VideoFrame &frame) { auto size = frame.bufferSize(); return VideoFrameBuffer{{[&frame, size](uint8_t *data) { frame.copyToBuffer(data, size); }, size}}; } diff --git a/src/binding/avcpp-frame.h b/src/binding/avcpp-frame.h index ed4368e..6784be3 100644 --- a/src/binding/avcpp-frame.h +++ b/src/binding/avcpp-frame.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include diff --git a/src/binding/avcpp-nobind.cc b/src/binding/avcpp-nobind.cc index 460e06b..ca5eac4 100644 --- a/src/binding/avcpp-nobind.cc +++ b/src/binding/avcpp-nobind.cc @@ -2,15 +2,15 @@ #include #include #include -#include -#include -#include - -// API2 -#include #include +#include +#include 
+#include +#include #include #include +#include +#include #include @@ -26,6 +26,9 @@ using namespace av; #define REGISTER_CONSTANT(CONST, NAME) \ constexpr static int64_t __const_##CONST{static_cast(CONST)}; \ m.def<&__const_##CONST, Nobind::ReadOnly>(NAME); +#define REGISTER_ENUM(ENUM, ID) \ + constexpr static int64_t __const_##ID{static_cast(ENUM::ID)}; \ + m.def<&__const_##ID, Nobind::ReadOnly>(#ENUM "_" #ID); // An universal toString() wrapper, to be used as a class extension template std::string ToString(T &v) { @@ -327,6 +330,8 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { .def<&Packet::timeBase, Nobind::ReturnNested>("timeBase"); m.def("VideoFrame") + .cons() + .def<&VideoFrame::null>("null") // Every global function can also be registered as a static class method .def<&CreateVideoFrame>("create") .def<&VideoFrame::isNull>("isNull") @@ -417,6 +422,65 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { .def(&AudioResampler::pop), Nobind::ReturnAsync>("popAsync"); + m.def("Filter").cons(); + + m.def("FilterGraph") + .cons<>() + .def<&FilterGraph::createFilter>("createFilter") + .def(&FilterGraph::parse)>("parse") + .def<&FilterGraph::config>("config") + .def(&FilterGraph::filter)>( + "filter"); + + m.def("FilterContext"); + + // We only export the safer API that copies frames for now + m.def("BufferSrcFilterContext") + .cons() + .def( + &BufferSrcFilterContext::writeVideoFrame)>("writeVideoFrame") + .def( + &BufferSrcFilterContext::writeAudioSamples)>("writeAudioSamples") + .def( + &BufferSrcFilterContext::writeVideoFrame), + Nobind::ReturnAsync>("writeVideoFrameAsync") + .def( + &BufferSrcFilterContext::writeAudioSamples), + Nobind::ReturnAsync>("writeAudioSamplesAsync") + .def<&BufferSrcFilterContext::checkFilter>("checkFilter"); + + m.def("BufferSinkFilterContext") + .cons() + // getVideoFrame & cie do not have good native-feel interface, but this avoids + // copying video frames + .def( + &BufferSinkFilterContext::getVideoFrame)>("getVideoFrame") + .def( + &BufferSinkFilterContext::getVideoFrame)>("getVideoFrameFlags") + .def( + &BufferSinkFilterContext::getAudioFrame)>("getAudioFrame") + .def( + &BufferSinkFilterContext::getAudioFrame)>("getAudioFrameFlags") + .def( + &BufferSinkFilterContext::getVideoFrame), + Nobind::ReturnAsync>("getVideoFrameAsync") + .def( + &BufferSinkFilterContext::getVideoFrame), + Nobind::ReturnAsync>("getVideoFrameFlagsAsync") + .def( + &BufferSinkFilterContext::getAudioFrame), + Nobind::ReturnAsync>("getAudioFrameAsync") + .def( + &BufferSinkFilterContext::getAudioFrame), + Nobind::ReturnAsync>("getAudioFrameFlagsAsync") + .def<&BufferSinkFilterContext::setFrameSize>("setFrameSize") + .def<&BufferSinkFilterContext::frameRate>("frameRate") + .def<&BufferSinkFilterContext::checkFilter>("checkFilter"); + + REGISTER_ENUM(FilterMediaType, Unknown); + REGISTER_ENUM(FilterMediaType, Audio); + REGISTER_ENUM(FilterMediaType, Video); + m.Exports().Set("WritableCustomIO", WritableCustomIO::GetClass(m.Env())); m.Exports().Set("ReadableCustomIO", ReadableCustomIO::GetClass(m.Env())); diff --git a/src/binding/avcpp-types.h b/src/binding/avcpp-types.h index e3fb1e1..5b9734a 100644 --- a/src/binding/avcpp-types.h +++ b/src/binding/avcpp-types.h @@ -54,6 +54,7 @@ TYPEMAPS_FOR_ENUM(AVCodecID); TYPEMAPS_FOR_ENUM(AVMediaType); TYPEMAPS_FOR_ENUM(AVPixelFormat); TYPEMAPS_FOR_ENUM(AVSampleFormat); +TYPEMAPS_FOR_ENUM(FilterMediaType); // While this is not an enum, the typemap is still compatible TYPEMAPS_FOR_ENUM(std::bitset<64>); diff --git 
a/src/binding/constants b/src/binding/constants
index cc62d09..b1d2d7a 100644
--- a/src/binding/constants
+++ b/src/binding/constants
@@ -1009,3 +1009,5 @@ REGISTER_CONSTANT(SWS_SPLINE, "SWS_SPLINE");
 REGISTER_CONSTANT(SWS_SRC_V_CHR_DROP_MASK, "SWS_SRC_V_CHR_DROP_MASK");
 REGISTER_CONSTANT(SWS_SRC_V_CHR_DROP_SHIFT, "SWS_SRC_V_CHR_DROP_SHIFT");
 REGISTER_CONSTANT(SWS_X, "SWS_X");
+REGISTER_CONSTANT(AV_BUFFERSINK_FLAG_NO_REQUEST, "AV_BUFFERSINK_FLAG_NO_REQUEST");
+REGISTER_CONSTANT(AV_BUFFERSINK_FLAG_PEEK, "AV_BUFFERSINK_FLAG_PEEK");
diff --git a/src/binding/gen_constants.sh b/src/binding/gen_constants.sh
index dbc2a61..e839d07 100644
--- a/src/binding/gen_constants.sh
+++ b/src/binding/gen_constants.sh
@@ -20,4 +20,5 @@ sed -nr 's/^.*\s+AVFMT_([_A-Z0-9]+)[, ].*/AVFMT_\1 AV_FMT_\1/p' ${FFMPEG}/src/li
 sed -nr 's/^.*\s+AV_LOG_([_A-Z0-9]+)[, ].*/AV_LOG_\1 AV_LOG_\1/p' ${FFMPEG}/src/libavutil/log.h | sort | uniq
 sed -nr 's/^.*\s+SWS_([_A-Z0-9]+)[, ].*/SWS_\1 SWS_\1/p' ${FFMPEG}/src/libswscale/swscale.h | sort | uniq
 sed -nr 's/^.*\s+SWS_([_A-Z0-9]+)[, ].*/SWS_\1 SWS_\1/p' ${FFMPEG}/src/libswresample/swresample.h | sort | uniq
+sed -nr 's/^.*\s+AV_BUFFERSINK_FLAG_([_A-Z0-9]+)[, ].*/AV_BUFFERSINK_FLAG_\1 AV_BUFFERSINK_FLAG_\1/p' ${FFMPEG}/src/libavfilter/buffersink.h | sort | uniq
 ) | sed -r 's/(.*)\s(.*)/REGISTER_CONSTANT(\1, "\2");/g'
diff --git a/src/lib/AudioDecoder.ts b/src/lib/AudioDecoder.ts
index 2d1222b..295661b 100644
--- a/src/lib/AudioDecoder.ts
+++ b/src/lib/AudioDecoder.ts
@@ -75,7 +75,8 @@ export class AudioDecoder extends MediaTransform implements MediaStream {
       sampleFormat: this.decoder.sampleFormat(),
       sampleRate: this.decoder.sampleRate(),
       channelLayout: new ffmpeg.ChannelLayout(this.decoder.channelLayout()),
-      frameSize: this.decoder.frameSize()
+      frameSize: this.decoder.frameSize(),
+      timeBase: this.decoder.timeBase()
     } as AudioStreamDefinition;
   }
 }
diff --git a/src/lib/Filter.ts b/src/lib/Filter.ts
new file mode 100644
index 0000000..6213acd
--- /dev/null
+++ b/src/lib/Filter.ts
@@ -0,0 +1,184 @@
+import { EventEmitter, Writable, Readable } from 'node:stream';
+import ffmpeg from '@mmomtchev/ffmpeg';
+import { MediaStreamDefinition, isAudioDefinition, isVideoDefinition } from './MediaStream';
+
+export const verbose = (process.env.DEBUG_FILTER || process.env.DEBUG_ALL) ? console.debug.bind(console) : () => undefined;
+
+export interface FilterOptions {
+  // Filter sources definitions
+  inputs: Record<string, MediaStreamDefinition>;
+  // Filter sinks definitions
+  outputs: Record<string, MediaStreamDefinition>;
+  // Graph string
+  graph: string;
+  // A filter must have a single time base
+  timeBase: any;
+}
+
+/**
+ * A Transform stream that uses avfilter to transform a number of MediaStreams.
+ * It receives raw decoded input and sends raw decoded output.
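+ *
+ * A minimal usage sketch (illustrative only; it assumes `videoInput` is an
+ * already configured VideoDecoder and `videoOutput` a matching VideoEncoder):
+ *
+ *   const filter = new Filter({
+ *     inputs: { 'in': videoInput.definition() },
+ *     outputs: { 'out': videoOutput.definition() },
+ *     graph: '[in] hflip [out]; ',
+ *     timeBase: videoInput.definition().timeBase
+ *   });
+ *   videoInput.pipe(filter.src['in']);
+ *   filter.sink['out'].pipe(videoOutput);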
+ */
+export class Filter extends EventEmitter {
+  protected filterGraph: any;
+  protected bufferSrc: Record<string, {
+    buffer: any;
+    busy: boolean;
+  }>;
+  protected bufferSink: Record<string, {
+    buffer: any;
+    waitingToRead: number;
+  }>;
+  protected timeBase: any;
+  protected stillStreamingSources: number;
+  src: Record<string, Writable>;
+  sink: Record<string, Readable>;
+
+  constructor(options: FilterOptions) {
+    super();
+    this.filterGraph = new ffmpeg.FilterGraph;
+    this.timeBase = options.timeBase;
+
+    // construct inputs
+    let filterDescriptor = '';
+    for (const inp of Object.keys(options.inputs)) {
+      const def = options.inputs[inp];
+      if (isVideoDefinition(def)) {
+        filterDescriptor += `buffer@${inp}=video_size=${def.width}x${def.height}:` +
+          `pix_fmt=${def.pixelFormat.toString()}:time_base=${def.timeBase.toString()} [${inp}]; `;
+      }
+      if (isAudioDefinition(def)) {
+        throw new Error('later');
+      }
+    }
+    filterDescriptor += options.graph;
+    for (const outp of Object.keys(options.outputs)) {
+      const def = options.outputs[outp];
+      if (isVideoDefinition(def)) {
+        filterDescriptor += `[${outp}] buffersink@${outp}`;
+      }
+    }
+    verbose(`Filter: constructed graph ${filterDescriptor}`);
+    this.filterGraph.parse(filterDescriptor);
+    this.filterGraph.config();
+
+    this.stillStreamingSources = 0;
+    this.src = {};
+    this.bufferSrc = {};
+    for (const inp of Object.keys(options.inputs)) {
+      this.bufferSrc[inp] = {
+        buffer: new ffmpeg.BufferSrcFilterContext(this.filterGraph.filter(`buffer@${inp}`)),
+        busy: false
+      };
+      this.src[inp] = new Writable({
+        objectMode: true,
+        write: (chunk: any, encoding: BufferEncoding, callback: (error?: Error | null | undefined) => void) => {
+          this.write(inp, chunk, callback);
+        },
+        destroy: (error: Error | null, callback: (error: Error | null) => void): void => {
+          if (error) {
+            this.stillStreamingSources--;
+            verbose(`Filter: error on source [${inp}], destroy all streams`, error);
+          } else {
+            verbose(`Filter: destroy source [${inp}]`);
+            callback(null);
+          }
+        },
+        final: (callback: (error?: Error | null | undefined) => void): void => {
+          verbose(`Filter: end source [${inp}]`);
+          // VideoFrame.null() is a special EOF frame
+          this.write(inp, ffmpeg.VideoFrame.null(), callback);
+          callback(null);
+          this.stillStreamingSources--;
+        }
+      });
+      this.stillStreamingSources++;
+    }
+
+    this.sink = {};
+    this.bufferSink = {};
+    for (const outp of Object.keys(options.outputs)) {
+      this.bufferSink[outp] = {
+        buffer: new ffmpeg.BufferSinkFilterContext(this.filterGraph.filter(`buffersink@${outp}`)),
+        waitingToRead: 0
+      };
+      this.sink[outp] = new Readable({
+        objectMode: true,
+        read: (size: number) => {
+          this.read(outp, size);
+        }
+      });
+    }
+  }
+
+  write(id: string, frame: any, callback: (error?: Error | null | undefined) => void) {
+    const src = this.bufferSrc[id];
+    if (!src) {
+      return void callback(new Error(`Invalid buffer src [${id}]`));
+    }
+    if (src.busy) {
+      // This is obviously a major malfunction and should never happen as long
+      // as the writer respects the stream semantics
+      return void callback(new Error(`Writing is not reentrant on [${id}]!`));
+    }
+    src.busy = true;
+
+    frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
+    frame.setTimeBase(this.timeBase);
+    frame.setStreamIndex(0);
+
+    verbose(`Filter: received data for source [${id}]`);
+    src.buffer.writeVideoFrameAsync(frame)
+      .then(() => {
+        src.busy = false;
+        verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`);
+        callback(null);
+        // Now that we pushed more data, try reading again, refer to 1* below
+        for (const sink of Object.keys(this.bufferSink)) {
+          // This is fully synchronous on purpose - otherwise we might run
+          // into complex synchronization issues where someone else manages
+          // to call read between the two operations
+          const size = this.bufferSink[sink].waitingToRead;
+          if (size) {
+            verbose(`Filter: wake up sink [${sink}]`);
+            this.bufferSink[sink].waitingToRead = 0;
+            this.read(sink, size);
+          }
+        }
+      })
+      .catch(callback);
+  }
+
+  read(id: string, size: number) {
+    const sink = this.bufferSink[id];
+    if (!sink) {
+      throw new Error(`Invalid buffer sink [${id}]`);
+    }
+    verbose(`Filter: received a request for data from sink [${id}]`);
+
+    // read must always return immediately
+    // this means that we depend on ffmpeg not doing any filtering work on this call.
+    // This is the meaning of the special flag.
+    const videoFrame = new ffmpeg.VideoFrame;
+    let frames = 0;
+    while (sink.buffer.getVideoFrame(videoFrame) && frames < size) {
+      verbose(`Filter: sent data from sink [${id}], pts=${videoFrame.pts().toString()}`);
+      videoFrame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
+      videoFrame.setTimeBase(this.timeBase);
+      videoFrame.setStreamIndex(0);
+      this.sink[id].push(videoFrame);
+      frames++;
+    }
+    if (this.stillStreamingSources === 0) {
+      verbose(`Filter: sending null for EOF on sink [${id}]`);
+      this.sink[id].push(null);
+      return;
+    }
+    if (frames === 0) {
+      verbose(`Filter: no data for sink [${id}] will call back later`);
+      // If nothing was readily available, now it will be up to us
+      // to call back when something is, see 1* above
+      sink.waitingToRead = size;
+    }
+  }
+}
diff --git a/src/lib/Stream.ts b/src/lib/Stream.ts
index a96d9e2..a40c224 100644
--- a/src/lib/Stream.ts
+++ b/src/lib/Stream.ts
@@ -7,4 +7,5 @@ export { VideoTransform } from './VideoTransform';
 export { AudioDecoder } from './AudioDecoder';
 export { AudioEncoder } from './AudioEncoder';
 export { AudioTransform } from './AudioTransform';
+export { Filter } from './Filter';
 export { Discarder } from './Discarder';
diff --git a/src/lib/VideoDecoder.ts b/src/lib/VideoDecoder.ts
index 8764b96..5454691 100644
--- a/src/lib/VideoDecoder.ts
+++ b/src/lib/VideoDecoder.ts
@@ -77,7 +77,8 @@ export class VideoDecoder extends MediaTransform implements MediaStream {
       width: this.decoder.width(),
       height: this.decoder.height(),
       frameRate: this.stream.frameRate(),
-      pixelFormat: this.decoder.pixelFormat()
+      pixelFormat: this.decoder.pixelFormat(),
+      timeBase: this.decoder.timeBase()
     } as VideoStreamDefinition;
   }
 }
diff --git a/src/lib/VideoEncoder.ts b/src/lib/VideoEncoder.ts
index 2ef8be9..fb87884 100644
--- a/src/lib/VideoEncoder.ts
+++ b/src/lib/VideoEncoder.ts
@@ -73,7 +73,9 @@ export class VideoEncoder extends MediaTransform implements MediaStream {
     frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
     frame.setTimeBase(this.encoder.timeBase());
     const packet = await this.encoder.encodeAsync(frame);
-    verbose(`VideoEncoder: encoded frame: pts=${frame.pts()} / ${frame.pts().seconds()} / ${frame.timeBase()} / ${frame.width()}x${frame.height()}, size=${frame.size()}, ref=${frame.isReferenced()}:${frame.refCount()} / type: ${frame.pictureType()} }`);
+    verbose(`VideoEncoder: encoded frame: pts=${frame.pts()} / ${frame.pts().seconds()} / ` +
+      `${frame.timeBase()} / ${frame.width()}x${frame.height()}, size=${frame.size()}, ` +
+      `ref=${frame.isReferenced()}:${frame.refCount()} / type: ${frame.pictureType()} }`);
     this.push(packet);
     this.busy = false;
     callback();
diff --git a/test/overlay.test.ts b/test/overlay.test.ts
index 5f1b95a..8c45c7d 100644
--- a/test/overlay.test.ts
+++ b/test/overlay.test.ts
@@ -1,11 +1,12 @@
 import * as path from 'node:path';
 import * as fs from 'node:fs';
+import ReadableStreamClone from 'readable-stream-clone';
+
 import { assert } from 'chai';
 import ffmpeg from '@mmomtchev/ffmpeg';
-import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, VideoTransform } from '@mmomtchev/ffmpeg/stream';
-import { Readable } from 'node:stream';
+import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, VideoTransform, Filter, Discarder } from '@mmomtchev/ffmpeg/stream';
+import { Readable, Writable } from 'node:stream';
 import { MediaTransform, VideoStreamDefinition } from '../lib/MediaStream';
 import { Magick, MagickCore } from 'magickwand.js';
@@ -145,4 +146,89 @@
       }
     });
   });
+
+  it('w/ ffmpeg filtering (PiP example)', (done) => {
+    // This uses ffmpeg's filter subsystem to overlay a copy of the video
+    // in a small thumbnail (Picture-in-Picture).
+    // It reads from a file, overlays and transcodes to a realtime stream.
+    // This pipeline is easily fast enough to be usable in real-time even on an older CPU.
+    //
+    const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') });
+
+    demuxer.on('error', done);
+    demuxer.on('ready', () => {
+      try {
+
+        const audioInput = new Discarder;
+        const videoInput = new VideoDecoder(demuxer.video[0]);
+
+        const videoDefinition = videoInput.definition();
+
+        const videoOutput = new VideoEncoder({
+          type: 'Video',
+          codec: ffmpeg.AV_CODEC_H264,
+          bitRate: 2.5e6,
+          width: videoDefinition.width,
+          height: videoDefinition.height,
+          frameRate: new ffmpeg.Rational(25, 1),
+          pixelFormat: videoDefinition.pixelFormat,
+          // We will try to go as fast as possible
+          // H.264 encoding in ffmpeg can be very fast
+          codecOptions: { preset: 'veryfast' }
+        });
+
+        // A Filter is an ffmpeg filter chain
+        const filter = new Filter({
+          inputs: {
+            // Filter with two identical inputs (the same video)
+            'main_in': videoDefinition,
+            'pip_in': videoDefinition
+          },
+          outputs: {
+            // One output
+            'out': videoOutput.definition()
+          },
+          graph:
+            // Take 'pip_in' and rescale it to 1/8th to obtain 'pip_out'
+            `[pip_in] scale=${videoDefinition.width / 8}x${videoDefinition.height / 8} [pip_out]; ` +
+            // Overlay 'pip_out' over 'main_in' at the specified offset to obtain 'out'
+            `[main_in][pip_out] overlay=x=${videoDefinition.width * 13 / 16}:y=${videoDefinition.height / 16} [out]; `,
+          timeBase: videoDefinition.timeBase
+        });
+        assert.instanceOf(filter.src['main_in'], Writable);
+        assert.instanceOf(filter.src['pip_in'], Writable);
+        assert.instanceOf(filter.sink['out'], Readable);
+
+        const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput] });
+        assert.instanceOf(muxer.output, Readable);
+
+        muxer.on('finish', done);
+        muxer.on('error', done);
+
+        const output = fs.createWriteStream(tempFile);
+
+        // Create a T junction to copy the raw decoded video
+        const videoInput1 = new ReadableStreamClone(videoInput, { objectMode: true });
+        const videoInput2 = new ReadableStreamClone(videoInput, { objectMode: true });
+
+        // Demuxer -> Decoder -> T junction
+        demuxer.video[0].pipe(videoInput);
+
+        // T junction -> Filter source 'main_in'
+        videoInput1.pipe(filter.src['main_in']);
+
+        // T junction -> Filter source 'pip_in'
+        videoInput2.pipe(filter.src['pip_in']);
+
+        // Filter sink 'out' -> Encoder -> Muxer
+        filter.sink['out'].pipe(videoOutput).pipe(muxer.video[0]);
+
+        demuxer.audio[0].pipe(audioInput);
+        muxer.output!.pipe(output);
+      } catch (err) {
+        done(err);
+      }
+    });
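+
+    // Both ReadableStreamClone branches must keep draining: if one of the
+    // filter sources stops consuming, Node.js object-mode backpressure will
+    // eventually pause the shared decoder and stall the whole pipeline.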
}); + }); From 326df2eeabdf19161b9115777223923975b16a38 Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Sat, 30 Dec 2023 19:20:52 +0100 Subject: [PATCH 02/10] emit events --- src/lib/Filter.ts | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/lib/Filter.ts b/src/lib/Filter.ts index 6213acd..a1479dd 100644 --- a/src/lib/Filter.ts +++ b/src/lib/Filter.ts @@ -31,6 +31,7 @@ export class Filter extends EventEmitter { }>; protected timeBase: any; protected stillStreamingSources: number; + protected destroyed: boolean; src: Record; sink: Record; @@ -63,6 +64,7 @@ export class Filter extends EventEmitter { this.filterGraph.config(); this.stillStreamingSources = 0; + this.destroyed = false; this.src = {}; this.bufferSrc = {}; for (const inp of Object.keys(options.inputs)) { @@ -79,6 +81,7 @@ export class Filter extends EventEmitter { if (error) { this.stillStreamingSources--; verbose(`Filter: error on source [${inp}], destroy all streams`, error); + this.destroy(error); } else { verbose(`Filter: destroy source [${inp}]`); callback(null); @@ -90,9 +93,14 @@ export class Filter extends EventEmitter { this.write(inp, ffmpeg.VideoFrame.null(), callback); callback(null); this.stillStreamingSources--; + if (this.stillStreamingSources === 0) + this.emit('finish'); } }); this.stillStreamingSources++; + Promise.resolve().then(() => { + this.emit('ready'); + }); } this.sink = {}; @@ -111,6 +119,18 @@ export class Filter extends EventEmitter { } } + destroy(error: Error) { + if (this.destroyed) return; + this.destroyed = true; + for (const s of Object.keys(this.bufferSrc)) { + this.sink[s].destroy(error); + } + for (const s of Object.keys(this.bufferSink)) { + this.src[s].destroy(error); + } + this.emit('error'); + } + write(id: string, frame: any, callback: (error?: Error | null | undefined) => void) { const src = this.bufferSrc[id]; if (!src) { From bedb608ace62a820127aa1e42d343dde7293d010 Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Sat, 30 Dec 2023 19:24:13 +0100 Subject: [PATCH 03/10] comment more --- test/overlay.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/overlay.test.ts b/test/overlay.test.ts index 8c45c7d..9c6e255 100644 --- a/test/overlay.test.ts +++ b/test/overlay.test.ts @@ -193,8 +193,10 @@ describe('streaming', () => { `[pip_in] scale=${videoDefinition.width / 8}x${videoDefinition.height / 8} [pip_out]; ` + // Overlay 'pip_out' over 'main_in' at the specified offset to obtain 'out' `[main_in][pip_out] overlay=x=${videoDefinition.width * 13 / 16}:y=${videoDefinition.height / 16} [out]; `, + // A filter must have a single time base timeBase: videoDefinition.timeBase }); + // These should be available based on the above configuration assert.instanceOf(filter.src['main_in'], Writable); assert.instanceOf(filter.src['pip_in'], Writable); assert.instanceOf(filter.sink['out'], Readable); From 136f378e52e254824f4f445ba311b3eb9a4ecada Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Sun, 31 Dec 2023 14:12:42 +0100 Subject: [PATCH 04/10] test-example for overlaying text from ImageMagick using a filter --- src/lib/Filter.ts | 2 + test/overlay.test.ts | 110 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 108 insertions(+), 4 deletions(-) diff --git a/src/lib/Filter.ts b/src/lib/Filter.ts index a1479dd..6eb34b5 100644 --- a/src/lib/Filter.ts +++ b/src/lib/Filter.ts @@ -45,6 +45,8 @@ export class Filter extends EventEmitter { for (const inp of Object.keys(options.inputs)) { const def = options.inputs[inp]; if 
(isVideoDefinition(def)) { + if (!def.pixelFormat || !def.timeBase) + throw new Error('timeBase and pixelFormat are mandatory for filter sources'); filterDescriptor += `buffer@${inp}=video_size=${def.width}x${def.height}:` + `pix_fmt=${def.pixelFormat.toString()}:time_base=${def.timeBase.toString()} [${inp}]; `; } diff --git a/test/overlay.test.ts b/test/overlay.test.ts index 9c6e255..c113780 100644 --- a/test/overlay.test.ts +++ b/test/overlay.test.ts @@ -22,7 +22,7 @@ describe('streaming', () => { done(); }); - it('w/ dynamic overlay (generic ImageMagick overlay version)', (done) => { + it('w/ overlay (ImageMagick overlay version)', (done) => { // Overlaying using ImageMagick is very versatile and allows for maximum quality, // however it is far too slow to be done in realtime const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); @@ -46,8 +46,8 @@ describe('streaming', () => { // Thus, we will add additional pipeline elements to convert the incoming // frames to RGBA8888 and then back to YUV420 // - // See below for an example that is much faster but involves manually - // overlaying YUV420 pixels over the video frame + // See below for a much faster example that uses ffmpeg's filtering + // system to overlay the image // // This is the intermediate format used const videoRGB = { @@ -147,11 +147,113 @@ describe('streaming', () => { }); }); + it('w/ overlay (ffmpeg filter overlay version)', (done) => { + // This uses ffmpeg's filter subsystem to overlay text drawn by ImageMagick + // It reads from a file, overlays and transcodes to a realtime stream. + // This pipeline is fast enough to be usable in real-time even on an older CPU. + // + const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); + + // Use ImageMagick to create an image with the text that + const textImage = new Magick.Image('500x20', 'transparent'); + textImage.draw([ + new Magick.DrawableFont('sans-serif', MagickCore.NormalStyle, 100, MagickCore.NormalStretch), + new Magick.DrawablePointSize(24), + new Magick.DrawableStrokeColor('black'), + new Magick.DrawableText(20, 18, 'The insurance is mandatory, the copyright is not') + ]); + // Convert the image to a single ffmpeg video frame + // We can't use YUV420 because it does not support transparency, RGBA8888 does + textImage.magick('rgba'); + textImage.depth(8); + const textBlob = new Magick.Blob; + textImage.write(textBlob); + const textImagePixelFormat = new ffmpeg.PixelFormat(ffmpeg.AV_PIX_FMT_RGBA); + const textFrame = new ffmpeg.VideoFrame.create(Buffer.from(textBlob.data()), textImagePixelFormat, 500, 20); + + demuxer.on('error', done); + demuxer.on('ready', () => { + try { + + const audioInput = new Discarder; + const videoInput = new VideoDecoder(demuxer.video[0]); + + const videoDefinition = videoInput.definition(); + + const videoOutput = new VideoEncoder({ + type: 'Video', + codec: ffmpeg.AV_CODEC_H264, + bitRate: 2.5e6, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: videoDefinition.pixelFormat, + // We will try to go as fast as possible + // H.264 encoding in ffmpeg can be very fast + codecOptions: { preset: 'veryfast' } + }); + + // A Filter is an ffmpeg filter chain + const filter = new Filter({ + inputs: { + // Filter with two inputs + 'video_in': videoDefinition, + 'text_in': { + type: 'Video', + width: 500, + height: 20, + pixelFormat: textImagePixelFormat, + timeBase: videoDefinition.timeBase + } as 
VideoStreamDefinition + }, + outputs: { + // One output + 'out': videoOutput.definition() + }, + graph: + // Overlay 'text_in' over 'video_in' at the specified offset to obtain 'out' + `[video_in][text_in] overlay=x=20:y=${videoDefinition.height - 40} [out]; `, + // A filter must have a single time base + timeBase: videoDefinition.timeBase + }); + // These should be available based on the above configuration + assert.instanceOf(filter.src['video_in'], Writable); + assert.instanceOf(filter.src['text_in'], Writable); + assert.instanceOf(filter.sink['out'], Readable); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput] }); + assert.instanceOf(muxer.output, Readable); + + muxer.on('finish', done); + muxer.on('error', done); + + const output = fs.createWriteStream(tempFile); + + + // Demuxer -> Decoder -> Filter source 'video_in' + demuxer.video[0].pipe(videoInput).pipe(filter.src['video_in']); + + // Simply send the single frame to the other source, no need for a stream + // (if you want to change subtitles, you will have to push frames with a time base and a pts) + filter.src['text_in'].write(textFrame); + filter.src['text_in'].end(); + + // Filter sink 'out' -> Encoder -> Muxer + filter.sink['out'].pipe(videoOutput).pipe(muxer.video[0]); + + demuxer.audio[0].pipe(audioInput); + muxer.output!.pipe(output); + } catch (err) { + done(err); + } + }); + }); + it('w/ ffmpeg filtering (PiP example)', (done) => { // This uses ffmpeg's filter subsystem to overlay a copy of the video // in a small thumbnail (Picture-in-Picture). // It reads from a file, overlays and transcodes to a realtime stream. - // This pipeline is easily fast enough to be usable in real-time even on an older CPU. + // This pipeline is fast enough to be usable in real-time even on an older CPU. 
// const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); From 840ee5494299e3f070cde27c12508c4e5aed1be1 Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Sun, 31 Dec 2023 14:13:42 +0100 Subject: [PATCH 05/10] reformat --- test/overlay.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/overlay.test.ts b/test/overlay.test.ts index c113780..aa6772c 100644 --- a/test/overlay.test.ts +++ b/test/overlay.test.ts @@ -154,7 +154,7 @@ describe('streaming', () => { // const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); - // Use ImageMagick to create an image with the text that + // Use ImageMagick to create an image with the text const textImage = new Magick.Image('500x20', 'transparent'); textImage.draw([ new Magick.DrawableFont('sans-serif', MagickCore.NormalStyle, 100, MagickCore.NormalStretch), @@ -229,7 +229,6 @@ describe('streaming', () => { const output = fs.createWriteStream(tempFile); - // Demuxer -> Decoder -> Filter source 'video_in' demuxer.video[0].pipe(videoInput).pipe(filter.src['video_in']); From 1f5d43eac847fb4af8b06bdb4a4e149659d4c441 Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Mon, 1 Jan 2024 13:55:53 +0100 Subject: [PATCH 06/10] support audio filtering --- .vscode/settings.json | 3 +- deps/avcpp | 2 +- src/binding/avcpp-frame.cc | 20 ++++++ src/binding/avcpp-frame.h | 6 ++ src/binding/avcpp-nobind.cc | 29 ++------ src/lib/Filter.ts | 130 ++++++++++++++++++++++++++---------- test/overlay.test.ts | 44 ++++++++---- 7 files changed, 163 insertions(+), 71 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index e761c9d..8ba2f48 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,5 +5,6 @@ "clang-format.fallbackStyle": "LLVM", "[cpp]": { "editor.defaultFormatter": "xaver.clang-format" - } + }, + "editor.tabSize": 2 } diff --git a/deps/avcpp b/deps/avcpp index 1a495ff..ac6f23d 160000 --- a/deps/avcpp +++ b/deps/avcpp @@ -1 +1 @@ -Subproject commit 1a495ff75d68b5303ee8551b47f17c8b63c4d0e5 +Subproject commit ac6f23d51589abc468420f0e6ea779bbef040079 diff --git a/src/binding/avcpp-frame.cc b/src/binding/avcpp-frame.cc index 5aac60d..7bba0f0 100644 --- a/src/binding/avcpp-frame.cc +++ b/src/binding/avcpp-frame.cc @@ -13,3 +13,23 @@ VideoFrameBuffer CopyFrameToBuffer(VideoFrame &frame) { auto size = frame.bufferSize(); return VideoFrameBuffer{{[&frame, size](uint8_t *data) { frame.copyToBuffer(data, size); }, size}}; } + +VideoFrame *GetVideoFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec) { + VideoFrame *frame = new VideoFrame; + if (!sink.getVideoFrame(*frame, ec)) { + delete frame; + return nullptr; + } + + return frame; +} + +AudioSamples *GetAudioFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec) { + AudioSamples *samples = new AudioSamples; + if (!sink.getAudioFrame(*samples, ec)) { + delete samples; + return nullptr; + } + + return samples; +} diff --git a/src/binding/avcpp-frame.h b/src/binding/avcpp-frame.h index 6784be3..f02c860 100644 --- a/src/binding/avcpp-frame.h +++ b/src/binding/avcpp-frame.h @@ -64,3 +64,9 @@ AudioSamples CreateAudioSamples(Nobind::Typemap::Buffer buffer, SampleFormat sam uint64_t channelLayout, int sampleRate); VideoFrame CreateVideoFrame(Nobind::Typemap::Buffer buffer, PixelFormat pixelFormat, int width, int height); + +// These extension functions are needed to wrap their avcpp counterparts which return data in an argument +// They return pointers to avoid unnecessary copying of the 
VideoFrame - as JavaScript makes no difference +// In JavaScript all C++ objects are heap-allocated objects referenced by a pointer +VideoFrame *GetVideoFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec); +AudioSamples *GetAudioFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec); diff --git a/src/binding/avcpp-nobind.cc b/src/binding/avcpp-nobind.cc index ca5eac4..21eb202 100644 --- a/src/binding/avcpp-nobind.cc +++ b/src/binding/avcpp-nobind.cc @@ -299,7 +299,9 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { .cons() .cons() .def<&ChannelLayout::channels>("channels") - .def<&ChannelLayout::layout>("layout"); + .def<&ChannelLayout::layout>("layout") + .def<&ChannelLayout::isValid>("isValid") + .def(&ChannelLayoutView::describe)>("toString"); m.def("ChannelLayoutView"); @@ -359,6 +361,7 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { .ext>(&ToString)>("toString"); m.def("AudioSamples") + .def<&AudioSamples::null>("null") .def<&CreateAudioSamples>("create") .def<&AudioSamples::isNull>("isNull") .def<&AudioSamples::isComplete>("isComplete") @@ -451,28 +454,8 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { m.def("BufferSinkFilterContext") .cons() - // getVideoFrame & cie do not have good native-feel interface, but this avoids - // copying video frames - .def( - &BufferSinkFilterContext::getVideoFrame)>("getVideoFrame") - .def( - &BufferSinkFilterContext::getVideoFrame)>("getVideoFrameFlags") - .def( - &BufferSinkFilterContext::getAudioFrame)>("getAudioFrame") - .def( - &BufferSinkFilterContext::getAudioFrame)>("getAudioFrameFlags") - .def( - &BufferSinkFilterContext::getVideoFrame), - Nobind::ReturnAsync>("getVideoFrameAsync") - .def( - &BufferSinkFilterContext::getVideoFrame), - Nobind::ReturnAsync>("getVideoFrameFlagsAsync") - .def( - &BufferSinkFilterContext::getAudioFrame), - Nobind::ReturnAsync>("getAudioFrameAsync") - .def( - &BufferSinkFilterContext::getAudioFrame), - Nobind::ReturnAsync>("getAudioFrameFlagsAsync") + .ext<&GetVideoFrame, Nobind::ReturnNullAccept>("getVideoFrame") + .ext<&GetAudioFrame, Nobind::ReturnNullAccept>("getAudioFrame") .def<&BufferSinkFilterContext::setFrameSize>("setFrameSize") .def<&BufferSinkFilterContext::frameRate>("frameRate") .def<&BufferSinkFilterContext::checkFilter>("checkFilter"); diff --git a/src/lib/Filter.ts b/src/lib/Filter.ts index 6eb34b5..db11981 100644 --- a/src/lib/Filter.ts +++ b/src/lib/Filter.ts @@ -22,12 +22,17 @@ export interface FilterOptions { export class Filter extends EventEmitter { protected filterGraph: any; protected bufferSrc: Record; protected bufferSink: Record; protected timeBase: any; protected stillStreamingSources: number; @@ -51,14 +56,18 @@ export class Filter extends EventEmitter { `pix_fmt=${def.pixelFormat.toString()}:time_base=${def.timeBase.toString()} [${inp}]; `; } if (isAudioDefinition(def)) { - throw new Error('later'); + filterDescriptor += `abuffer@${inp}=sample_rate=${def.sampleRate}:` + + `channel_layout=${def.channelLayout.toString()}:` + + `sample_fmt=${def.sampleFormat.toString()}:time_base=${def.timeBase.toString()} [${inp}]; `; } } filterDescriptor += options.graph; for (const outp of Object.keys(options.outputs)) { const def = options.outputs[outp]; if (isVideoDefinition(def)) { - filterDescriptor += `[${outp}] buffersink@${outp}`; + filterDescriptor += `[${outp}] buffersink@${outp}; `; + } else if (isAudioDefinition(def)) { + filterDescriptor += `[${outp}] abuffersink@${outp}; `; } } verbose(`Filter: constructed graph ${filterDescriptor}`); @@ -70,9 +79,27 @@ export 
class Filter extends EventEmitter { this.src = {}; this.bufferSrc = {}; for (const inp of Object.keys(options.inputs)) { + const def = options.inputs[inp]; + let nullFrame: any; + let id: string; + let type: 'Audio' | 'Video'; + if (isVideoDefinition(def)) { + nullFrame = ffmpeg.VideoFrame.null(); + id = `buffer@${inp}`; + type = 'Video'; + } else if (isAudioDefinition(def)) { + nullFrame = ffmpeg.AudioSamples.null(); + id = `abuffer@${inp}`; + type = 'Audio'; + } else { + throw new Error('Only Video and Audio filtering is supported'); + } this.bufferSrc[inp] = { - buffer: new ffmpeg.BufferSrcFilterContext(this.filterGraph.filter(`buffer@${inp}`)), - busy: false + type, + id, + buffer: new ffmpeg.BufferSrcFilterContext(this.filterGraph.filter(id)), + busy: false, + nullFrame }; this.src[inp] = new Writable({ objectMode: true, @@ -91,8 +118,7 @@ export class Filter extends EventEmitter { }, final: (callback: (error?: Error | null | undefined) => void): void => { verbose(`Filter: end source [${inp}]`); - // VideoFrame.null() is a special EOF frame - this.write(inp, ffmpeg.VideoFrame.null(), callback); + this.write(inp, nullFrame, callback); callback(null); this.stillStreamingSources--; if (this.stillStreamingSources === 0) @@ -108,8 +134,22 @@ export class Filter extends EventEmitter { this.sink = {}; this.bufferSink = {}; for (const outp of Object.keys(options.outputs)) { + const def = options.outputs[outp]; + let id: string; + let type: 'Audio' | 'Video'; + if (isVideoDefinition(def)) { + id = `buffersink@${outp}`; + type = 'Video'; + } else if (isAudioDefinition(def)) { + id = `abuffersink@${outp}`; + type = 'Audio'; + } else { + throw new Error('Only Video and Audio filtering is supported'); + } this.bufferSink[outp] = { - buffer: new ffmpeg.BufferSinkFilterContext(this.filterGraph.filter(`buffersink@${outp}`)), + type, + id, + buffer: new ffmpeg.BufferSinkFilterContext(this.filterGraph.filter(id)), waitingToRead: 0 }; this.sink[outp] = new Readable({ @@ -145,29 +185,41 @@ export class Filter extends EventEmitter { } src.busy = true; - frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); + verbose(`Filter: received data for source [${id}]`); frame.setTimeBase(this.timeBase); frame.setStreamIndex(0); - verbose(`Filter: received data for source [${id}]`); - src.buffer.writeVideoFrameAsync(frame) - .then(() => { - src.busy = false; - verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`); - callback(null); - // Now that we pushed more data, try reading again, refer to 1* below - for (const sink of Object.keys(this.bufferSink)) { - // This is fully synchronous on purpose - otherwise we might run - // into complex synchronization issues where someone else manages - // to call read between the two operations - const size = this.bufferSink[sink].waitingToRead; - if (size) { - verbose(`Filter: wake up sink [${sink}]`); - this.bufferSink[sink].waitingToRead = 0; - this.read(sink, size); - } + let q: Promise; + if (src.type === 'Video') { + if (!(frame instanceof ffmpeg.VideoFrame)) + throw new Error('Filter source video input must be a stream of VideoFrames'); + frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); + q = src.buffer.writeVideoFrameAsync(frame); + } else if (src.type === 'Audio') { + if (!(frame instanceof ffmpeg.AudioSamples)) + throw new Error('Filter source video input must be a stream of AudioSamples'); + q = src.buffer.writeAudioSamplesAsync(frame); + } else { + throw new Error('Only Video and Audio filtering is supported'); + } + + q.then(() => { + 
src.busy = false; + verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`); + callback(null); + // Now that we pushed more data, try reading again, refer to 1* below + for (const sink of Object.keys(this.bufferSink)) { + // This is fully synchronous on purpose - otherwise we might run + // into complex synchronization issues where someone else manages + // to call read between the two operations + const size = this.bufferSink[sink].waitingToRead; + if (size) { + verbose(`Filter: wake up sink [${sink}]`); + this.bufferSink[sink].waitingToRead = 0; + this.read(sink, size); } - }) + } + }) .catch(callback); } @@ -178,17 +230,27 @@ export class Filter extends EventEmitter { } verbose(`Filter: received a request for data from sink [${id}]`); + let getFrame: () => any; + if (sink.type === 'Video') { + getFrame = sink.buffer.getVideoFrame.bind(sink.buffer); + } else if (sink.type === 'Audio') { + getFrame = sink.buffer.getAudioFrame.bind(sink.buffer); + } else { + throw new Error('Only Video and Audio filtering is supported'); + } // read must always return immediately // this means that we depend on ffmpeg not doing any filtering work on this call. - // This is the meaning of the special flag. - const videoFrame = new ffmpeg.VideoFrame; + // TODO: Check to what extent this is true + let frame; let frames = 0; - while (sink.buffer.getVideoFrame(videoFrame) && frames < size) { - verbose(`Filter: sent data from sink [${id}], pts=${videoFrame.pts().toString()}`); - videoFrame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); - videoFrame.setTimeBase(this.timeBase); - videoFrame.setStreamIndex(0); - this.sink[id].push(videoFrame); + while ((frame = getFrame()) !== null && frames < size) { + verbose(`Filter: sent data from sink [${id}], pts=${frame.pts().toString()}`); + if (sink.type === 'Video') { + frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); + } + frame.setTimeBase(this.timeBase); + frame.setStreamIndex(0); + this.sink[id].push(frame); frames++; } if (this.stillStreamingSources === 0) { diff --git a/test/overlay.test.ts b/test/overlay.test.ts index aa6772c..3c0f680 100644 --- a/test/overlay.test.ts +++ b/test/overlay.test.ts @@ -248,9 +248,10 @@ describe('streaming', () => { }); }); - it('w/ ffmpeg filtering (PiP example)', (done) => { + it('w/ video overlay (ffmpeg PiP filter)', (done) => { // This uses ffmpeg's filter subsystem to overlay a copy of the video // in a small thumbnail (Picture-in-Picture). + // It also processes the audio. // It reads from a file, overlays and transcodes to a realtime stream. // This pipeline is fast enough to be usable in real-time even on an older CPU. 
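+    // The audio is routed through the same filter graph via ffmpeg's acopy
+    // filter (see the graph below), so it is decoded, filtered and re-encoded
+    // alongside the video.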
// @@ -260,9 +261,10 @@ describe('streaming', () => { demuxer.on('ready', () => { try { - const audioInput = new Discarder; + const audioInput = new AudioDecoder(demuxer.audio[0]); const videoInput = new VideoDecoder(demuxer.video[0]); + const audioDefinition = audioInput.definition(); const videoDefinition = videoInput.definition(); const videoOutput = new VideoEncoder({ @@ -278,31 +280,46 @@ describe('streaming', () => { codecOptions: { preset: 'veryfast' } }); + const audioOutput = new AudioEncoder({ + type: 'Audio', + codec: ffmpeg.AV_CODEC_AAC, + bitRate: 128e3, + sampleRate: audioDefinition.sampleRate, + sampleFormat: audioDefinition.sampleFormat, + channelLayout: audioDefinition.channelLayout + }); + // A Filter is an ffmpeg filter chain const filter = new Filter({ inputs: { - // Filter with two identical inputs (the same video) + // Filter with two identical video inputs (the same video) 'main_in': videoDefinition, - 'pip_in': videoDefinition + 'pip_in': videoDefinition, + // and one audio input + 'audio_in': audioDefinition }, outputs: { - // One output - 'out': videoOutput.definition() + // Outputs + 'video_out': videoOutput.definition(), + 'audio_out': audioOutput.definition() }, graph: // Take 'pip_in' and rescale it to 1/8th to obtain 'pip_out' `[pip_in] scale=${videoDefinition.width / 8}x${videoDefinition.height / 8} [pip_out]; ` + // Overlay 'pip_out' over 'main_in' at the specified offset to obtain 'out' - `[main_in][pip_out] overlay=x=${videoDefinition.width * 13 / 16}:y=${videoDefinition.height / 16} [out]; `, + `[main_in][pip_out] overlay=x=${videoDefinition.width * 13 / 16}:y=${videoDefinition.height / 16} [video_out]; ` + + // Simply copy the audio through the filter + '[audio_in] acopy [audio_out]; ', // A filter must have a single time base timeBase: videoDefinition.timeBase }); // These should be available based on the above configuration assert.instanceOf(filter.src['main_in'], Writable); assert.instanceOf(filter.src['pip_in'], Writable); - assert.instanceOf(filter.sink['out'], Readable); + assert.instanceOf(filter.sink['video_out'], Readable); + assert.instanceOf(filter.sink['audio_out'], Readable); - const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput] }); + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); assert.instanceOf(muxer.output, Readable); muxer.on('finish', done); @@ -317,16 +334,19 @@ describe('streaming', () => { // Demuxer -> Decoder -> T junction demuxer.video[0].pipe(videoInput); + // Demuxer -> Decoder -> Filter source 'audio_in' + demuxer.audio[0].pipe(audioInput).pipe(filter.src['audio_in']); + // T junction -> Filter source 'main_in' videoInput1.pipe(filter.src['main_in']); // T junction -> Filter source 'pip_in' videoInput2.pipe(filter.src['pip_in']); - // Filter sink 'out' -> Encoder -> Muxer - filter.sink['out'].pipe(videoOutput).pipe(muxer.video[0]); + // Filter sinks -> Encoder -> Muxer + filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]); + filter.sink['audio_out'].pipe(audioOutput).pipe(muxer.audio[0]); - demuxer.audio[0].pipe(audioInput); muxer.output!.pipe(output); } catch (err) { done(err); From 795048811bf2d8eded79a199972c3fe89218d7af Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Mon, 1 Jan 2024 14:58:13 +0100 Subject: [PATCH 07/10] test the error handling --- src/lib/Filter.ts | 84 +++--- test/{overlay.test.ts => filtering.test.ts} | 287 +++++++++++--------- test/streaming.test.ts | 129 ++++++++- 3 files changed, 330 insertions(+), 170 
deletions(-) rename test/{overlay.test.ts => filtering.test.ts} (60%) diff --git a/src/lib/Filter.ts b/src/lib/Filter.ts index db11981..101d378 100644 --- a/src/lib/Filter.ts +++ b/src/lib/Filter.ts @@ -125,6 +125,7 @@ export class Filter extends EventEmitter { this.emit('finish'); } }); + this.src[inp].on('error', this.destroy.bind(this)); this.stillStreamingSources++; Promise.resolve().then(() => { this.emit('ready'); @@ -158,22 +159,23 @@ export class Filter extends EventEmitter { this.read(outp, size); } }); + this.sink[outp].on('error', this.destroy.bind(this)); } } - destroy(error: Error) { + protected destroy(error: Error) { if (this.destroyed) return; this.destroyed = true; for (const s of Object.keys(this.bufferSrc)) { - this.sink[s].destroy(error); + this.src[s].destroy(error); } for (const s of Object.keys(this.bufferSink)) { - this.src[s].destroy(error); + this.sink[s].destroy(error); } - this.emit('error'); + this.emit('error', error); } - write(id: string, frame: any, callback: (error?: Error | null | undefined) => void) { + protected write(id: string, frame: any, callback: (error?: Error | null | undefined) => void) { const src = this.bufferSrc[id]; if (!src) { return void callback(new Error(`Invalid buffer src [${id}]`)); @@ -186,44 +188,50 @@ export class Filter extends EventEmitter { src.busy = true; verbose(`Filter: received data for source [${id}]`); - frame.setTimeBase(this.timeBase); - frame.setStreamIndex(0); - let q: Promise; - if (src.type === 'Video') { - if (!(frame instanceof ffmpeg.VideoFrame)) - throw new Error('Filter source video input must be a stream of VideoFrames'); - frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); - q = src.buffer.writeVideoFrameAsync(frame); - } else if (src.type === 'Audio') { - if (!(frame instanceof ffmpeg.AudioSamples)) - throw new Error('Filter source video input must be a stream of AudioSamples'); - q = src.buffer.writeAudioSamplesAsync(frame); - } else { - throw new Error('Only Video and Audio filtering is supported'); - } + try { + let q: Promise; + if (src.type === 'Video') { + if (!(frame instanceof ffmpeg.VideoFrame)) + return void callback(new Error('Filter source video input must be a stream of VideoFrames')); + frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); + frame.setTimeBase(this.timeBase); + frame.setStreamIndex(0); + q = src.buffer.writeVideoFrameAsync(frame); + } else if (src.type === 'Audio') { + if (!(frame instanceof ffmpeg.AudioSamples)) + return void callback(new Error('Filter source video input must be a stream of AudioSamples')); + frame.setTimeBase(this.timeBase); + frame.setStreamIndex(0); + q = src.buffer.writeAudioSamplesAsync(frame); + } else { + return void callback(new Error('Only Video and Audio filtering is supported')); + } - q.then(() => { - src.busy = false; - verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`); - callback(null); - // Now that we pushed more data, try reading again, refer to 1* below - for (const sink of Object.keys(this.bufferSink)) { - // This is fully synchronous on purpose - otherwise we might run - // into complex synchronization issues where someone else manages - // to call read between the two operations - const size = this.bufferSink[sink].waitingToRead; - if (size) { - verbose(`Filter: wake up sink [${sink}]`); - this.bufferSink[sink].waitingToRead = 0; - this.read(sink, size); + q.then(() => { + src.busy = false; + verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`); + callback(null); + // Now that we 
pushed more data, try reading again, refer to 1* below + for (const sink of Object.keys(this.bufferSink)) { + // This is fully synchronous on purpose - otherwise we might run + // into complex synchronization issues where someone else manages + // to call read between the two operations + const size = this.bufferSink[sink].waitingToRead; + if (size) { + verbose(`Filter: wake up sink [${sink}]`); + this.bufferSink[sink].waitingToRead = 0; + this.read(sink, size); + } } - } - }) - .catch(callback); + }) + .catch(callback); + } catch (err) { + callback(err as Error); + } } - read(id: string, size: number) { + protected read(id: string, size: number) { const sink = this.bufferSink[id]; if (!sink) { throw new Error(`Invalid buffer sink [${id}]`); diff --git a/test/overlay.test.ts b/test/filtering.test.ts similarity index 60% rename from test/overlay.test.ts rename to test/filtering.test.ts index 3c0f680..7175617 100644 --- a/test/overlay.test.ts +++ b/test/filtering.test.ts @@ -5,147 +5,23 @@ import ReadableStreamClone from 'readable-stream-clone'; import { assert } from 'chai'; import ffmpeg from '@mmomtchev/ffmpeg'; -import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, VideoTransform, Filter, Discarder } from '@mmomtchev/ffmpeg/stream'; +import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, Filter, Discarder } from '@mmomtchev/ffmpeg/stream'; import { Readable, Writable } from 'node:stream'; import { MediaTransform, VideoStreamDefinition } from '../lib/MediaStream'; import { Magick, MagickCore } from 'magickwand.js'; ffmpeg.setLogLevel(process.env.DEBUG_FFMPEG ? ffmpeg.AV_LOG_DEBUG : ffmpeg.AV_LOG_ERROR); -const tempFile = path.resolve(__dirname, 'overlay-temp.mkv'); +const tempFile = path.resolve(__dirname, 'filter-temp.mkv'); -describe('streaming', () => { - afterEach('delete temporary', (done) => { +describe('filtering', () => { + /*afterEach('delete temporary', (done) => { + console.log('DONE, DELETE'); if (!process.env.DEBUG_ALL && !process.env.DEBUG_MUXER) fs.rm(tempFile, done); else done(); - }); - - it('w/ overlay (ImageMagick overlay version)', (done) => { - // Overlaying using ImageMagick is very versatile and allows for maximum quality, - // however it is far too slow to be done in realtime - const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); - - demuxer.on('error', done); - demuxer.on('ready', () => { - try { - const audioInput = new AudioDecoder(demuxer.audio[0]); - const videoInput = new VideoDecoder(demuxer.video[0]); - - const videoDefinition = videoInput.definition(); - const audioDefinition = audioInput.definition(); - - // Alas, even if ImageMagick supports YUV420 raw pixel encoding, it does so - // in a particularly inefficient manner - it performs a very high-quality - // transformation both when reading and when writing the image, preserving - // all available information while supporting arbitrary color depths - // - // On the other side, ffmpeg does this transformation using a low-level - // hand-optimized SSE assembly loop and it is more than 10 times faster - // Thus, we will add additional pipeline elements to convert the incoming - // frames to RGBA8888 and then back to YUV420 - // - // See below for a much faster example that uses ffmpeg's filtering - // system to overlay the image - // - // This is the intermediate format used - const videoRGB = { - type: 'Video', - codec: videoDefinition.codec, - bitRate: videoDefinition.bitRate, - width: videoDefinition.width, - 
height: videoDefinition.height, - frameRate: new ffmpeg.Rational(25, 1), - pixelFormat: new ffmpeg.PixelFormat(ffmpeg.AV_PIX_FMT_RGBA) - } as VideoStreamDefinition; - // A transformer that uses ffmpeg to convert from YUV420 to RGBA8888 - const toRGB = new VideoTransform({ - input: videoDefinition, - output: videoRGB, - interpolation: ffmpeg.SWS_BILINEAR - }); - // A transformer that uses ffmpeg to convert from RGBA8888 to YUV420 - const fromRGB = new VideoTransform({ - output: videoDefinition, - input: videoRGB, - interpolation: ffmpeg.SWS_BILINEAR - }); - - const videoOutput = new VideoEncoder({ - type: 'Video', - codec: ffmpeg.AV_CODEC_H264, - bitRate: 2.5e6, - width: videoDefinition.width, - height: videoDefinition.height, - frameRate: new ffmpeg.Rational(25, 1), - pixelFormat: videoDefinition.pixelFormat - }); - - const audioOutput = new AudioEncoder({ - type: 'Audio', - codec: ffmpeg.AV_CODEC_AAC, - bitRate: 128e3, - sampleRate: audioDefinition.sampleRate, - sampleFormat: audioDefinition.sampleFormat, - channelLayout: audioDefinition.channelLayout - }); - - // We will be overlaying a subtitle over the image - // Drawing text is a very expensive operation, so we won't be drawing on each frame - // We will draw it once and then overlay it on each frame - const textImage = new Magick.Image('500x20', 'transparent'); - textImage.draw([ - new Magick.DrawableFont('sans-serif', MagickCore.NormalStyle, 100, MagickCore.NormalStretch), - new Magick.DrawablePointSize(24), - new Magick.DrawableStrokeColor('black'), - new Magick.DrawableText(20, 18, 'The insurance is mandatory, the copyright is not') - ]); - - const frame = new Magick.Image; - // A MediaTransform is a generic user-definable object-mode stream transform - const overlay = new MediaTransform({ - transform(chunk, encoding, callback) { - assert.instanceOf(chunk, ffmpeg.VideoFrame); - // Create a Magick.Blob from the ffmpeg.VideoFrame - const blob = new Magick.Blob(chunk.data().buffer); - // Import this binary blob into an Image object, specify the RGBA 8:8:8:8 raw pixel format - frame.readAsync(blob, `${videoDefinition.width}x${videoDefinition.height}`, 8, 'rgba') - // Overlay the subtitle on this image - .then(() => frame.compositeAsync(textImage, `+0+${videoDefinition.height - 40}`, MagickCore.MultiplyCompositeOp)) - // Export the image back to a binary buffer - .then(() => frame.writeAsync(blob)) - // Extract an ArrayBuffer from the blob - .then(() => blob.dataAsync()) - .then((ab) => { - // Wrap it first in a Node.js Buffer, then in a ffmpeg.VideoFrame - const output = ffmpeg.VideoFrame.create( - Buffer.from(ab), videoRGB.pixelFormat, videoDefinition.width, videoDefinition.height); - // Each frame must carry a timestamp, we copy it from the original - output.setPts(chunk.pts()); - this.push(output); - callback(); - }) - .catch(callback); - } - }); - - const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); - - muxer.on('finish', done); - muxer.on('error', done); - - assert.instanceOf(muxer.output, Readable); - const output = fs.createWriteStream(tempFile); - - demuxer.video[0].pipe(videoInput).pipe(toRGB).pipe(overlay).pipe(fromRGB).pipe(videoOutput).pipe(muxer.video[0]); - demuxer.audio[0].pipe(audioInput).pipe(audioOutput).pipe(muxer.audio[0]); - muxer.output!.pipe(output); - } catch (err) { - done(err); - } - }); - }); + });*/ it('w/ overlay (ffmpeg filter overlay version)', (done) => { // This uses ffmpeg's filter subsystem to overlay text drawn by ImageMagick @@ -354,4 +230,155 @@ 
describe('streaming', () => { }); }); + describe('error handling', () => { + it('error when constructing the filter', (done) => { + const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); + const output = fs.createWriteStream(tempFile); + + demuxer.on('error', done); + demuxer.on('ready', () => { + try { + + const audioInput = new AudioDecoder(demuxer.audio[0]); + const videoInput = new VideoDecoder(demuxer.video[0]); + + const audioDefinition = audioInput.definition(); + const videoDefinition = videoInput.definition(); + + const videoOutput = new VideoEncoder({ + type: 'Video', + codec: ffmpeg.AV_CODEC_H264, + bitRate: 2.5e6, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: videoDefinition.pixelFormat, + codecOptions: { preset: 'veryfast' } + }); + + const audioOutput = new AudioEncoder({ + type: 'Audio', + codec: ffmpeg.AV_CODEC_AAC, + bitRate: 128e3, + sampleRate: audioDefinition.sampleRate, + sampleFormat: audioDefinition.sampleFormat, + channelLayout: audioDefinition.channelLayout + }); + + const filter = new Filter({ + inputs: { + 'video_in': videoDefinition, + 'audio_in': audioDefinition + }, + outputs: { + 'video_out': videoOutput.definition(), + 'audio_out': audioOutput.definition() + }, + graph: + '[audio_in] acopy [video_out]; ', + timeBase: videoDefinition.timeBase + }); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); + assert.instanceOf(muxer.output, Readable); + + muxer.on('finish', () => done(new Error('Expected an error'))); + muxer.on('error', done); + + demuxer.video[0].pipe(videoInput).pipe(filter.src['video_in']); + demuxer.audio[0].pipe(audioInput).pipe(filter.src['audio_in']); + filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]); + filter.sink['audio_out'].pipe(audioOutput).pipe(muxer.audio[0]); + + muxer.output!.pipe(output); + } catch (err) { + output.close(); + // ffmpeg is not known for its explicative error messages + // The real information is usually found in stderr + assert.match((err as Error).message, /Invalid argument/); + done(); + } + }); + }); + + it('filter error while streaming', (done) => { + const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); + const output = fs.createWriteStream(tempFile); + + demuxer.on('error', done); + demuxer.on('ready', () => { + try { + + const audioInput = new AudioDecoder(demuxer.audio[0]); + const videoInput = new VideoDecoder(demuxer.video[0]); + + const audioDefinition = audioInput.definition(); + const videoDefinition = videoInput.definition(); + + const videoOutput = new VideoEncoder({ + type: 'Video', + codec: ffmpeg.AV_CODEC_H264, + bitRate: 2.5e6, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: videoDefinition.pixelFormat, + codecOptions: { preset: 'veryfast' } + }); + + const audioOutput = new AudioEncoder({ + type: 'Audio', + codec: ffmpeg.AV_CODEC_AAC, + bitRate: 128e3, + sampleRate: audioDefinition.sampleRate, + sampleFormat: audioDefinition.sampleFormat, + channelLayout: audioDefinition.channelLayout + }); + + let frames = 0; + const injectError = new MediaTransform({ + transform(chunk, encoding, callback) { + if (frames++ === 100) this.push('invalid'); + else this.push(chunk); + callback(); + } + }); + + const filter = new Filter({ + inputs: { + 'video_in': videoDefinition, + 'audio_in': audioDefinition + }, + outputs: { + 
'video_out': videoOutput.definition(), + 'audio_out': audioOutput.definition() + }, + graph: + '[video_in] copy [video_out]; ' + + '[audio_in] acopy [audio_out]; ', + timeBase: videoDefinition.timeBase + }); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); + assert.instanceOf(muxer.output, Readable); + + muxer.on('finish', () => done(new Error('Expected an error'))); + filter.on('error', (error) => { + output.close(); + assert.match((error as Error).message, /Filter source video input must be a stream of VideoFrames/); + done(); + }); + + demuxer.video[0].pipe(videoInput).pipe(injectError).pipe(filter.src['video_in']); + demuxer.audio[0].pipe(audioInput).pipe(filter.src['audio_in']); + filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]); + filter.sink['audio_out'].pipe(audioOutput).pipe(muxer.audio[0]); + + muxer.output!.pipe(output); + } catch (err) { + done(err); + } + }); + }); + }); }); diff --git a/test/streaming.test.ts b/test/streaming.test.ts index 22d7457..688ae4c 100644 --- a/test/streaming.test.ts +++ b/test/streaming.test.ts @@ -4,9 +4,10 @@ import * as fs from 'node:fs'; import { assert } from 'chai'; import ffmpeg from '@mmomtchev/ffmpeg'; -import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, AudioTransform } from '@mmomtchev/ffmpeg/stream'; +import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, AudioTransform, VideoTransform } from '@mmomtchev/ffmpeg/stream'; import { Readable, Transform, TransformCallback } from 'node:stream'; -import { MediaTransform } from '../lib/MediaStream'; +import { MediaTransform, VideoStreamDefinition } from '../lib/MediaStream'; +import { Magick, MagickCore } from 'magickwand.js'; ffmpeg.setLogLevel(process.env.DEBUG_FFMPEG ? 
ffmpeg.AV_LOG_DEBUG : ffmpeg.AV_LOG_ERROR);
 
@@ -330,4 +331,128 @@ describe('streaming', () => {
   });
 });
 
+  it('w/ overlay (ImageMagick overlay version)', (done) => {
+    // Overlaying using ImageMagick is very versatile and allows for maximum quality,
+    // however it is far too slow to be done in realtime
+    const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') });
+
+    demuxer.on('error', done);
+    demuxer.on('ready', () => {
+      try {
+        const audioInput = new AudioDecoder(demuxer.audio[0]);
+        const videoInput = new VideoDecoder(demuxer.video[0]);
+
+        const videoDefinition = videoInput.definition();
+        const audioDefinition = audioInput.definition();
+
+        // Alas, even though ImageMagick supports YUV420 raw pixel encoding, it does so
+        // in a particularly inefficient manner - it performs a very high-quality
+        // transformation both when reading and when writing the image, preserving
+        // all available information while supporting arbitrary color depths
+        //
+        // On the other hand, ffmpeg does this transformation using a low-level
+        // hand-optimized SSE assembly loop and it is more than 10 times faster
+        // Thus, we will add additional pipeline elements to convert the incoming
+        // frames to RGBA8888 and then back to YUV420
+        //
+        // See test/filtering.test.ts for a much faster version that uses
+        // ffmpeg's filtering system to overlay the image
+        //
+        // This is the intermediate format used
+        const videoRGB = {
+          type: 'Video',
+          codec: videoDefinition.codec,
+          bitRate: videoDefinition.bitRate,
+          width: videoDefinition.width,
+          height: videoDefinition.height,
+          frameRate: new ffmpeg.Rational(25, 1),
+          pixelFormat: new ffmpeg.PixelFormat(ffmpeg.AV_PIX_FMT_RGBA)
+        } as VideoStreamDefinition;
+        // A transformer that uses ffmpeg to convert from YUV420 to RGBA8888
+        const toRGB = new VideoTransform({
+          input: videoDefinition,
+          output: videoRGB,
+          interpolation: ffmpeg.SWS_BILINEAR
+        });
+        // A transformer that uses ffmpeg to convert from RGBA8888 to YUV420
+        const fromRGB = new VideoTransform({
+          output: videoDefinition,
+          input: videoRGB,
+          interpolation: ffmpeg.SWS_BILINEAR
+        });
+
+        const videoOutput = new VideoEncoder({
+          type: 'Video',
+          codec: ffmpeg.AV_CODEC_H264,
+          bitRate: 2.5e6,
+          width: videoDefinition.width,
+          height: videoDefinition.height,
+          frameRate: new ffmpeg.Rational(25, 1),
+          pixelFormat: videoDefinition.pixelFormat
+        });
+
+        const audioOutput = new AudioEncoder({
+          type: 'Audio',
+          codec: ffmpeg.AV_CODEC_AAC,
+          bitRate: 128e3,
+          sampleRate: audioDefinition.sampleRate,
+          sampleFormat: audioDefinition.sampleFormat,
+          channelLayout: audioDefinition.channelLayout
+        });
+
+        // We will be overlaying a subtitle over the image
+        // Drawing text is a very expensive operation, so we won't be drawing on each frame
+        // We will draw it once and then overlay it on each frame
+        const textImage = new Magick.Image('500x20', 'transparent');
+        textImage.draw([
+          new Magick.DrawableFont('sans-serif', MagickCore.NormalStyle, 100, MagickCore.NormalStretch),
+          new Magick.DrawablePointSize(24),
+          new Magick.DrawableStrokeColor('black'),
+          new Magick.DrawableText(20, 18, 'The insurance is mandatory, the copyright is not')
+        ]);
+
+        const frame = new Magick.Image;
+        // A MediaTransform is a generic user-definable object-mode stream transform
+        const overlay = new MediaTransform({
+          transform(chunk, encoding, callback) {
+            assert.instanceOf(chunk, ffmpeg.VideoFrame);
+            // Create a Magick.Blob from the ffmpeg.VideoFrame
+            const blob = new Magick.Blob(chunk.data().buffer);
+            
// Import this binary blob into an Image object, specify the RGBA 8:8:8:8 raw pixel format + frame.readAsync(blob, `${videoDefinition.width}x${videoDefinition.height}`, 8, 'rgba') + // Overlay the subtitle on this image + .then(() => frame.compositeAsync(textImage, `+0+${videoDefinition.height - 40}`, MagickCore.MultiplyCompositeOp)) + // Export the image back to a binary buffer + .then(() => frame.writeAsync(blob)) + // Extract an ArrayBuffer from the blob + .then(() => blob.dataAsync()) + .then((ab) => { + // Wrap it first in a Node.js Buffer, then in a ffmpeg.VideoFrame + const output = ffmpeg.VideoFrame.create( + Buffer.from(ab), videoRGB.pixelFormat, videoDefinition.width, videoDefinition.height); + // Each frame must carry a timestamp, we copy it from the original + output.setPts(chunk.pts()); + this.push(output); + callback(); + }) + .catch(callback); + } + }); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); + + muxer.on('finish', done); + muxer.on('error', done); + + assert.instanceOf(muxer.output, Readable); + const output = fs.createWriteStream(tempFile); + + demuxer.video[0].pipe(videoInput).pipe(toRGB).pipe(overlay).pipe(fromRGB).pipe(videoOutput).pipe(muxer.video[0]); + demuxer.audio[0].pipe(audioInput).pipe(audioOutput).pipe(muxer.audio[0]); + muxer.output!.pipe(output); + } catch (err) { + done(err); + } + }); + }); }); From 21592fca701d0f280ed1e1ac1966a61b7e92d80f Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Mon, 1 Jan 2024 17:45:17 +0100 Subject: [PATCH 08/10] manually close the muxer on filter error --- test/filtering.test.ts | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/test/filtering.test.ts b/test/filtering.test.ts index 7175617..da82e97 100644 --- a/test/filtering.test.ts +++ b/test/filtering.test.ts @@ -15,13 +15,13 @@ ffmpeg.setLogLevel(process.env.DEBUG_FFMPEG ? 
ffmpeg.AV_LOG_DEBUG : ffmpeg.AV_LO const tempFile = path.resolve(__dirname, 'filter-temp.mkv'); describe('filtering', () => { - /*afterEach('delete temporary', (done) => { + afterEach('delete temporary', (done) => { console.log('DONE, DELETE'); if (!process.env.DEBUG_ALL && !process.env.DEBUG_MUXER) fs.rm(tempFile, done); else done(); - });*/ + }); it('w/ overlay (ffmpeg filter overlay version)', (done) => { // This uses ffmpeg's filter subsystem to overlay text drawn by ImageMagick @@ -364,9 +364,17 @@ describe('filtering', () => { muxer.on('finish', () => done(new Error('Expected an error'))); filter.on('error', (error) => { - output.close(); - assert.match((error as Error).message, /Filter source video input must be a stream of VideoFrames/); - done(); + try { + assert.match((error as Error).message, /Filter source video input must be a stream of VideoFrames/); + // Simple .pipe() does not close the attached Writable streams (.pipeline() does it) + // and a Muxer that is not closed/destroyed never exits + // (this is a caveat of Node.js streams and it is expected) + muxer.video[0].destroy(error); + output.close(); + done(); + } catch (err) { + done(err); + } }); demuxer.video[0].pipe(videoInput).pipe(injectError).pipe(filter.src['video_in']); From 0c240b4f6209f50cdacbc2441d7042e7303c7c6d Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Mon, 1 Jan 2024 18:43:19 +0100 Subject: [PATCH 09/10] clean up the debug --- test/filtering.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/filtering.test.ts b/test/filtering.test.ts index da82e97..9a55881 100644 --- a/test/filtering.test.ts +++ b/test/filtering.test.ts @@ -16,7 +16,6 @@ const tempFile = path.resolve(__dirname, 'filter-temp.mkv'); describe('filtering', () => { afterEach('delete temporary', (done) => { - console.log('DONE, DELETE'); if (!process.env.DEBUG_ALL && !process.env.DEBUG_MUXER) fs.rm(tempFile, done); else From bfa0f9d27f13ec0958eea367350bd1eb8c280ea7 Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Mon, 1 Jan 2024 19:18:08 +0100 Subject: [PATCH 10/10] update the changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f7a50e..68578ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [] + - Add `streams/Filter` to support ffmpeg filters - Support piping from a `ReadStream` to a `Demuxer` - Support piping from a `Muxer` to a `WriteStream` - Send `error` events on `Demuxer` and `Muxer`
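
For reference, the complete wiring of the new `streams/Filter` class added by this series, condensed from `test/filtering.test.ts` above into a minimal self-contained sketch. The `input.mp4`/`output.mkv` file names are placeholders; every API call follows the tests, including the pass-through `copy`/`acopy` graph and the manual muxer teardown from PATCH 08/10:

import * as fs from 'node:fs';
import ffmpeg from '@mmomtchev/ffmpeg';
import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, Filter } from '@mmomtchev/ffmpeg/stream';

const demuxer = new Demuxer({ inputFile: 'input.mp4' });
demuxer.on('error', console.error);
demuxer.on('ready', () => {
  const audioInput = new AudioDecoder(demuxer.audio[0]);
  const videoInput = new VideoDecoder(demuxer.video[0]);
  const audioDefinition = audioInput.definition();
  const videoDefinition = videoInput.definition();

  const videoOutput = new VideoEncoder({
    type: 'Video',
    codec: ffmpeg.AV_CODEC_H264,
    bitRate: 2.5e6,
    width: videoDefinition.width,
    height: videoDefinition.height,
    frameRate: new ffmpeg.Rational(25, 1),
    pixelFormat: videoDefinition.pixelFormat
  });
  const audioOutput = new AudioEncoder({
    type: 'Audio',
    codec: ffmpeg.AV_CODEC_AAC,
    bitRate: 128e3,
    sampleRate: audioDefinition.sampleRate,
    sampleFormat: audioDefinition.sampleFormat,
    channelLayout: audioDefinition.channelLayout
  });

  // One named buffersrc Writable per input, one named buffersink Readable
  // per output, connected by a standard ffmpeg filter graph expression
  const filter = new Filter({
    inputs: { 'video_in': videoDefinition, 'audio_in': audioDefinition },
    outputs: { 'video_out': videoOutput.definition(), 'audio_out': audioOutput.definition() },
    graph: '[video_in] copy [video_out]; [audio_in] acopy [audio_out]; ',
    timeBase: videoDefinition.timeBase
  });

  const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] });

  // Plain .pipe() does not destroy the attached Writable streams on error
  // and a Muxer that is never closed does not exit - destroy it manually
  // (see PATCH 08/10 above)
  filter.on('error', (err) => muxer.video[0].destroy(err));

  demuxer.video[0].pipe(videoInput).pipe(filter.src['video_in']);
  demuxer.audio[0].pipe(audioInput).pipe(filter.src['audio_in']);
  filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]);
  filter.sink['audio_out'].pipe(audioOutput).pipe(muxer.audio[0]);
  muxer.output!.pipe(fs.createWriteStream('output.mkv'));
});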