diff --git a/.vscode/settings.json b/.vscode/settings.json index e761c9d..8ba2f48 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,5 +5,6 @@ "clang-format.fallbackStyle": "LLVM", "[cpp]": { "editor.defaultFormatter": "xaver.clang-format" - } + }, + "editor.tabSize": 2 } diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f7a50e..68578ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [] + - Add `streams/Filter` to support ffmpeg filters - Support piping from a `ReadStream` to a `Demuxer` - Support piping from a `Muxer` to a `WriteStream` - Send `error` events on `Demuxer` and `Muxer` diff --git a/deps/avcpp b/deps/avcpp index d3d3051..ac6f23d 160000 --- a/deps/avcpp +++ b/deps/avcpp @@ -1 +1 @@ -Subproject commit d3d30510179728228ed87a76c5011f2659b8a388 +Subproject commit ac6f23d51589abc468420f0e6ea779bbef040079 diff --git a/package-lock.json b/package-lock.json index b8b0434..e6b651d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -24,6 +24,7 @@ "eslint-plugin-mocha": "^10.2.0", "magickwand.js": "^1.0.0-beta.6", "mocha": "^10.2.0", + "readable-stream-clone": "^0.0.7", "ts-node": "^10.9.1", "tsconfig-paths": "^4.2.0", "typescript": "^5.3.3" @@ -2101,7 +2102,7 @@ }, "node_modules/nobind17": { "version": "1.1.1", - "resolved": "git+ssh://git@github.com/mmomtchev/nobind.git#d6988c7f9128d711a440ab7e6f59fe6df2caf14a", + "resolved": "git+ssh://git@github.com/mmomtchev/nobind.git#060a501710b85d252db7659ea2fa9c4be65f6022", "license": "ISC", "dependencies": { "node-addon-api": ">=5.0.0" @@ -2357,6 +2358,12 @@ "node": ">= 6" } }, + "node_modules/readable-stream-clone": { + "version": "0.0.7", + "resolved": "https://registry.npmjs.org/readable-stream-clone/-/readable-stream-clone-0.0.7.tgz", + "integrity": "sha512-mdkQtdg5elliOR64xcHVMkghSHQGjpsdCFAs6fSe28wTaVypR0GywmK5ndKpOSQzzuNpSrzC12Rv9UEkDLX73A==", + "dev": true + }, "node_modules/readdirp": { "version": "3.6.0", "dev": true, diff --git a/package.json b/package.json index b0698a6..a16a405 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "eslint-plugin-mocha": "^10.2.0", "magickwand.js": "^1.0.0-beta.6", "mocha": "^10.2.0", + "readable-stream-clone": "^0.0.7", "ts-node": "^10.9.1", "tsconfig-paths": "^4.2.0", "typescript": "^5.3.3" diff --git a/src/binding/avcpp-frame.cc b/src/binding/avcpp-frame.cc index f623488..7bba0f0 100644 --- a/src/binding/avcpp-frame.cc +++ b/src/binding/avcpp-frame.cc @@ -9,7 +9,27 @@ VideoFrame CreateVideoFrame(Nobind::Typemap::Buffer buffer, PixelFormat pixelFor return VideoFrame{buffer.first, buffer.second, pixelFormat, width, height}; } -VideoFrameBuffer CopyFrameToBuffer(av::VideoFrame &frame) { +VideoFrameBuffer CopyFrameToBuffer(VideoFrame &frame) { auto size = frame.bufferSize(); return VideoFrameBuffer{{[&frame, size](uint8_t *data) { frame.copyToBuffer(data, size); }, size}}; } + +VideoFrame *GetVideoFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec) { + VideoFrame *frame = new VideoFrame; + if (!sink.getVideoFrame(*frame, ec)) { + delete frame; + return nullptr; + } + + return frame; +} + +AudioSamples *GetAudioFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec) { + AudioSamples *samples = new AudioSamples; + if (!sink.getAudioFrame(*samples, ec)) { + delete samples; + return nullptr; + } + + return samples; +} diff --git a/src/binding/avcpp-frame.h 
b/src/binding/avcpp-frame.h index ed4368e..f02c860 100644 --- a/src/binding/avcpp-frame.h +++ b/src/binding/avcpp-frame.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -63,3 +64,9 @@ AudioSamples CreateAudioSamples(Nobind::Typemap::Buffer buffer, SampleFormat sam uint64_t channelLayout, int sampleRate); VideoFrame CreateVideoFrame(Nobind::Typemap::Buffer buffer, PixelFormat pixelFormat, int width, int height); + +// These extension functions are needed to wrap their avcpp counterparts which return data in an argument +// They return pointers to avoid unnecessary copying of the VideoFrame - as JavaScript makes no difference +// In JavaScript all C++ objects are heap-allocated objects referenced by a pointer +VideoFrame *GetVideoFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec); +AudioSamples *GetAudioFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec); diff --git a/src/binding/avcpp-nobind.cc b/src/binding/avcpp-nobind.cc index 460e06b..21eb202 100644 --- a/src/binding/avcpp-nobind.cc +++ b/src/binding/avcpp-nobind.cc @@ -2,15 +2,15 @@ #include #include #include -#include -#include -#include - -// API2 -#include #include +#include +#include +#include +#include #include #include +#include +#include #include @@ -26,6 +26,9 @@ using namespace av; #define REGISTER_CONSTANT(CONST, NAME) \ constexpr static int64_t __const_##CONST{static_cast(CONST)}; \ m.def<&__const_##CONST, Nobind::ReadOnly>(NAME); +#define REGISTER_ENUM(ENUM, ID) \ + constexpr static int64_t __const_##ID{static_cast(ENUM::ID)}; \ + m.def<&__const_##ID, Nobind::ReadOnly>(#ENUM "_" #ID); // An universal toString() wrapper, to be used as a class extension template std::string ToString(T &v) { @@ -296,7 +299,9 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { .cons() .cons() .def<&ChannelLayout::channels>("channels") - .def<&ChannelLayout::layout>("layout"); + .def<&ChannelLayout::layout>("layout") + .def<&ChannelLayout::isValid>("isValid") + .def(&ChannelLayoutView::describe)>("toString"); m.def("ChannelLayoutView"); @@ -327,6 +332,8 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { .def<&Packet::timeBase, Nobind::ReturnNested>("timeBase"); m.def("VideoFrame") + .cons() + .def<&VideoFrame::null>("null") // Every global function can also be registered as a static class method .def<&CreateVideoFrame>("create") .def<&VideoFrame::isNull>("isNull") @@ -354,6 +361,7 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { .ext>(&ToString)>("toString"); m.def("AudioSamples") + .def<&AudioSamples::null>("null") .def<&CreateAudioSamples>("create") .def<&AudioSamples::isNull>("isNull") .def<&AudioSamples::isComplete>("isComplete") @@ -417,6 +425,45 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) { .def(&AudioResampler::pop), Nobind::ReturnAsync>("popAsync"); + m.def("Filter").cons(); + + m.def("FilterGraph") + .cons<>() + .def<&FilterGraph::createFilter>("createFilter") + .def(&FilterGraph::parse)>("parse") + .def<&FilterGraph::config>("config") + .def(&FilterGraph::filter)>( + "filter"); + + m.def("FilterContext"); + + // We only export the safer API that copies frames for now + m.def("BufferSrcFilterContext") + .cons() + .def( + &BufferSrcFilterContext::writeVideoFrame)>("writeVideoFrame") + .def( + &BufferSrcFilterContext::writeAudioSamples)>("writeAudioSamples") + .def( + &BufferSrcFilterContext::writeVideoFrame), + Nobind::ReturnAsync>("writeVideoFrameAsync") + .def( + &BufferSrcFilterContext::writeAudioSamples), + Nobind::ReturnAsync>("writeAudioSamplesAsync") + 
.def<&BufferSrcFilterContext::checkFilter>("checkFilter"); + + m.def("BufferSinkFilterContext") + .cons() + .ext<&GetVideoFrame, Nobind::ReturnNullAccept>("getVideoFrame") + .ext<&GetAudioFrame, Nobind::ReturnNullAccept>("getAudioFrame") + .def<&BufferSinkFilterContext::setFrameSize>("setFrameSize") + .def<&BufferSinkFilterContext::frameRate>("frameRate") + .def<&BufferSinkFilterContext::checkFilter>("checkFilter"); + + REGISTER_ENUM(FilterMediaType, Unknown); + REGISTER_ENUM(FilterMediaType, Audio); + REGISTER_ENUM(FilterMediaType, Video); + m.Exports().Set("WritableCustomIO", WritableCustomIO::GetClass(m.Env())); m.Exports().Set("ReadableCustomIO", ReadableCustomIO::GetClass(m.Env())); diff --git a/src/binding/avcpp-types.h b/src/binding/avcpp-types.h index e3fb1e1..5b9734a 100644 --- a/src/binding/avcpp-types.h +++ b/src/binding/avcpp-types.h @@ -54,6 +54,7 @@ TYPEMAPS_FOR_ENUM(AVCodecID); TYPEMAPS_FOR_ENUM(AVMediaType); TYPEMAPS_FOR_ENUM(AVPixelFormat); TYPEMAPS_FOR_ENUM(AVSampleFormat); +TYPEMAPS_FOR_ENUM(FilterMediaType); // While this is not an enum, the typemap is still compatible TYPEMAPS_FOR_ENUM(std::bitset<64>); diff --git a/src/binding/constants b/src/binding/constants index cc62d09..b1d2d7a 100644 --- a/src/binding/constants +++ b/src/binding/constants @@ -1009,3 +1009,5 @@ REGISTER_CONSTANT(SWS_SPLINE, "SWS_SPLINE"); REGISTER_CONSTANT(SWS_SRC_V_CHR_DROP_MASK, "SWS_SRC_V_CHR_DROP_MASK"); REGISTER_CONSTANT(SWS_SRC_V_CHR_DROP_SHIFT, "SWS_SRC_V_CHR_DROP_SHIFT"); REGISTER_CONSTANT(SWS_X, "SWS_X"); +REGISTER_CONSTANT(AV_BUFFERSINK_FLAG_NO_REQUEST, "AV_BUFFERSINK_FLAG_NO_REQUEST"); +REGISTER_CONSTANT(AV_BUFFERSINK_FLAG_PEEK, "AV_BUFFERSINK_FLAG_PEEK"); diff --git a/src/binding/gen_constants.sh b/src/binding/gen_constants.sh index dbc2a61..e839d07 100644 --- a/src/binding/gen_constants.sh +++ b/src/binding/gen_constants.sh @@ -20,4 +20,5 @@ sed -nr 's/^.*\s+AVFMT_([_A-Z0-9]+)[, ].*/AVFMT_\1 AV_FMT_\1/p' ${FFMPEG}/src/li sed -nr 's/^.*\s+AV_LOG_([_A-Z0-9]+)[, ].*/AV_LOG_\1 AV_LOG_\1/p' ${FFMPEG}/src/libavutil/log.h | sort | uniq sed -nr 's/^.*\s+SWS_([_A-Z0-9]+)[, ].*/SWS_\1 SWS_\1/p' ${FFMPEG}/src/libswscale/swscale.h | sort | uniq sed -nr 's/^.*\s+SWS_([_A-Z0-9]+)[, ].*/SWS_\1 SWS_\1/p' ${FFMPEG}/src/libswresample/swresample.h | sort | uniq +sed -nr 's/^.*\s+AV_BUFFERSINK_FLAG_([_A-Z0-9]+)[, ].*/AV_BUFFERSINK_FLAG_\1 AV_BUFFERSINK_FLAG_\1/p' ${FFMPEG}/src/libavfilter/buffersink.h | sort | uniq ) | sed -r 's/(.*)\s(.*)/REGISTER_CONSTANT(\1, "\2");/g' diff --git a/src/lib/AudioDecoder.ts b/src/lib/AudioDecoder.ts index 2d1222b..295661b 100644 --- a/src/lib/AudioDecoder.ts +++ b/src/lib/AudioDecoder.ts @@ -75,7 +75,8 @@ export class AudioDecoder extends MediaTransform implements MediaStream { sampleFormat: this.decoder.sampleFormat(), sampleRate: this.decoder.sampleRate(), channelLayout: new ffmpeg.ChannelLayout(this.decoder.channelLayout()), - frameSize: this.decoder.frameSize() + frameSize: this.decoder.frameSize(), + timeBase: this.decoder.timeBase() } as AudioStreamDefinition; } } diff --git a/src/lib/Filter.ts b/src/lib/Filter.ts new file mode 100644 index 0000000..101d378 --- /dev/null +++ b/src/lib/Filter.ts @@ -0,0 +1,276 @@ +import { EventEmitter, Writable, Readable } from 'node:stream'; +import ffmpeg from '@mmomtchev/ffmpeg'; +import { MediaStreamDefinition, isAudioDefinition, isVideoDefinition } from './MediaStream'; + +export const verbose = (process.env.DEBUG_FILTER || process.env.DEBUG_ALL) ? 
console.debug.bind(console) : () => undefined; + +export interface FilterOptions { + // Filter sources definitions + inputs: Record; + // Filter sinks definitions + outputs: Record; + // Graph string + graph: string; + // A filter must have a single time base + timeBase: any; +} + +/** + * A Transform stream that uses avfilter to transform a number of MediaStream. + * Must receive raw decoded input and sends raw decoded output. + */ +export class Filter extends EventEmitter { + protected filterGraph: any; + protected bufferSrc: Record; + protected bufferSink: Record; + protected timeBase: any; + protected stillStreamingSources: number; + protected destroyed: boolean; + src: Record; + sink: Record; + + constructor(options: FilterOptions) { + super(); + this.filterGraph = new ffmpeg.FilterGraph; + this.timeBase = options.timeBase; + + // construct inputs + let filterDescriptor = ''; + for (const inp of Object.keys(options.inputs)) { + const def = options.inputs[inp]; + if (isVideoDefinition(def)) { + if (!def.pixelFormat || !def.timeBase) + throw new Error('timeBase and pixelFormat are mandatory for filter sources'); + filterDescriptor += `buffer@${inp}=video_size=${def.width}x${def.height}:` + + `pix_fmt=${def.pixelFormat.toString()}:time_base=${def.timeBase.toString()} [${inp}]; `; + } + if (isAudioDefinition(def)) { + filterDescriptor += `abuffer@${inp}=sample_rate=${def.sampleRate}:` + + `channel_layout=${def.channelLayout.toString()}:` + + `sample_fmt=${def.sampleFormat.toString()}:time_base=${def.timeBase.toString()} [${inp}]; `; + } + } + filterDescriptor += options.graph; + for (const outp of Object.keys(options.outputs)) { + const def = options.outputs[outp]; + if (isVideoDefinition(def)) { + filterDescriptor += `[${outp}] buffersink@${outp}; `; + } else if (isAudioDefinition(def)) { + filterDescriptor += `[${outp}] abuffersink@${outp}; `; + } + } + verbose(`Filter: constructed graph ${filterDescriptor}`); + this.filterGraph.parse(filterDescriptor); + this.filterGraph.config(); + + this.stillStreamingSources = 0; + this.destroyed = false; + this.src = {}; + this.bufferSrc = {}; + for (const inp of Object.keys(options.inputs)) { + const def = options.inputs[inp]; + let nullFrame: any; + let id: string; + let type: 'Audio' | 'Video'; + if (isVideoDefinition(def)) { + nullFrame = ffmpeg.VideoFrame.null(); + id = `buffer@${inp}`; + type = 'Video'; + } else if (isAudioDefinition(def)) { + nullFrame = ffmpeg.AudioSamples.null(); + id = `abuffer@${inp}`; + type = 'Audio'; + } else { + throw new Error('Only Video and Audio filtering is supported'); + } + this.bufferSrc[inp] = { + type, + id, + buffer: new ffmpeg.BufferSrcFilterContext(this.filterGraph.filter(id)), + busy: false, + nullFrame + }; + this.src[inp] = new Writable({ + objectMode: true, + write: (chunk: any, encoding: BufferEncoding, callback: (error?: Error | null | undefined) => void) => { + this.write(inp, chunk, callback); + }, + destroy: (error: Error | null, callback: (error: Error | null) => void): void => { + if (error) { + this.stillStreamingSources--; + verbose(`Filter: error on source [${inp}], destroy all streams`, error); + this.destroy(error); + } else { + verbose(`Filter: destroy source [${inp}]`); + callback(null); + } + }, + final: (callback: (error?: Error | null | undefined) => void): void => { + verbose(`Filter: end source [${inp}]`); + this.write(inp, nullFrame, callback); + callback(null); + this.stillStreamingSources--; + if (this.stillStreamingSources === 0) + this.emit('finish'); + } + }); + 
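// Editorial note, not part of the diff: as an illustration of the filter descriptor assembled above,
// a single 1920x1080 yuv420p video source named 'in' with time base 1/25, a user graph of
// '[in] hflip [out]; ' and one video sink named 'out' would yield a string roughly like
//   buffer@in=video_size=1920x1080:pix_fmt=yuv420p:time_base=1/25 [in]; [in] hflip [out]; [out] buffersink@out;
// (the dimensions, pixel format and hflip filter are illustrative values, not taken from this project)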
this.src[inp].on('error', this.destroy.bind(this)); + this.stillStreamingSources++; + Promise.resolve().then(() => { + this.emit('ready'); + }); + } + + this.sink = {}; + this.bufferSink = {}; + for (const outp of Object.keys(options.outputs)) { + const def = options.outputs[outp]; + let id: string; + let type: 'Audio' | 'Video'; + if (isVideoDefinition(def)) { + id = `buffersink@${outp}`; + type = 'Video'; + } else if (isAudioDefinition(def)) { + id = `abuffersink@${outp}`; + type = 'Audio'; + } else { + throw new Error('Only Video and Audio filtering is supported'); + } + this.bufferSink[outp] = { + type, + id, + buffer: new ffmpeg.BufferSinkFilterContext(this.filterGraph.filter(id)), + waitingToRead: 0 + }; + this.sink[outp] = new Readable({ + objectMode: true, + read: (size: number) => { + this.read(outp, size); + } + }); + this.sink[outp].on('error', this.destroy.bind(this)); + } + } + + protected destroy(error: Error) { + if (this.destroyed) return; + this.destroyed = true; + for (const s of Object.keys(this.bufferSrc)) { + this.src[s].destroy(error); + } + for (const s of Object.keys(this.bufferSink)) { + this.sink[s].destroy(error); + } + this.emit('error', error); + } + + protected write(id: string, frame: any, callback: (error?: Error | null | undefined) => void) { + const src = this.bufferSrc[id]; + if (!src) { + return void callback(new Error(`Invalid buffer src [${id}]`)); + } + if (src.busy) { + // This is obviously a major malfunction and should never happen as long + // as the writer respects the stream semantics + return void callback(new Error(`Writing is not reentrant on [${id}]!`)); + } + src.busy = true; + + verbose(`Filter: received data for source [${id}]`); + + try { + let q: Promise; + if (src.type === 'Video') { + if (!(frame instanceof ffmpeg.VideoFrame)) + return void callback(new Error('Filter source video input must be a stream of VideoFrames')); + frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); + frame.setTimeBase(this.timeBase); + frame.setStreamIndex(0); + q = src.buffer.writeVideoFrameAsync(frame); + } else if (src.type === 'Audio') { + if (!(frame instanceof ffmpeg.AudioSamples)) + return void callback(new Error('Filter source audio input must be a stream of AudioSamples')); + frame.setTimeBase(this.timeBase); + frame.setStreamIndex(0); + q = src.buffer.writeAudioSamplesAsync(frame); + } else { + return void callback(new Error('Only Video and Audio filtering is supported')); + } + + q.then(() => { + src.busy = false; + verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`); + callback(null); + // Now that we pushed more data, try reading again, refer to 1* below + for (const sink of Object.keys(this.bufferSink)) { + // This is fully synchronous on purpose - otherwise we might run + // into complex synchronization issues where someone else manages + // to call read between the two operations + const size = this.bufferSink[sink].waitingToRead; + if (size) { + verbose(`Filter: wake up sink [${sink}]`); + this.bufferSink[sink].waitingToRead = 0; + this.read(sink, size); + } + } + }) + .catch(callback); + } catch (err) { + callback(err as Error); + } + } + + protected read(id: string, size: number) { + const sink = this.bufferSink[id]; + if (!sink) { + throw new Error(`Invalid buffer sink [${id}]`); + } + verbose(`Filter: received a request for data from sink [${id}]`); + + let getFrame: () => any; + if (sink.type === 'Video') { + getFrame = sink.buffer.getVideoFrame.bind(sink.buffer); + } else if (sink.type === 'Audio') { +
getFrame = sink.buffer.getAudioFrame.bind(sink.buffer); + } else { + throw new Error('Only Video and Audio filtering is supported'); + } + // read must always return immediately + // this means that we depend on ffmpeg not doing any filtering work on this call. + // TODO: Check to what extent this is true + let frame; + let frames = 0; + while ((frame = getFrame()) !== null && frames < size) { + verbose(`Filter: sent data from sink [${id}], pts=${frame.pts().toString()}`); + if (sink.type === 'Video') { + frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); + } + frame.setTimeBase(this.timeBase); + frame.setStreamIndex(0); + this.sink[id].push(frame); + frames++; + } + if (this.stillStreamingSources === 0) { + verbose(`Filter: sending null for EOF on sink [${id}]`); + this.sink[id].push(null); + return; + } + if (frames === 0) { + verbose(`Filter: no data for sink [${id}] will call back later`); + // If nothing was readily available, now it will be up to us + // to call back when something is, see 1* above + sink.waitingToRead = size; + } + } +} diff --git a/src/lib/Stream.ts b/src/lib/Stream.ts index a96d9e2..a40c224 100644 --- a/src/lib/Stream.ts +++ b/src/lib/Stream.ts @@ -7,4 +7,5 @@ export { VideoTransform } from './VideoTransform'; export { AudioDecoder } from './AudioDecoder'; export { AudioEncoder } from './AudioEncoder'; export { AudioTransform } from './AudioTransform'; +export { Filter } from './Filter'; export { Discarder } from './Discarder'; diff --git a/src/lib/VideoDecoder.ts b/src/lib/VideoDecoder.ts index 8764b96..5454691 100644 --- a/src/lib/VideoDecoder.ts +++ b/src/lib/VideoDecoder.ts @@ -77,7 +77,8 @@ export class VideoDecoder extends MediaTransform implements MediaStream { width: this.decoder.width(), height: this.decoder.height(), frameRate: this.stream.frameRate(), - pixelFormat: this.decoder.pixelFormat() + pixelFormat: this.decoder.pixelFormat(), + timeBase: this.decoder.timeBase() } as VideoStreamDefinition; } } diff --git a/src/lib/VideoEncoder.ts b/src/lib/VideoEncoder.ts index 2ef8be9..fb87884 100644 --- a/src/lib/VideoEncoder.ts +++ b/src/lib/VideoEncoder.ts @@ -73,7 +73,9 @@ export class VideoEncoder extends MediaTransform implements MediaStream { frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE); frame.setTimeBase(this.encoder.timeBase()); const packet = await this.encoder.encodeAsync(frame); - verbose(`VideoEncoder: encoded frame: pts=${frame.pts()} / ${frame.pts().seconds()} / ${frame.timeBase()} / ${frame.width()}x${frame.height()}, size=${frame.size()}, ref=${frame.isReferenced()}:${frame.refCount()} / type: ${frame.pictureType()} }`); + verbose(`VideoEncoder: encoded frame: pts=${frame.pts()} / ${frame.pts().seconds()} / ` + + `${frame.timeBase()} / ${frame.width()}x${frame.height()}, size=${frame.size()}, ` + + `ref=${frame.isReferenced()}:${frame.refCount()} / type: ${frame.pictureType()} }`); this.push(packet); this.busy = false; callback(); diff --git a/test/filtering.test.ts b/test/filtering.test.ts new file mode 100644 index 0000000..9a55881 --- /dev/null +++ b/test/filtering.test.ts @@ -0,0 +1,391 @@ +import * as path from 'node:path'; +import * as fs from 'node:fs'; +import ReadableStreamClone from 'readable-stream-clone'; + +import { assert } from 'chai'; + +import ffmpeg from '@mmomtchev/ffmpeg'; +import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, Filter, Discarder } from '@mmomtchev/ffmpeg/stream'; +import { Readable, Writable } from 'node:stream'; +import { MediaTransform, VideoStreamDefinition } from 
'../lib/MediaStream'; +import { Magick, MagickCore } from 'magickwand.js'; + +ffmpeg.setLogLevel(process.env.DEBUG_FFMPEG ? ffmpeg.AV_LOG_DEBUG : ffmpeg.AV_LOG_ERROR); + +const tempFile = path.resolve(__dirname, 'filter-temp.mkv'); + +describe('filtering', () => { + afterEach('delete temporary', (done) => { + if (!process.env.DEBUG_ALL && !process.env.DEBUG_MUXER) + fs.rm(tempFile, done); + else + done(); + }); + + it('w/ overlay (ffmpeg filter overlay version)', (done) => { + // This uses ffmpeg's filter subsystem to overlay text drawn by ImageMagick + // It reads from a file, overlays and transcodes to a realtime stream. + // This pipeline is fast enough to be usable in real-time even on an older CPU. + // + const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); + + // Use ImageMagick to create an image with the text + const textImage = new Magick.Image('500x20', 'transparent'); + textImage.draw([ + new Magick.DrawableFont('sans-serif', MagickCore.NormalStyle, 100, MagickCore.NormalStretch), + new Magick.DrawablePointSize(24), + new Magick.DrawableStrokeColor('black'), + new Magick.DrawableText(20, 18, 'The insurance is mandatory, the copyright is not') + ]); + // Convert the image to a single ffmpeg video frame + // We can't use YUV420 because it does not support transparency, RGBA8888 does + textImage.magick('rgba'); + textImage.depth(8); + const textBlob = new Magick.Blob; + textImage.write(textBlob); + const textImagePixelFormat = new ffmpeg.PixelFormat(ffmpeg.AV_PIX_FMT_RGBA); + const textFrame = new ffmpeg.VideoFrame.create(Buffer.from(textBlob.data()), textImagePixelFormat, 500, 20); + + demuxer.on('error', done); + demuxer.on('ready', () => { + try { + + const audioInput = new Discarder; + const videoInput = new VideoDecoder(demuxer.video[0]); + + const videoDefinition = videoInput.definition(); + + const videoOutput = new VideoEncoder({ + type: 'Video', + codec: ffmpeg.AV_CODEC_H264, + bitRate: 2.5e6, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: videoDefinition.pixelFormat, + // We will try to go as fast as possible + // H.264 encoding in ffmpeg can be very fast + codecOptions: { preset: 'veryfast' } + }); + + // A Filter is an ffmpeg filter chain + const filter = new Filter({ + inputs: { + // Filter with two inputs + 'video_in': videoDefinition, + 'text_in': { + type: 'Video', + width: 500, + height: 20, + pixelFormat: textImagePixelFormat, + timeBase: videoDefinition.timeBase + } as VideoStreamDefinition + }, + outputs: { + // One output + 'out': videoOutput.definition() + }, + graph: + // Overlay 'text_in' over 'video_in' at the specified offset to obtain 'out' + `[video_in][text_in] overlay=x=20:y=${videoDefinition.height - 40} [out]; `, + // A filter must have a single time base + timeBase: videoDefinition.timeBase + }); + // These should be available based on the above configuration + assert.instanceOf(filter.src['video_in'], Writable); + assert.instanceOf(filter.src['text_in'], Writable); + assert.instanceOf(filter.sink['out'], Readable); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput] }); + assert.instanceOf(muxer.output, Readable); + + muxer.on('finish', done); + muxer.on('error', done); + + const output = fs.createWriteStream(tempFile); + + // Demuxer -> Decoder -> Filter source 'video_in' + demuxer.video[0].pipe(videoInput).pipe(filter.src['video_in']); + + // Simply send the single frame to the other source, no need for a 
stream + // (if you want to change subtitles, you will have to push frames with a time base and a pts) + filter.src['text_in'].write(textFrame); + filter.src['text_in'].end(); + + // Filter sink 'out' -> Encoder -> Muxer + filter.sink['out'].pipe(videoOutput).pipe(muxer.video[0]); + + demuxer.audio[0].pipe(audioInput); + muxer.output!.pipe(output); + } catch (err) { + done(err); + } + }); + }); + + it('w/ video overlay (ffmpeg PiP filter)', (done) => { + // This uses ffmpeg's filter subsystem to overlay a copy of the video + // in a small thumbnail (Picture-in-Picture). + // It also processes the audio. + // It reads from a file, overlays and transcodes to a realtime stream. + // This pipeline is fast enough to be usable in real-time even on an older CPU. + // + const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); + + demuxer.on('error', done); + demuxer.on('ready', () => { + try { + + const audioInput = new AudioDecoder(demuxer.audio[0]); + const videoInput = new VideoDecoder(demuxer.video[0]); + + const audioDefinition = audioInput.definition(); + const videoDefinition = videoInput.definition(); + + const videoOutput = new VideoEncoder({ + type: 'Video', + codec: ffmpeg.AV_CODEC_H264, + bitRate: 2.5e6, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: videoDefinition.pixelFormat, + // We will try to go as fast as possible + // H.264 encoding in ffmpeg can be very fast + codecOptions: { preset: 'veryfast' } + }); + + const audioOutput = new AudioEncoder({ + type: 'Audio', + codec: ffmpeg.AV_CODEC_AAC, + bitRate: 128e3, + sampleRate: audioDefinition.sampleRate, + sampleFormat: audioDefinition.sampleFormat, + channelLayout: audioDefinition.channelLayout + }); + + // A Filter is an ffmpeg filter chain + const filter = new Filter({ + inputs: { + // Filter with two identical video inputs (the same video) + 'main_in': videoDefinition, + 'pip_in': videoDefinition, + // and one audio input + 'audio_in': audioDefinition + }, + outputs: { + // Outputs + 'video_out': videoOutput.definition(), + 'audio_out': audioOutput.definition() + }, + graph: + // Take 'pip_in' and rescale it to 1/8th to obtain 'pip_out' + `[pip_in] scale=${videoDefinition.width / 8}x${videoDefinition.height / 8} [pip_out]; ` + + // Overlay 'pip_out' over 'main_in' at the specified offset to obtain 'out' + `[main_in][pip_out] overlay=x=${videoDefinition.width * 13 / 16}:y=${videoDefinition.height / 16} [video_out]; ` + + // Simply copy the audio through the filter + '[audio_in] acopy [audio_out]; ', + // A filter must have a single time base + timeBase: videoDefinition.timeBase + }); + // These should be available based on the above configuration + assert.instanceOf(filter.src['main_in'], Writable); + assert.instanceOf(filter.src['pip_in'], Writable); + assert.instanceOf(filter.sink['video_out'], Readable); + assert.instanceOf(filter.sink['audio_out'], Readable); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); + assert.instanceOf(muxer.output, Readable); + + muxer.on('finish', done); + muxer.on('error', done); + + const output = fs.createWriteStream(tempFile); + + // Create a T junction to copy the raw decoded video + const videoInput1 = new ReadableStreamClone(videoInput, { objectMode: true }); + const videoInput2 = new ReadableStreamClone(videoInput, { objectMode: true }); + + // Demuxer -> Decoder -> T junction + demuxer.video[0].pipe(videoInput); + + // Demuxer -> 
Decoder -> Filter source 'audio_in' + demuxer.audio[0].pipe(audioInput).pipe(filter.src['audio_in']); + + // T junction -> Filter source 'main_in' + videoInput1.pipe(filter.src['main_in']); + + // T junction -> Filter source 'pip_in' + videoInput2.pipe(filter.src['pip_in']); + + // Filter sinks -> Encoder -> Muxer + filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]); + filter.sink['audio_out'].pipe(audioOutput).pipe(muxer.audio[0]); + + muxer.output!.pipe(output); + } catch (err) { + done(err); + } + }); + }); + + describe('error handling', () => { + it('error when constructing the filter', (done) => { + const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); + const output = fs.createWriteStream(tempFile); + + demuxer.on('error', done); + demuxer.on('ready', () => { + try { + + const audioInput = new AudioDecoder(demuxer.audio[0]); + const videoInput = new VideoDecoder(demuxer.video[0]); + + const audioDefinition = audioInput.definition(); + const videoDefinition = videoInput.definition(); + + const videoOutput = new VideoEncoder({ + type: 'Video', + codec: ffmpeg.AV_CODEC_H264, + bitRate: 2.5e6, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: videoDefinition.pixelFormat, + codecOptions: { preset: 'veryfast' } + }); + + const audioOutput = new AudioEncoder({ + type: 'Audio', + codec: ffmpeg.AV_CODEC_AAC, + bitRate: 128e3, + sampleRate: audioDefinition.sampleRate, + sampleFormat: audioDefinition.sampleFormat, + channelLayout: audioDefinition.channelLayout + }); + + const filter = new Filter({ + inputs: { + 'video_in': videoDefinition, + 'audio_in': audioDefinition + }, + outputs: { + 'video_out': videoOutput.definition(), + 'audio_out': audioOutput.definition() + }, + graph: + '[audio_in] acopy [video_out]; ', + timeBase: videoDefinition.timeBase + }); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); + assert.instanceOf(muxer.output, Readable); + + muxer.on('finish', () => done(new Error('Expected an error'))); + muxer.on('error', done); + + demuxer.video[0].pipe(videoInput).pipe(filter.src['video_in']); + demuxer.audio[0].pipe(audioInput).pipe(filter.src['audio_in']); + filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]); + filter.sink['audio_out'].pipe(audioOutput).pipe(muxer.audio[0]); + + muxer.output!.pipe(output); + } catch (err) { + output.close(); + // ffmpeg is not known for its explicative error messages + // The real information is usually found in stderr + assert.match((err as Error).message, /Invalid argument/); + done(); + } + }); + }); + + it('filter error while streaming', (done) => { + const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); + const output = fs.createWriteStream(tempFile); + + demuxer.on('error', done); + demuxer.on('ready', () => { + try { + + const audioInput = new AudioDecoder(demuxer.audio[0]); + const videoInput = new VideoDecoder(demuxer.video[0]); + + const audioDefinition = audioInput.definition(); + const videoDefinition = videoInput.definition(); + + const videoOutput = new VideoEncoder({ + type: 'Video', + codec: ffmpeg.AV_CODEC_H264, + bitRate: 2.5e6, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: videoDefinition.pixelFormat, + codecOptions: { preset: 'veryfast' } + }); + + const audioOutput = new AudioEncoder({ + type: 'Audio', + codec: ffmpeg.AV_CODEC_AAC, + 
bitRate: 128e3, + sampleRate: audioDefinition.sampleRate, + sampleFormat: audioDefinition.sampleFormat, + channelLayout: audioDefinition.channelLayout + }); + + let frames = 0; + const injectError = new MediaTransform({ + transform(chunk, encoding, callback) { + if (frames++ === 100) this.push('invalid'); + else this.push(chunk); + callback(); + } + }); + + const filter = new Filter({ + inputs: { + 'video_in': videoDefinition, + 'audio_in': audioDefinition + }, + outputs: { + 'video_out': videoOutput.definition(), + 'audio_out': audioOutput.definition() + }, + graph: + '[video_in] copy [video_out]; ' + + '[audio_in] acopy [audio_out]; ', + timeBase: videoDefinition.timeBase + }); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); + assert.instanceOf(muxer.output, Readable); + + muxer.on('finish', () => done(new Error('Expected an error'))); + filter.on('error', (error) => { + try { + assert.match((error as Error).message, /Filter source video input must be a stream of VideoFrames/); + // Simple .pipe() does not close the attached Writable streams (.pipeline() does it) + // and a Muxer that is not closed/destroyed never exits + // (this is a caveat of Node.js streams and it is expected) + muxer.video[0].destroy(error); + output.close(); + done(); + } catch (err) { + done(err); + } + }); + + demuxer.video[0].pipe(videoInput).pipe(injectError).pipe(filter.src['video_in']); + demuxer.audio[0].pipe(audioInput).pipe(filter.src['audio_in']); + filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]); + filter.sink['audio_out'].pipe(audioOutput).pipe(muxer.audio[0]); + + muxer.output!.pipe(output); + } catch (err) { + done(err); + } + }); + }); + }); +}); diff --git a/test/overlay.test.ts b/test/overlay.test.ts deleted file mode 100644 index 5f1b95a..0000000 --- a/test/overlay.test.ts +++ /dev/null @@ -1,148 +0,0 @@ -import * as path from 'node:path'; -import * as fs from 'node:fs'; - -import { assert } from 'chai'; - -import ffmpeg from '@mmomtchev/ffmpeg'; -import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, VideoTransform } from '@mmomtchev/ffmpeg/stream'; -import { Readable } from 'node:stream'; -import { MediaTransform, VideoStreamDefinition } from '../lib/MediaStream'; -import { Magick, MagickCore } from 'magickwand.js'; - -ffmpeg.setLogLevel(process.env.DEBUG_FFMPEG ? 
ffmpeg.AV_LOG_DEBUG : ffmpeg.AV_LOG_ERROR); - -const tempFile = path.resolve(__dirname, 'overlay-temp.mkv'); - -describe('streaming', () => { - afterEach('delete temporary', (done) => { - if (!process.env.DEBUG_ALL && !process.env.DEBUG_MUXER) - fs.rm(tempFile, done); - else - done(); - }); - - it('w/ dynamic overlay (generic ImageMagick overlay version)', (done) => { - // Overlaying using ImageMagick is very versatile and allows for maximum quality, - // however it is far too slow to be done in realtime - const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); - - demuxer.on('error', done); - demuxer.on('ready', () => { - try { - const audioInput = new AudioDecoder(demuxer.audio[0]); - const videoInput = new VideoDecoder(demuxer.video[0]); - - const videoDefinition = videoInput.definition(); - const audioDefinition = audioInput.definition(); - - // Alas, even if ImageMagick supports YUV420 raw pixel encoding, it does so - // in a particularly inefficient manner - it performs a very high-quality - // transformation both when reading and when writing the image, preserving - // all available information while supporting arbitrary color depths - // - // On the other side, ffmpeg does this transformation using a low-level - // hand-optimized SSE assembly loop and it is more than 10 times faster - // Thus, we will add additional pipeline elements to convert the incoming - // frames to RGBA8888 and then back to YUV420 - // - // See below for an example that is much faster but involves manually - // overlaying YUV420 pixels over the video frame - // - // This is the intermediate format used - const videoRGB = { - type: 'Video', - codec: videoDefinition.codec, - bitRate: videoDefinition.bitRate, - width: videoDefinition.width, - height: videoDefinition.height, - frameRate: new ffmpeg.Rational(25, 1), - pixelFormat: new ffmpeg.PixelFormat(ffmpeg.AV_PIX_FMT_RGBA) - } as VideoStreamDefinition; - // A transformer that uses ffmpeg to convert from YUV420 to RGBA8888 - const toRGB = new VideoTransform({ - input: videoDefinition, - output: videoRGB, - interpolation: ffmpeg.SWS_BILINEAR - }); - // A transformer that uses ffmpeg to convert from RGBA8888 to YUV420 - const fromRGB = new VideoTransform({ - output: videoDefinition, - input: videoRGB, - interpolation: ffmpeg.SWS_BILINEAR - }); - - const videoOutput = new VideoEncoder({ - type: 'Video', - codec: ffmpeg.AV_CODEC_H264, - bitRate: 2.5e6, - width: videoDefinition.width, - height: videoDefinition.height, - frameRate: new ffmpeg.Rational(25, 1), - pixelFormat: videoDefinition.pixelFormat - }); - - const audioOutput = new AudioEncoder({ - type: 'Audio', - codec: ffmpeg.AV_CODEC_AAC, - bitRate: 128e3, - sampleRate: audioDefinition.sampleRate, - sampleFormat: audioDefinition.sampleFormat, - channelLayout: audioDefinition.channelLayout - }); - - // We will be overlaying a subtitle over the image - // Drawing text is a very expensive operation, so we won't be drawing on each frame - // We will draw it once and then overlay it on each frame - const textImage = new Magick.Image('500x20', 'transparent'); - textImage.draw([ - new Magick.DrawableFont('sans-serif', MagickCore.NormalStyle, 100, MagickCore.NormalStretch), - new Magick.DrawablePointSize(24), - new Magick.DrawableStrokeColor('black'), - new Magick.DrawableText(20, 18, 'The insurance is mandatory, the copyright is not') - ]); - - const frame = new Magick.Image; - // A MediaTransform is a generic user-definable object-mode stream transform - const overlay = new 
MediaTransform({ - transform(chunk, encoding, callback) { - assert.instanceOf(chunk, ffmpeg.VideoFrame); - // Create a Magick.Blob from the ffmpeg.VideoFrame - const blob = new Magick.Blob(chunk.data().buffer); - // Import this binary blob into an Image object, specify the RGBA 8:8:8:8 raw pixel format - frame.readAsync(blob, `${videoDefinition.width}x${videoDefinition.height}`, 8, 'rgba') - // Overlay the subtitle on this image - .then(() => frame.compositeAsync(textImage, `+0+${videoDefinition.height - 40}`, MagickCore.MultiplyCompositeOp)) - // Export the image back to a binary buffer - .then(() => frame.writeAsync(blob)) - // Extract an ArrayBuffer from the blob - .then(() => blob.dataAsync()) - .then((ab) => { - // Wrap it first in a Node.js Buffer, then in a ffmpeg.VideoFrame - const output = ffmpeg.VideoFrame.create( - Buffer.from(ab), videoRGB.pixelFormat, videoDefinition.width, videoDefinition.height); - // Each frame must carry a timestamp, we copy it from the original - output.setPts(chunk.pts()); - this.push(output); - callback(); - }) - .catch(callback); - } - }); - - const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); - - muxer.on('finish', done); - muxer.on('error', done); - - assert.instanceOf(muxer.output, Readable); - const output = fs.createWriteStream(tempFile); - - demuxer.video[0].pipe(videoInput).pipe(toRGB).pipe(overlay).pipe(fromRGB).pipe(videoOutput).pipe(muxer.video[0]); - demuxer.audio[0].pipe(audioInput).pipe(audioOutput).pipe(muxer.audio[0]); - muxer.output!.pipe(output); - } catch (err) { - done(err); - } - }); - }); -}); diff --git a/test/streaming.test.ts b/test/streaming.test.ts index 22d7457..688ae4c 100644 --- a/test/streaming.test.ts +++ b/test/streaming.test.ts @@ -4,9 +4,10 @@ import * as fs from 'node:fs'; import { assert } from 'chai'; import ffmpeg from '@mmomtchev/ffmpeg'; -import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, AudioTransform } from '@mmomtchev/ffmpeg/stream'; +import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder, AudioTransform, VideoTransform } from '@mmomtchev/ffmpeg/stream'; import { Readable, Transform, TransformCallback } from 'node:stream'; -import { MediaTransform } from '../lib/MediaStream'; +import { MediaTransform, VideoStreamDefinition } from '../lib/MediaStream'; +import { Magick, MagickCore } from 'magickwand.js'; ffmpeg.setLogLevel(process.env.DEBUG_FFMPEG ? 
ffmpeg.AV_LOG_DEBUG : ffmpeg.AV_LOG_ERROR); @@ -330,4 +331,128 @@ describe('streaming', () => { }); }); + it('w/ overlay (ImageMagick overlay version)', (done) => { + // Overlaying using ImageMagick is very versatile and allows for maximum quality, + // however it is far too slow to be done in realtime + const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') }); + + demuxer.on('error', done); + demuxer.on('ready', () => { + try { + const audioInput = new AudioDecoder(demuxer.audio[0]); + const videoInput = new VideoDecoder(demuxer.video[0]); + + const videoDefinition = videoInput.definition(); + const audioDefinition = audioInput.definition(); + + // Alas, even if ImageMagick supports YUV420 raw pixel encoding, it does so + // in a particularly inefficient manner - it performs a very high-quality + // transformation both when reading and when writing the image, preserving + // all available information while supporting arbitrary color depths + // + // On the other side, ffmpeg does this transformation using a low-level + // hand-optimized SSE assembly loop and it is more than 10 times faster + // Thus, we will add additional pipeline elements to convert the incoming + // frames to RGBA8888 and then back to YUV420 + // + // See the example in filtering for a much faster example that uses + // ffmpeg's filtering system to overlay the image + // + // This is the intermediate format used + const videoRGB = { + type: 'Video', + codec: videoDefinition.codec, + bitRate: videoDefinition.bitRate, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: new ffmpeg.PixelFormat(ffmpeg.AV_PIX_FMT_RGBA) + } as VideoStreamDefinition; + // A transformer that uses ffmpeg to convert from YUV420 to RGBA8888 + const toRGB = new VideoTransform({ + input: videoDefinition, + output: videoRGB, + interpolation: ffmpeg.SWS_BILINEAR + }); + // A transformer that uses ffmpeg to convert from RGBA8888 to YUV420 + const fromRGB = new VideoTransform({ + output: videoDefinition, + input: videoRGB, + interpolation: ffmpeg.SWS_BILINEAR + }); + + const videoOutput = new VideoEncoder({ + type: 'Video', + codec: ffmpeg.AV_CODEC_H264, + bitRate: 2.5e6, + width: videoDefinition.width, + height: videoDefinition.height, + frameRate: new ffmpeg.Rational(25, 1), + pixelFormat: videoDefinition.pixelFormat + }); + + const audioOutput = new AudioEncoder({ + type: 'Audio', + codec: ffmpeg.AV_CODEC_AAC, + bitRate: 128e3, + sampleRate: audioDefinition.sampleRate, + sampleFormat: audioDefinition.sampleFormat, + channelLayout: audioDefinition.channelLayout + }); + + // We will be overlaying a subtitle over the image + // Drawing text is a very expensive operation, so we won't be drawing on each frame + // We will draw it once and then overlay it on each frame + const textImage = new Magick.Image('500x20', 'transparent'); + textImage.draw([ + new Magick.DrawableFont('sans-serif', MagickCore.NormalStyle, 100, MagickCore.NormalStretch), + new Magick.DrawablePointSize(24), + new Magick.DrawableStrokeColor('black'), + new Magick.DrawableText(20, 18, 'The insurance is mandatory, the copyright is not') + ]); + + const frame = new Magick.Image; + // A MediaTransform is a generic user-definable object-mode stream transform + const overlay = new MediaTransform({ + transform(chunk, encoding, callback) { + assert.instanceOf(chunk, ffmpeg.VideoFrame); + // Create a Magick.Blob from the ffmpeg.VideoFrame + const blob = new Magick.Blob(chunk.data().buffer); + 
// Import this binary blob into an Image object, specify the RGBA 8:8:8:8 raw pixel format + frame.readAsync(blob, `${videoDefinition.width}x${videoDefinition.height}`, 8, 'rgba') + // Overlay the subtitle on this image + .then(() => frame.compositeAsync(textImage, `+0+${videoDefinition.height - 40}`, MagickCore.MultiplyCompositeOp)) + // Export the image back to a binary buffer + .then(() => frame.writeAsync(blob)) + // Extract an ArrayBuffer from the blob + .then(() => blob.dataAsync()) + .then((ab) => { + // Wrap it first in a Node.js Buffer, then in a ffmpeg.VideoFrame + const output = ffmpeg.VideoFrame.create( + Buffer.from(ab), videoRGB.pixelFormat, videoDefinition.width, videoDefinition.height); + // Each frame must carry a timestamp, we copy it from the original + output.setPts(chunk.pts()); + this.push(output); + callback(); + }) + .catch(callback); + } + }); + + const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] }); + + muxer.on('finish', done); + muxer.on('error', done); + + assert.instanceOf(muxer.output, Readable); + const output = fs.createWriteStream(tempFile); + + demuxer.video[0].pipe(videoInput).pipe(toRGB).pipe(overlay).pipe(fromRGB).pipe(videoOutput).pipe(muxer.video[0]); + demuxer.audio[0].pipe(audioInput).pipe(audioOutput).pipe(muxer.audio[0]); + muxer.output!.pipe(output); + } catch (err) { + done(err); + } + }); + }); });
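Editorial addendum (not part of the diff): a minimal sketch of how the new Filter stream fits into a pipeline, based on the classes and methods exercised in test/filtering.test.ts above; the input/output file names, the hflip graph and the console.error handlers are illustrative assumptions, not taken from the project.

```ts
import * as fs from 'node:fs';
import ffmpeg from '@mmomtchev/ffmpeg';
import { Demuxer, Muxer, VideoDecoder, VideoEncoder, Discarder, Filter } from '@mmomtchev/ffmpeg/stream';

// 'input.mp4' and 'output.mkv' are placeholder file names
const demuxer = new Demuxer({ inputFile: 'input.mp4' });

demuxer.on('error', console.error);
demuxer.on('ready', () => {
  const audioDiscard = new Discarder();
  const videoInput = new VideoDecoder(demuxer.video[0]);
  const videoDefinition = videoInput.definition();

  const videoOutput = new VideoEncoder({
    type: 'Video',
    codec: ffmpeg.AV_CODEC_H264,
    bitRate: 2.5e6,
    width: videoDefinition.width,
    height: videoDefinition.height,
    frameRate: new ffmpeg.Rational(25, 1),
    pixelFormat: videoDefinition.pixelFormat
  });

  // One video input, one video output, a trivial horizontal-flip graph
  const filter = new Filter({
    inputs: { 'video_in': videoDefinition },
    outputs: { 'video_out': videoOutput.definition() },
    graph: '[video_in] hflip [video_out]; ',
    timeBase: videoDefinition.timeBase
  });

  const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput] });
  muxer.on('error', console.error);

  // Demuxer -> Decoder -> Filter source, Filter sink -> Encoder -> Muxer
  demuxer.video[0].pipe(videoInput).pipe(filter.src['video_in']);
  filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]);
  demuxer.audio[0].pipe(audioDiscard);
  muxer.output!.pipe(fs.createWriteStream('output.mkv'));
});
```

Each named filter input is exposed as a Writable in filter.src and each named output as a Readable in filter.sink, so the filter composes with the existing Demuxer/Decoder/Encoder/Muxer streams through ordinary .pipe() calls.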