Skip to content

Commit

Permalink
test the error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mmomtchev committed Jan 1, 2024
1 parent 1f5d43e commit 7950488
Show file tree
Hide file tree
Showing 3 changed files with 330 additions and 170 deletions.
84 changes: 46 additions & 38 deletions src/lib/Filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export class Filter extends EventEmitter {
this.emit('finish');
}
});
this.src[inp].on('error', this.destroy.bind(this));
this.stillStreamingSources++;
Promise.resolve().then(() => {
this.emit('ready');
Expand Down Expand Up @@ -158,22 +159,23 @@ export class Filter extends EventEmitter {
this.read(outp, size);
}
});
this.sink[outp].on('error', this.destroy.bind(this));
}
}

destroy(error: Error) {
protected destroy(error: Error) {
if (this.destroyed) return;
this.destroyed = true;
for (const s of Object.keys(this.bufferSrc)) {
this.sink[s].destroy(error);
this.src[s].destroy(error);
}
for (const s of Object.keys(this.bufferSink)) {
this.src[s].destroy(error);
this.sink[s].destroy(error);
}
this.emit('error');
this.emit('error', error);
}

write(id: string, frame: any, callback: (error?: Error | null | undefined) => void) {
protected write(id: string, frame: any, callback: (error?: Error | null | undefined) => void) {
const src = this.bufferSrc[id];
if (!src) {
return void callback(new Error(`Invalid buffer src [${id}]`));
Expand All @@ -186,44 +188,50 @@ export class Filter extends EventEmitter {
src.busy = true;

verbose(`Filter: received data for source [${id}]`);
frame.setTimeBase(this.timeBase);
frame.setStreamIndex(0);

let q: Promise<void>;
if (src.type === 'Video') {
if (!(frame instanceof ffmpeg.VideoFrame))
throw new Error('Filter source video input must be a stream of VideoFrames');
frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
q = src.buffer.writeVideoFrameAsync(frame);
} else if (src.type === 'Audio') {
if (!(frame instanceof ffmpeg.AudioSamples))
throw new Error('Filter source video input must be a stream of AudioSamples');
q = src.buffer.writeAudioSamplesAsync(frame);
} else {
throw new Error('Only Video and Audio filtering is supported');
}
try {
let q: Promise<void>;
if (src.type === 'Video') {
if (!(frame instanceof ffmpeg.VideoFrame))
return void callback(new Error('Filter source video input must be a stream of VideoFrames'));
frame.setPictureType(ffmpeg.AV_PICTURE_TYPE_NONE);
frame.setTimeBase(this.timeBase);
frame.setStreamIndex(0);
q = src.buffer.writeVideoFrameAsync(frame);
} else if (src.type === 'Audio') {
if (!(frame instanceof ffmpeg.AudioSamples))
return void callback(new Error('Filter source video input must be a stream of AudioSamples'));
frame.setTimeBase(this.timeBase);
frame.setStreamIndex(0);
q = src.buffer.writeAudioSamplesAsync(frame);
} else {
return void callback(new Error('Only Video and Audio filtering is supported'));
}

q.then(() => {
src.busy = false;
verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`);
callback(null);
// Now that we pushed more data, try reading again, refer to 1* below
for (const sink of Object.keys(this.bufferSink)) {
// This is fully synchronous on purpose - otherwise we might run
// into complex synchronization issues where someone else manages
// to call read between the two operations
const size = this.bufferSink[sink].waitingToRead;
if (size) {
verbose(`Filter: wake up sink [${sink}]`);
this.bufferSink[sink].waitingToRead = 0;
this.read(sink, size);
q.then(() => {
src.busy = false;
verbose(`Filter: consumed data for source [${id}], pts=${frame.pts().toString()}`);
callback(null);
// Now that we pushed more data, try reading again, refer to 1* below
for (const sink of Object.keys(this.bufferSink)) {
// This is fully synchronous on purpose - otherwise we might run
// into complex synchronization issues where someone else manages
// to call read between the two operations
const size = this.bufferSink[sink].waitingToRead;
if (size) {
verbose(`Filter: wake up sink [${sink}]`);
this.bufferSink[sink].waitingToRead = 0;
this.read(sink, size);
}
}
}
})
.catch(callback);
})
.catch(callback);
} catch (err) {
callback(err as Error);
}
}

read(id: string, size: number) {
protected read(id: string, size: number) {
const sink = this.bufferSink[id];
if (!sink) {
throw new Error(`Invalid buffer sink [${id}]`);
Expand Down
Loading

0 comments on commit 7950488

Please sign in to comment.