Skip to content

Commit

Permalink
RTMP streaming test-example (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmomtchev authored Jan 2, 2024
1 parent 59b2eb4 commit 8e3f405
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## []
- Add `streams/Filter` to support ffmpeg filters
- Support the built-in networking capabilities of ffmpeg
- Support piping from a `ReadStream` to a `Demuxer`
- Support piping from a `Muxer` to a `WriteStream`
- Send `error` events on `Demuxer` and `Muxer`
Expand Down
10 changes: 10 additions & 0 deletions src/binding/avcpp-nobind.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) {
"openInput")
.def<static_cast<void (FormatContext::*)(const std::string &, OptionalErrorCode)>(&FormatContext::openInput),
Nobind::ReturnAsync>("openInputAsync")
.def<static_cast<void (FormatContext::*)(const std::string &, Dictionary &, OptionalErrorCode)>(
&FormatContext::openInput)>("openInputOptions")
.def<static_cast<void (FormatContext::*)(const std::string &, Dictionary &, OptionalErrorCode)>(
&FormatContext::openInput),
Nobind::ReturnAsync>("openInputOptionsAsync")
.def<static_cast<void (FormatContext::*)(CustomIO *, InputFormat, OptionalErrorCode, size_t)>(
&FormatContext::openInput),
Nobind::ReturnAsync>("openWritableAsync")
Expand All @@ -84,6 +89,11 @@ NOBIND_MODULE_DATA(ffmpeg, m, ffmpegInstanceData) {
"openOutput")
.def<static_cast<void (FormatContext::*)(const std::string &, OptionalErrorCode)>(&FormatContext::openOutput),
Nobind::ReturnAsync>("openOutputAsync")
.def<static_cast<void (FormatContext::*)(const std::string &, Dictionary &, OptionalErrorCode)>(
&FormatContext::openOutput)>("openOutputOptions")
.def<static_cast<void (FormatContext::*)(const std::string &, Dictionary &, OptionalErrorCode)>(
&FormatContext::openOutput),
Nobind::ReturnAsync>("openOutputOptionsAsync")
.def<static_cast<void (FormatContext::*)(CustomIO *, OptionalErrorCode, size_t)>(&FormatContext::openOutput),
Nobind::ReturnAsync>("openReadableAsync")
.def<static_cast<Stream (FormatContext::*)(const VideoEncoderContext &, OptionalErrorCode)>(
Expand Down
12 changes: 10 additions & 2 deletions src/lib/Demuxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ export interface DemuxerOptions extends ReadableOptions {
* Amount of data to buffer, only when reading from a ReadStream, @default 64Kb
*/
highWaterMark?: number;
/**
* Open options
*/
openOptions?: Record<string, string>;
}

/**
Expand Down Expand Up @@ -49,6 +53,7 @@ export class Demuxer extends EventEmitter {
protected highWaterMark: number;
protected formatContext: any;
protected rawStreams: any[];
protected openOptions: Record<string, string>;
streams: EncodedMediaReadable[];
video: EncodedMediaReadable[];
audio: EncodedMediaReadable[];
Expand All @@ -64,6 +69,7 @@ export class Demuxer extends EventEmitter {
this.input = new ffmpeg.WritableCustomIO;
}
this.highWaterMark = options?.highWaterMark ?? (64 * 1024);
this.openOptions = options?.openOptions ?? {};
this.rawStreams = [];
this.streams = [];
this.video = [];
Expand All @@ -76,8 +82,8 @@ export class Demuxer extends EventEmitter {
try {
this.formatContext = new FormatContext;
if (this.inputFile) {
verbose(`Demuxer: opening ${this.inputFile}`);
await this.formatContext.openInputAsync(this.inputFile);
verbose(`Demuxer: opening ${this.inputFile}`, this.openOptions);
await this.formatContext.openInputOptionsAsync(this.inputFile, this.openOptions);
} else {
verbose('Demuxer: reading from ReadStream');
const format = new ffmpeg.InputFormat;
Expand Down Expand Up @@ -125,6 +131,7 @@ export class Demuxer extends EventEmitter {
if (pkt.isNull()) {
verbose('Demuxer: End of stream');
for (const s of this.streams) s.push(null);
this.emit('close');
return;
}
if (!this.streams[pkt.streamIndex()]) {
Expand All @@ -140,6 +147,7 @@ export class Demuxer extends EventEmitter {
verbose('Demuxer: end of _read');
})()
.catch((err) => {
verbose(`Demuxer: ${err}`);
for (const s of this.streams) s.destroy(err);
this.emit('error', err);
})
Expand Down
11 changes: 8 additions & 3 deletions src/lib/Muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export interface MuxerOptions extends WritableOptions {
* Output format options
*/
outputFormatOptions?: Record<string, string>;
/**
* Open options
*/
openOptions?: Record<string, string>;
}

/**
Expand Down Expand Up @@ -54,6 +58,7 @@ export class Muxer extends EventEmitter {
protected outputFormatName: string;
protected outputFormatOptions: Record<string, string>;
protected outputFormat: any;
protected openOptions: Record<string, string>;
protected formatContext: any;
protected rawStreams: MediaEncoder[];
protected writing: boolean;
Expand All @@ -78,6 +83,7 @@ export class Muxer extends EventEmitter {
this.highWaterMark = options.highWaterMark ?? (64 * 1024);
this.outputFormatName = options.outputFormat ?? '';
this.outputFormatOptions = options.outputFormatOptions ?? {};
this.openOptions = options.openOptions ?? {};
this.rawStreams = options.streams;
this.streams = [];
this.audio = [];
Expand Down Expand Up @@ -178,7 +184,7 @@ export class Muxer extends EventEmitter {
// If all inputs are not properly primed before opening the muxer, this can lead
// to some very subtle problems such as the codec flags not being properly carried over
await Promise.all(this.ready);
verbose(`Muxer: opening ${this.outputFile}, all inputs are primed`);
verbose(`Muxer: opening ${this.outputFile}, all inputs are primed`, this.openOptions);

for (const idx in this.rawStreams) {
const codec = this.rawStreams[idx].codec();
Expand All @@ -198,7 +204,7 @@ export class Muxer extends EventEmitter {
}

if (!this.output) {
await this.formatContext.openOutputAsync(this.outputFile);
await this.formatContext.openOutputOptionsAsync(this.outputFile, this.openOptions);
} else {
await this.formatContext.openReadableAsync(this.output, this.highWaterMark);
}
Expand Down Expand Up @@ -242,7 +248,6 @@ export class Muxer extends EventEmitter {
} catch (err) {
verbose(`Muxer: ${err}`);
job.callback(err as Error);
for (const s of this.streams) s.destroy(err as Error);
this.destroy(err as Error);
}
}
Expand Down
103 changes: 103 additions & 0 deletions test/rtmp.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import * as path from 'node:path';

import { assert } from 'chai';

import ffmpeg from '@mmomtchev/ffmpeg';
import { Muxer, Demuxer, VideoDecoder, VideoEncoder, AudioDecoder, AudioEncoder } from '@mmomtchev/ffmpeg/stream';

// These test-examples uses ffmpeg's built-in network capabilities - which include the RTMP protocol
describe('using ffmpeg built-in networking', () => {
// This tests-example sets both a server and a client that communicate over 'rtmp://localhost:9099/video'
it('ffmpeg RTMP network streaming', (done) => {
const demuxer = new Demuxer({ inputFile: path.resolve(__dirname, 'data', 'launch.mp4') });

demuxer.on('error', done);
demuxer.on('ready', () => {
try {
// Setup the server
const audioInput = new AudioDecoder(demuxer.audio[0]);
const videoInput = new VideoDecoder(demuxer.video[0]);

const videoDefinition = videoInput.definition();
const audioDefinition = audioInput.definition();

const videoOutput = new VideoEncoder({
type: 'Video',
codec: ffmpeg.AV_CODEC_H264,
bitRate: 2.5e6,
width: videoDefinition.width,
height: videoDefinition.height,
frameRate: new ffmpeg.Rational(25, 1),
pixelFormat: videoDefinition.pixelFormat
});

const audioOutput = new AudioEncoder({
type: 'Audio',
codec: ffmpeg.AV_CODEC_AAC,
bitRate: 128e3,
sampleRate: audioDefinition.sampleRate,
sampleFormat: audioDefinition.sampleFormat,
channelLayout: audioDefinition.channelLayout
});

const muxer = new Muxer({
outputFile: 'rtmp://localhost:9099/video',
outputFormat: 'flv',
openOptions: { listen: '1' },
streams: [videoOutput, audioOutput]
});

muxer.on('error', done);

// Run the server
demuxer.video[0].pipe(videoInput).pipe(videoOutput).pipe(muxer.video[0]);
demuxer.audio[0].pipe(audioInput).pipe(audioOutput).pipe(muxer.audio[0]);

// Wait for it to be ready
// Alas, it is impossible to implement a reliable event for when the server is ready
// The method which binds ffmpeg to the port does not return until a client has connected
setTimeout(() => {
// Setup the client
const client = new Demuxer({ inputFile: 'rtmp://localhost:9099/show' });
client.on('error', done);
// Wait for the client to start streaming
client.on('ready', () => {
try {
const audioStream = new AudioDecoder(client.audio[0]);
const videoStream = new VideoDecoder(client.video[0]);

let audioFrames = 0, videoFrames = 0;
audioStream.on('data', (frame) => {
assert.instanceOf(frame, ffmpeg.AudioSamples);
audioFrames++;
});
videoStream.on('data', (frame) => {
assert.instanceOf(frame, ffmpeg.VideoFrame);
videoFrames++;
});

// Alas, closing the connection on the server-side
// and reporting an I/O error on the last read on the client side
// is the "normal" closing in ffmpeg
client.audio[0].on('error', () => undefined);
client.video[0].on('error', () => undefined);
client.removeAllListeners('error');
client.on('error', () => {
assert.isAbove(audioFrames, 200);
assert.isAbove(videoFrames, 100);
done();
});

client.audio[0].pipe(audioStream);
client.video[0].pipe(videoStream);
} catch (err) {
done(err);
}
});
}, 5000);
} catch (err) {
done(err);
}
});
});
});

0 comments on commit 8e3f405

Please sign in to comment.