Skip to content

Commit

Permalink
feat: add presence set change listener and related types
Browse files Browse the repository at this point in the history
- Provides a way to listen for presence set changes, as well as the presence message that moved the set into its current state.
- Also exposes the current sync state of the presence set in the change event.
  • Loading branch information
splindsay-92 committed Feb 13, 2025
1 parent 471cdef commit 01a535a
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 1 deletion.
59 changes: 59 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,40 @@ export interface RealtimePresenceParams {
connectionId?: string;
}

export interface PresenceSetChange {
/**
* An array containing the members of the current presence set.
*
* Each member is represented by their latest {@link PresenceMessage}.
*/
members: PresenceMessage[],
/**
* The presence message that triggered the change.
*
* Represented as a {@link PresenceMessage} object.
*/
current: PresenceMessage,
/**
* The previous presence member before the change, if any.
*
* Represented as a {@link PresenceMessage} object.
*/
previous?: PresenceMessage,
/**
* Indicates if the presence members' synchronization is in progress.
*
* When `true`, the presence set is not fully synchronized, and the `members` list may not
* accurately reflect the current state of the channel. It often occurs when there is a
* disruption in the connection or during an initial presence sync.
*
* If the presence set was previously synchronized, but became desynchronized due to a
* connection issue, this value will remain `true` until re-synchronization completes.
*
* It is recommended to only rely on the `members` list when `syncInProgress` is `false`.
*/
syncInProgress: boolean
}

/**
* The `RealtimeHistoryParams` interface describes the parameters accepted by the following methods:
*
Expand Down Expand Up @@ -1512,6 +1546,14 @@ export type messageCallback<T> = (message: T) => void;
* @param changeStateChange - The state change that occurred.
*/
export type channelEventCallback = (changeStateChange: ChannelStateChange) => void;
/**
* A callback invoked whenever there is a change in the presence set of a channel.
* This is used to listen to changes in the members and their states within a channel's presence set.
*
* @param presenceSetChange - The details of the presence set change event that occurred.
*/
export type PresenceSetChangeListener = (presenceSetChange: PresenceSetChange) => void;

/**
* The callback used for the events emitted by {@link Connection}.
*
Expand Down Expand Up @@ -1903,6 +1945,23 @@ export declare interface RealtimePresence {
* Indicates whether the presence set synchronization between Ably and the clients on the channel has been completed. Set to `true` when the sync is complete.
*/
syncComplete: boolean;
/**
* Registers a listener that is called each time the presence state for the channel changes. This includes when members join or leave the set, or when their metadata changes.
* The listener is provided with the current set of members,
* the current presence message that triggered the set update, and the previous presence message if applicable.
* The listener is also provided with a flag that indicates whether the presence set is synchronized.
*
* @param listener - A function of type {@link PresenceSetChangeListener}, which is invoked with details of the presence set change.
* @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure.
*/
onPresenceSetChange(listener: PresenceSetChangeListener): Promise<void>
/**
* Deregisters a previously registered listener for presence state changes on the channel.
* This ensures that the provided listener will no longer be invoked when the presence state of the channel changes.
*
* @param listener - A function of type {@link PresenceSetChangeListener} that was previously registered using {@link onPresenceSetChange}.
*/
offPresenceSetChange(listener: PresenceSetChangeListener): void;
/**
* Deregisters a specific listener that is registered to receive {@link PresenceMessage} on the channel for a given {@link PresenceAction}.
*
Expand Down
56 changes: 55 additions & 1 deletion src/common/lib/client/realtimepresence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ function waitAttached(channel: RealtimeChannel, callback: ErrCallback, action: (
}
}

type PresenceSetChangeListener = (event: {
members: PresenceMessage[];
current: PresenceMessage;
previous?: PresenceMessage;
syncInProgress: boolean;
}) => void;

class RealtimePresence extends EventEmitter {
channel: RealtimeChannel;
pendingPresence: { presence: WirePresenceMessage; callback: ErrCallback }[];
Expand All @@ -63,6 +70,9 @@ class RealtimePresence extends EventEmitter {
_myMembers: PresenceMap;
subscriptions: EventEmitter;
name?: string;
private _current?: PresenceMessage;
private _previous?: PresenceMessage;
private _presenceSetChangeEventEmitter: EventEmitter

constructor(channel: RealtimeChannel) {
super(channel.logger);
Expand All @@ -73,6 +83,10 @@ class RealtimePresence extends EventEmitter {
this._myMembers = new PresenceMap(this, (item) => item.clientId!);
this.subscriptions = new EventEmitter(this.logger);
this.pendingPresence = [];
this._presenceSetChangeEventEmitter = new EventEmitter(this.logger);

// Subscribe the internal listener to the presence set change event
this._internalPresenceSubscribe();
}

async enter(data: unknown): Promise<void> {
Expand Down Expand Up @@ -101,7 +115,7 @@ class RealtimePresence extends EventEmitter {
id: string | undefined,
clientId: string | undefined,
data: unknown,
action: string,
action: string
): Promise<void> {
const channel = this.channel;
if (!channel.connectionManager.activeState()) {
Expand Down Expand Up @@ -443,6 +457,42 @@ class RealtimePresence extends EventEmitter {
});
}

private _internalPresenceSubscribe(): void {
this.subscriptions.on((...args: any[]) => {
this._previous = this._current;
this._current = args[0];
this._presenceSetChangeEventEmitter.emit('internal',{
members: this.members.values(),
current: this._current,
previous: this._previous,
syncInProgress: this.members.syncInProgress,
});
});
}

async onPresenceSetChange(listener: PresenceSetChangeListener): Promise<void> {
const channel = this.channel;
if (channel.state === 'failed') {
throw ErrorInfo.fromValues(channel.invalidStateError());
}

// Add the listener to the dedicated presence emitter
this._presenceSetChangeEventEmitter.on(listener);

// TODO - Add spec point for this
if (channel.channelOptions.attachOnSubscribe !== false) {
await channel.attach();
}
}

/**
* Removes a previously subscribed listener.
*/
offPresenceSetChange(listener: PresenceSetChangeListener): void {
// Remove the listener from the dedicated presence emitter
this._presenceSetChangeEventEmitter.off(listener);
}

async subscribe(..._args: unknown[] /* [event], listener */): Promise<void> {
const args = RealtimeChannel.processListenerArgs(_args);
const event = args[0];
Expand All @@ -466,6 +516,10 @@ class RealtimePresence extends EventEmitter {
const event = args[0];
const listener = args[1];
this.subscriptions.off(event, listener);
if (this.subscriptions.listeners.length === 0) {
// Resubscribe the internal listener if this unsubscribe() call has removed all listeners
this._internalPresenceSubscribe();
}
}
}

Expand Down

0 comments on commit 01a535a

Please sign in to comment.