support audio filtering
mmomtchev committed Jan 1, 2024
1 parent 840ee54 commit 1f5d43e
Showing 7 changed files with 163 additions and 71 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
@@ -5,5 +5,6 @@
"clang-format.fallbackStyle": "LLVM",
"[cpp]": {
"editor.defaultFormatter": "xaver.clang-format"
}
},
"editor.tabSize": 2
}
2 changes: 1 addition & 1 deletion deps/avcpp
20 changes: 20 additions & 0 deletions src/binding/avcpp-frame.cc
@@ -13,3 +13,23 @@ VideoFrameBuffer CopyFrameToBuffer(VideoFrame &frame) {
auto size = frame.bufferSize();
return VideoFrameBuffer{{[&frame, size](uint8_t *data) { frame.copyToBuffer(data, size); }, size}};
}

VideoFrame *GetVideoFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec) {
VideoFrame *frame = new VideoFrame;
if (!sink.getVideoFrame(*frame, ec)) {
delete frame;
return nullptr;
}

return frame;
}

AudioSamples *GetAudioFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec) {
AudioSamples *samples = new AudioSamples;
if (!sink.getAudioFrame(*samples, ec)) {
delete samples;
return nullptr;
}

return samples;
}
6 changes: 6 additions & 0 deletions src/binding/avcpp-frame.h
@@ -64,3 +64,9 @@ AudioSamples CreateAudioSamples(Nobind::Typemap::Buffer buffer, SampleFormat sam
uint64_t channelLayout, int sampleRate);

VideoFrame CreateVideoFrame(Nobind::Typemap::Buffer buffer, PixelFormat pixelFormat, int width, int height);

// These extension functions are needed to wrap their avcpp counterparts, which return data through an output argument
// They return pointers to avoid unnecessarily copying the frame - JavaScript makes no distinction,
// as in JavaScript all C++ objects are heap-allocated and referenced by a pointer
VideoFrame *GetVideoFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec);
AudioSamples *GetAudioFrame(BufferSinkFilterContext &sink, OptionalErrorCode ec);
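
Note: on the JavaScript side this replaces the boolean-status-plus-output-argument pattern with getters that return the frame or null. A minimal sketch of the resulting usage, assuming an already constructed BufferSinkFilterContext named sink (the surrounding code is illustrative, not part of this commit):

// Previous binding: preallocate a frame, then fill it in place
// const frame = new ffmpeg.VideoFrame;
// while (sink.getVideoFrame(frame)) { /* frame is overwritten on each call */ }

// New binding: a fresh heap-allocated frame, or null when nothing is ready
let frame;
while ((frame = sink.getVideoFrame()) !== null) {
  console.log(`frame pts=${frame.pts().toString()}`);
}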
29 changes: 6 additions & 23 deletions src/binding/avcpp-nobind.cc
@@ -299,7 +299,9 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) {
.cons<const char *>()
.cons<const ChannelLayoutView &>()
.def<&ChannelLayout::channels>("channels")
.def<&ChannelLayout::layout>("layout");
.def<&ChannelLayout::layout>("layout")
.def<&ChannelLayout::isValid>("isValid")
.def<static_cast<std::string (ChannelLayoutView::*)() const>(&ChannelLayoutView::describe)>("toString");

m.def<ChannelLayoutView>("ChannelLayoutView");
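
Note: the two new ChannelLayout methods should make channel layouts directly inspectable from JavaScript. A hedged sketch, assuming the string constructor accepts the usual ffmpeg layout names (the values in the comments are expectations, not verified output):

const layout = new ffmpeg.ChannelLayout('stereo');
layout.isValid();   // expected: true
layout.channels();  // expected: 2
layout.toString();  // expected: 'stereo', wraps ChannelLayoutView::describe()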

@@ -359,6 +361,7 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) {
.ext<static_cast<ToString_t<VideoFrame>>(&ToString<VideoFrame>)>("toString");

m.def<AudioSamples>("AudioSamples")
.def<&AudioSamples::null>("null")
.def<&CreateAudioSamples>("create")
.def<&AudioSamples::isNull>("isNull")
.def<&AudioSamples::isComplete>("isComplete")
@@ -451,28 +454,8 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) {

m.def<BufferSinkFilterContext>("BufferSinkFilterContext")
.cons<FilterContext &>()
// getVideoFrame & co do not have a good native-feel interface, but this avoids
// copying video frames
.def<static_cast<bool (BufferSinkFilterContext::*)(VideoFrame &, OptionalErrorCode)>(
&BufferSinkFilterContext::getVideoFrame)>("getVideoFrame")
.def<static_cast<bool (BufferSinkFilterContext::*)(VideoFrame &, int, OptionalErrorCode)>(
&BufferSinkFilterContext::getVideoFrame)>("getVideoFrameFlags")
.def<static_cast<bool (BufferSinkFilterContext::*)(AudioSamples &, OptionalErrorCode)>(
&BufferSinkFilterContext::getAudioFrame)>("getAudioFrame")
.def<static_cast<bool (BufferSinkFilterContext::*)(AudioSamples &, int, OptionalErrorCode)>(
&BufferSinkFilterContext::getAudioFrame)>("getAudioFrameFlags")
.def<static_cast<bool (BufferSinkFilterContext::*)(VideoFrame &, OptionalErrorCode)>(
&BufferSinkFilterContext::getVideoFrame),
Nobind::ReturnAsync>("getVideoFrameAsync")
.def<static_cast<bool (BufferSinkFilterContext::*)(VideoFrame &, int, OptionalErrorCode)>(
&BufferSinkFilterContext::getVideoFrame),
Nobind::ReturnAsync>("getVideoFrameFlagsAsync")
.def<static_cast<bool (BufferSinkFilterContext::*)(AudioSamples &, OptionalErrorCode)>(
&BufferSinkFilterContext::getAudioFrame),
Nobind::ReturnAsync>("getAudioFrameAsync")
.def<static_cast<bool (BufferSinkFilterContext::*)(AudioSamples &, int, OptionalErrorCode)>(
&BufferSinkFilterContext::getAudioFrame),
Nobind::ReturnAsync>("getAudioFrameFlagsAsync")
.ext<&GetVideoFrame, Nobind::ReturnNullAccept>("getVideoFrame")
.ext<&GetAudioFrame, Nobind::ReturnNullAccept>("getAudioFrame")
.def<&BufferSinkFilterContext::setFrameSize>("setFrameSize")
.def<&BufferSinkFilterContext::frameRate>("frameRate")
.def<&BufferSinkFilterContext::checkFilter>("checkFilter");
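
Note: with Nobind::ReturnNullAccept, the nullptr returned by the extension functions presumably surfaces as null in JavaScript instead of raising an error. A null result means the sink has no frame buffered yet - either more input must be written or the stream has ended. The audio counterpart of the drain loop, mirroring the one in Filter.ts below (sketch only, the bufferSink name is illustrative):

let samples;
while ((samples = bufferSink.getAudioFrame()) !== null) {
  // process the AudioSamples object
}
// null: the filter graph needs more input before it can produce more samples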
130 changes: 96 additions & 34 deletions src/lib/Filter.ts
@@ -22,12 +22,17 @@ export interface FilterOptions {
export class Filter extends EventEmitter {
protected filterGraph: any;
protected bufferSrc: Record<string, {
type: 'Audio' | 'Video';
buffer: any;
busy: boolean;
nullFrame: any;
id: string;
}>;
protected bufferSink: Record<string, {
type: 'Audio' | 'Video';
buffer: any;
waitingToRead: number;
id: string;
}>;
protected timeBase: any;
protected stillStreamingSources: number;
@@ -51,14 +56,18 @@ export class Filter extends EventEmitter {
`pix_fmt=${def.pixelFormat.toString()}:time_base=${def.timeBase.toString()} [${inp}]; `;
}
if (isAudioDefinition(def)) {
throw new Error('later');
filterDescriptor += `abuffer@${inp}=sample_rate=${def.sampleRate}:` +
`channel_layout=${def.channelLayout.toString()}:` +
`sample_fmt=${def.sampleFormat.toString()}:time_base=${def.timeBase.toString()} [${inp}]; `;
}
}
filterDescriptor += options.graph;
for (const outp of Object.keys(options.outputs)) {
const def = options.outputs[outp];
if (isVideoDefinition(def)) {
filterDescriptor += `[${outp}] buffersink@${outp}`;
filterDescriptor += `[${outp}] buffersink@${outp}; `;
} else if (isAudioDefinition(def)) {
filterDescriptor += `[${outp}] abuffersink@${outp}; `;
}
}
verbose(`Filter: constructed graph ${filterDescriptor}`);
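
Note: for one video input, one audio input and matching outputs, the constructed descriptor should look roughly like this (values are illustrative, and the video branch also carries the video_size parameter built in the lines hidden above):

buffer@video_in=video_size=1920x1080:pix_fmt=yuv420p:time_base=1/25 [video_in];
abuffer@audio_in=sample_rate=48000:channel_layout=stereo:sample_fmt=fltp:time_base=1/48000 [audio_in];
...the user-supplied graph...;
[video_out] buffersink@video_out;
[audio_out] abuffersink@audio_out;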
@@ -70,9 +79,27 @@ export class Filter extends EventEmitter {
this.src = {};
this.bufferSrc = {};
for (const inp of Object.keys(options.inputs)) {
const def = options.inputs[inp];
let nullFrame: any;
let id: string;
let type: 'Audio' | 'Video';
if (isVideoDefinition(def)) {
nullFrame = ffmpeg.VideoFrame.null();
id = `buffer@${inp}`;
type = 'Video';
} else if (isAudioDefinition(def)) {
nullFrame = ffmpeg.AudioSamples.null();
id = `abuffer@${inp}`;
type = 'Audio';
} else {
throw new Error('Only Video and Audio filtering is supported');
}
this.bufferSrc[inp] = {
buffer: new ffmpeg.BufferSrcFilterContext(this.filterGraph.filter(`buffer@${inp}`)),
busy: false
type,
id,
buffer: new ffmpeg.BufferSrcFilterContext(this.filterGraph.filter(id)),
busy: false,
nullFrame
};
this.src[inp] = new Writable({
objectMode: true,
@@ -91,8 +118,7 @@ export class Filter extends EventEmitter {
},
final: (callback: (error?: Error | null | undefined) => void): void => {
verbose(`Filter: end source [${inp}]`);
// VideoFrame.null() is a special EOF frame
this.write(inp, ffmpeg.VideoFrame.null(), callback);
this.write(inp, nullFrame, callback);
callback(null);
this.stillStreamingSources--;
if (this.stillStreamingSources === 0)
@@ -108,8 +134,22 @@ export class Filter extends EventEmitter {
this.sink = {};
this.bufferSink = {};
for (const outp of Object.keys(options.outputs)) {
const def = options.outputs[outp];
let id: string;
let type: 'Audio' | 'Video';
if (isVideoDefinition(def)) {
id = `buffersink@${outp}`;
type = 'Video';
} else if (isAudioDefinition(def)) {
id = `abuffersink@${outp}`;
type = 'Audio';
} else {
throw new Error('Only Video and Audio filtering is supported');
}
this.bufferSink[outp] = {
buffer: new ffmpeg.BufferSinkFilterContext(this.filterGraph.filter(`buffersink@${outp}`)),
type,
id,
buffer: new ffmpeg.BufferSinkFilterContext(this.filterGraph.filter(id)),
waitingToRead: 0
};
this.sink[outp] = new Readable({
Expand Down Expand Up @@ -145,29 +185,41 @@ export class Filter extends EventEmitter {
}
src.busy = true;

frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
verbose(`Filter: received data for source [${id}]`);
frame.setTimeBase(this.timeBase);
frame.setStreamIndex(0);

verbose(`Filter: received data for source [${id}]`);
src.buffer.writeVideoFrameAsync(frame)
.then(() => {
src.busy = false;
verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`);
callback(null);
// Now that we pushed more data, try reading again, refer to 1* below
for (const sink of Object.keys(this.bufferSink)) {
// This is fully synchronous on purpose - otherwise we might run
// into complex synchronization issues where someone else manages
// to call read between the two operations
const size = this.bufferSink[sink].waitingToRead;
if (size) {
verbose(`Filter: wake up sink [${sink}]`);
this.bufferSink[sink].waitingToRead = 0;
this.read(sink, size);
}
let q: Promise<void>;
if (src.type === 'Video') {
if (!(frame instanceof ffmpeg.VideoFrame))
throw new Error('Filter source video input must be a stream of VideoFrames');
frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
q = src.buffer.writeVideoFrameAsync(frame);
} else if (src.type === 'Audio') {
if (!(frame instanceof ffmpeg.AudioSamples))
throw new Error('Filter source audio input must be a stream of AudioSamples');
q = src.buffer.writeAudioSamplesAsync(frame);
} else {
throw new Error('Only Video and Audio filtering is supported');
}

q.then(() => {
src.busy = false;
verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`);
callback(null);
// Now that we pushed more data, try reading again, refer to 1* below
for (const sink of Object.keys(this.bufferSink)) {
// This is fully synchronous on purpose - otherwise we might run
// into complex synchronization issues where someone else manages
// to call read between the two operations
const size = this.bufferSink[sink].waitingToRead;
if (size) {
verbose(`Filter: wake up sink [${sink}]`);
this.bufferSink[sink].waitingToRead = 0;
this.read(sink, size);
}
})
}
})
.catch(callback);
}

@@ -178,17 +230,27 @@ export class Filter extends EventEmitter {
}
verbose(`Filter: received a request for data from sink [${id}]`);

let getFrame: () => any;
if (sink.type === 'Video') {
getFrame = sink.buffer.getVideoFrame.bind(sink.buffer);
} else if (sink.type === 'Audio') {
getFrame = sink.buffer.getAudioFrame.bind(sink.buffer);
} else {
throw new Error('Only Video and Audio filtering is supported');
}
// read must always return immediately
// this means that we depend on ffmpeg not doing any filtering work on this call.
// This is the meaning of the special flag (AV_BUFFERSINK_FLAG_NO_REQUEST in the C API)
const videoFrame = new ffmpeg.VideoFrame;
// TODO: Check to what extent this is true
let frame;
let frames = 0;
while (sink.buffer.getVideoFrame(videoFrame) && frames < size) {
verbose(`Filter: sent data from sink [${id}], pts=${videoFrame.pts().toString()}`);
videoFrame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
videoFrame.setTimeBase(this.timeBase);
videoFrame.setStreamIndex(0);
this.sink[id].push(videoFrame);
while ((frame = getFrame()) !== null && frames < size) {
verbose(`Filter: sent data from sink [${id}], pts=${frame.pts().toString()}`);
if (sink.type === 'Video') {
frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
}
frame.setTimeBase(this.timeBase);
frame.setStreamIndex(0);
this.sink[id].push(frame);
frames++;
}
if (this.stillStreamingSources === 0) {
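
Note: each sink is exposed as an object-mode Readable that pushes VideoFrame or AudioSamples objects as the graph produces them. A minimal consumption sketch (the sink name is illustrative):

filter.sink['video_out'].on('data', (frame) => {
  console.log(`filtered frame pts=${frame.pts().toString()}`);
});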
44 changes: 32 additions & 12 deletions test/overlay.test.ts
@@ -248,9 +248,10 @@ describe('streaming', () => {
});
});

it('w/ ffmpeg filtering (PiP example)', (done) => {
it('w/ video overlay (ffmpeg PiP filter)', (done) => {
// This uses ffmpeg's filter subsystem to overlay a copy of the video
// in a small thumbnail (Picture-in-Picture).
// It also processes the audio.
// It reads from a file, overlays and transcodes to a realtime stream.
// This pipeline is fast enough to be usable in real-time even on an older CPU.
//
@@ -260,9 +261,10 @@
demuxer.on('ready', () => {
try {

const audioInput = new Discarder;
const audioInput = new AudioDecoder(demuxer.audio[0]);
const videoInput = new VideoDecoder(demuxer.video[0]);

const audioDefinition = audioInput.definition();
const videoDefinition = videoInput.definition();

const videoOutput = new VideoEncoder({
@@ -278,31 +280,46 @@
codecOptions: { preset: 'veryfast' }
});

const audioOutput = new AudioEncoder({
type: 'Audio',
codec: ffmpeg.AV_CODEC_AAC,
bitRate: 128e3,
sampleRate: audioDefinition.sampleRate,
sampleFormat: audioDefinition.sampleFormat,
channelLayout: audioDefinition.channelLayout
});

// A Filter is an ffmpeg filter chain
const filter = new Filter({
inputs: {
// Filter with two identical inputs (the same video)
// Filter with two identical video inputs (the same video)
'main_in': videoDefinition,
'pip_in': videoDefinition
'pip_in': videoDefinition,
// and one audio input
'audio_in': audioDefinition
},
outputs: {
// One output
'out': videoOutput.definition()
// Outputs
'video_out': videoOutput.definition(),
'audio_out': audioOutput.definition()
},
graph:
// Take 'pip_in' and rescale it to 1/8th to obtain 'pip_out'
`[pip_in] scale=${videoDefinition.width / 8}x${videoDefinition.height / 8} [pip_out]; ` +
// Overlay 'pip_out' over 'main_in' at the specified offset to obtain 'out'
`[main_in][pip_out] overlay=x=${videoDefinition.width * 13 / 16}:y=${videoDefinition.height / 16} [out]; `,
`[main_in][pip_out] overlay=x=${videoDefinition.width * 13 / 16}:y=${videoDefinition.height / 16} [video_out]; ` +
// Simply copy the audio through the filter
'[audio_in] acopy [audio_out]; ',
// A filter must have a single time base
timeBase: videoDefinition.timeBase
});
// These should be available based on the above configuration
assert.instanceOf(filter.src['main_in'], Writable);
assert.instanceOf(filter.src['pip_in'], Writable);
assert.instanceOf(filter.sink['out'], Readable);
assert.instanceOf(filter.sink['video_out'], Readable);
assert.instanceOf(filter.sink['audio_out'], Readable);

const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput] });
const muxer = new Muxer({ outputFormat: 'matroska', streams: [videoOutput, audioOutput] });
assert.instanceOf(muxer.output, Readable);

muxer.on('finish', done);
@@ -317,16 +334,19 @@
// Demuxer -> Decoder -> T junction
demuxer.video[0].pipe(videoInput);

// Demuxer -> Decoder -> Filter source 'audio_in'
demuxer.audio[0].pipe(audioInput).pipe(filter.src['audio_in']);

// T junction -> Filter source 'main_in'
videoInput1.pipe(filter.src['main_in']);

// T junction -> Filter source 'pip_in'
videoInput2.pipe(filter.src['pip_in']);

// Filter sink 'out' -> Encoder -> Muxer
filter.sink['out'].pipe(videoOutput).pipe(muxer.video[0]);
// Filter sinks -> Encoder -> Muxer
filter.sink['video_out'].pipe(videoOutput).pipe(muxer.video[0]);
filter.sink['audio_out'].pipe(audioOutput).pipe(muxer.audio[0]);

demuxer.audio[0].pipe(audioInput);
muxer.output!.pipe(output);
} catch (err) {
done(err);
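
Note: the resulting stream topology of this test, reconstructed from the wiring above:

demuxer.video[0] -> videoInput (VideoDecoder) -> tee -> filter.src['main_in']
                                                  `--> filter.src['pip_in']
demuxer.audio[0] -> audioInput (AudioDecoder) -------> filter.src['audio_in']

filter.sink['video_out'] -> videoOutput (VideoEncoder) -> muxer.video[0]
filter.sink['audio_out'] -> audioOutput (AudioEncoder) -> muxer.audio[0]
muxer.output -> output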
