Skip to content

Commit

Permalink
feat: new events (streamUpdate)
Browse files Browse the repository at this point in the history
  • Loading branch information
aiko-chan-ai committed Oct 29, 2024
1 parent d410202 commit f13e0e5
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/main.json

Large diffs are not rendered by default.

112 changes: 112 additions & 0 deletions src/client/voice/VoiceConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,10 @@ class VoiceConnection extends EventEmitter {
this.streamConnection.disconnect();
break;
}
case 'STREAM_UPDATE': {
this.streamConnection.update(data);
break;
}
}
}
if (this.streamWatchConnection.has(StreamKey.userId) && this.channel.id == StreamKey.channelId) {
Expand All @@ -703,6 +707,11 @@ class VoiceConnection extends EventEmitter {
}
case 'STREAM_DELETE': {
streamConnection.disconnect();
streamConnection.receiver.packets.destroyAllStream();
break;
}
case 'STREAM_UPDATE': {
streamConnection.update(data);
break;
}
}
Expand Down Expand Up @@ -791,6 +800,10 @@ class VoiceConnection extends EventEmitter {
this.streamConnection.disconnect();
break;
}
case 'STREAM_UPDATE': {
this.streamConnection.update(data);
break;
}
}
}
if (this.streamWatchConnection.has(StreamKey.userId) && this.channel.id == StreamKey.channelId) {
Expand All @@ -808,6 +821,11 @@ class VoiceConnection extends EventEmitter {
}
case 'STREAM_DELETE': {
streamConnection.disconnect();
streamConnection.receiver.packets.destroyAllStream();
break;
}
case 'STREAM_UPDATE': {
streamConnection.update(data);
break;
}
}
Expand Down Expand Up @@ -844,6 +862,20 @@ class VoiceConnection extends EventEmitter {
}
});
}

/**
* @event VoiceConnection#streamUpdate
* @description Emitted when the StreamConnection or StreamConnectionReadonly
* state changes, providing the previous and current stream state.
*
* @param {StreamState} oldData - The previous state of the stream.
* @param {StreamState} newData - The current state of the stream.
*
* @typedef {Object} StreamState
* @property {boolean} isPaused - Indicates whether the stream is currently paused.
* @property {string|null} region - The region where the stream is hosted, or null if not specified.
* @property {Snowflake[]} viewerIds - An array of Snowflake IDs representing the viewers connected to the stream.
*/
}

/**
Expand Down Expand Up @@ -890,6 +922,18 @@ class StreamConnection extends VoiceConnection {
* @type {boolean}
*/
this.isPaused = false;

/**
* Viewer IDs
* @type {Snowflake[]}
*/
this.viewerIds = [];

/**
* Voice region name
* @type {string | null}
*/
this.region = null;
}

createStreamConnection() {
Expand Down Expand Up @@ -951,6 +995,19 @@ class StreamConnection extends VoiceConnection {
*/
sendScreenshareState(isPaused = false) {
if (isPaused == this.isPaused) return;
this.emit(
'streamUpdate',
{
isPaused: this.isPaused,
region: this.region,
viewerIds: this.viewerIds,
},
{
isPaused,
region: this.region,
viewerIds: this.viewerIds,
},
);
this.isPaused = isPaused;
this.channel.client.ws.broadcast({
op: Opcodes.STREAM_SET_PAUSED,
Expand All @@ -976,6 +1033,24 @@ class StreamConnection extends VoiceConnection {
});
}

update(data) {
this.emit(
'streamUpdate',
{
isPaused: this.isPaused,
region: this.region,
viewerIds: this.viewerIds.slice(),
},
{
isPaused: data.paused,
region: data.region,
viewerIds: data.viewer_ids,
},
);
this.viewerIds = data.viewer_ids;
this.region = data.region;
}

/**
* Current stream key
* @type {string}
Expand Down Expand Up @@ -1032,6 +1107,24 @@ class StreamConnectionReadonly extends VoiceConnection {
* @type {string | null}
*/
this.serverId = null;

/**
* Stream state
* @type {boolean}
*/
this.isPaused = false;

/**
* Viewer IDs
* @type {Snowflake[]}
*/
this.viewerIds = [];

/**
* Voice region name
* @type {string | null}
*/
this.region = null;
}

createStreamConnection() {
Expand Down Expand Up @@ -1097,6 +1190,25 @@ class StreamConnectionReadonly extends VoiceConnection {
});
}

update(data) {
this.emit(
'streamUpdate',
{
isPaused: this.isPaused,
region: this.region,
viewerIds: this.viewerIds.slice(),
},
{
isPaused: data.paused,
region: data.region,
viewerIds: data.viewer_ids,
},
);
this.isPaused = data.paused;
this.viewerIds = data.viewer_ids;
this.region = data.region;
}

/**
* Current stream key
* @type {string}
Expand Down
22 changes: 21 additions & 1 deletion src/client/voice/receiver/FFmpegHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@ const { StreamOutput } = require('../util/Socket');
* @extends {EventEmitter}
*/
class FFmpegHandler extends EventEmitter {
constructor(codec, portUdp, output, isEnableAudio) {
constructor(receiver, userId, codec, portUdp, output, isEnableAudio) {
super();

Object.defineProperty(this, 'receiver', { value: receiver });

/**
* The user ID
* @type {Snowflake}
*/
this.userId = userId;

/**
* If the audio is enabled
* @type {boolean}
Expand Down Expand Up @@ -130,9 +138,21 @@ class FFmpegHandler extends EventEmitter {
let process = list.find(o => o.pid === ffmpegPid || o.ppid === ffmpegPid || o.cmd.includes(args));
if (process) {
kill(process.pid);
this.receiver.videoStreams.delete(this.userId);
this.emit('closed');
}
});
}

/**
* Emitted when the FFmpegHandler becomes ready to start working.
* @event FFmpegHandler#ready
*/

/**
* Emitted when the FFmpegHandler is closed.
* @event FFmpegHandler#closed
*/
}

module.exports = FFmpegHandler;
14 changes: 13 additions & 1 deletion src/client/voice/receiver/PacketHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class PacketHandler extends EventEmitter {

makeVideoStream(user, portUdp, codec, output, isEnableAudio = false) {
if (this.videoStreams.has(user)) return this.videoStreams.get(user);
const stream = new FFmpegHandler(codec, portUdp, output, isEnableAudio);
const stream = new FFmpegHandler(this, user, codec, portUdp, output, isEnableAudio);
stream.on('ready', () => {
this.videoStreams.set(user, stream);
});
Expand Down Expand Up @@ -231,6 +231,18 @@ class PacketHandler extends EventEmitter {
this.videoReceiver(buffer);
this.audioReceiverForStream(buffer);
}

// When udp connection is closed (STREAM_DELETE), destroy all streams (Memory leak)
destroyAllStream() {
for (const stream of this.streams.values()) {
stream.stream.destroy();
}
this.streams.clear();
for (const stream of this.videoStreams.values()) {
stream.destroy();
}
this.videoStreams.clear();
}
}

module.exports = PacketHandler;
2 changes: 1 addition & 1 deletion src/util/Util.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const payloadTypes = [
payload_type: 103,
rtx_payload_type: 104,
encode: false,
decode: false,
decode: false, // Working but very glitchy
},
{
name: 'H264',
Expand Down
18 changes: 17 additions & 1 deletion typings/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1059,25 +1059,35 @@ export class VoiceConnection extends EventEmitter {
public on(event: 'error' | 'failed' | 'disconnect', listener: (error: Error) => void): this;
public on(event: 'speaking', listener: (user: User, speaking: Readonly<Speaking>) => void): this;
public on(event: 'warn', listener: (warning: string | Error) => void): this;
public on(event: 'streamUpdate', listener: (oldState: StreamState, newState: StreamState) => void): this;
public on(event: string, listener: (...args: any[]) => void): this;

public once(event: 'authenticated' | 'closing' | 'newSession' | 'ready' | 'reconnecting', listener: () => void): this;
public once(event: 'debug', listener: (message: string) => void): this;
public once(event: 'error' | 'failed' | 'disconnect', listener: (error: Error) => void): this;
public once(event: 'speaking', listener: (user: User, speaking: Readonly<Speaking>) => void): this;
public once(event: 'warn', listener: (warning: string | Error) => void): this;
public once(event: 'streamUpdate', listener: (oldState: StreamState, newState: StreamState) => void): this;
public once(event: string, listener: (...args: any[]) => void): this;

public createStreamConnection(): Promise<StreamConnection>;
public joinStreamConnection(user: UserResolvable): Promise<StreamConnectionReadonly>;
}

export interface StreamState {
isPaused: boolean;
region: string | null;
viewerIds: Snowflake[];
}

export class StreamConnection extends VoiceConnection {
public createStreamConnection(): Promise<this>;
public readonly voiceConnection: VoiceConnection;
public serverId: Snowflake;
public isPaused: boolean;
public region: string | null;
public streamConnection: this;
public viewerIds: Snowflake[];
public sendSignalScreenshare(): void;
public sendScreenshareState(isPause: boolean): void;
private sendStopScreenshare(): void;
Expand All @@ -1088,8 +1098,11 @@ export class StreamConnectionReadonly extends VoiceConnection {
public joinStreamConnection(): Promise<this>;
public readonly voiceConnection: VoiceConnection;
public serverId: Snowflake;
public isPaused: boolean;
public region: string | null;
public userId: Snowflake;
public streamConnection: null;
public viewerIds: Snowflake[];
public sendSignalScreenshare(): void;
private sendStopScreenshare(): void;
public readonly streamKey: string;
Expand All @@ -1100,17 +1113,20 @@ export class StreamConnectionReadonly extends VoiceConnection {
}

export class FFmpegHandler extends EventEmitter {
public codec: VideoCodec | 'H265' | 'VP9' | 'AV1';
public codec: 'H264';
public portUdp: number;
public ready: boolean;
public stream: ChildProcessWithoutNullStreams;
public socket: Socket;
public socketAudio: Socket;
public output: Writable | string;
public isEnableAudio: boolean;
public userId: Snowflake;
public sendPayloadToFFmpeg(payload: Buffer, isAudio?: boolean): void;
public on(event: 'ready', listener: () => void): this;
public once(event: 'ready', listener: () => void): this;
public on(event: 'closed', listener: () => void): this;
public once(event: 'closed', listener: () => void): this;
public destroy(): void;
}

Expand Down

0 comments on commit f13e0e5

Please sign in to comment.