Skip to content

Commit

Permalink
feat: add presence set change listener and related types
Browse files Browse the repository at this point in the history
- Provides a way to listen for presence set changes, as well as the presence message that moved the set into its current state.
- Also exposes the current sync state of the presence set in the change event.
  • Loading branch information
splindsay-92 committed Feb 13, 2025
1 parent 471cdef commit bfe4a2d
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
59 changes: 59 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,40 @@ export interface RealtimePresenceParams {
connectionId?: string;
}

export interface PresenceSetChange {
/**
* An array containing the members of the current presence set.
*
* Each member is represented by their latest {@link PresenceMessage}.
*/
members: PresenceMessage[],
/**
* The presence message that triggered the change.
*
* Represented as a {@link PresenceMessage} object.
*/
current: PresenceMessage,
/**
* The previous presence member before the change, if any.
*
* Represented as a {@link PresenceMessage} object.
*/
previous?: PresenceMessage,
/**
* Indicates if the presence members' synchronization is in progress.
*
* When `true`, the presence set is not fully synchronized, and the `members` list may not
* accurately reflect the current state of the channel. It often occurs when there is a
* disruption in the connection or during an initial presence sync.
*
* If the presence set was previously synchronized, but became desynchronized due to a
* connection issue, this value will remain `true` until re-synchronization completes.
*
* It is recommended to only rely on the `members` list when `syncInProgress` is `false`.
*/
syncInProgress: boolean
}

/**
* The `RealtimeHistoryParams` interface describes the parameters accepted by the following methods:
*
Expand Down Expand Up @@ -1512,6 +1546,14 @@ export type messageCallback<T> = (message: T) => void;
* @param changeStateChange - The state change that occurred.
*/
export type channelEventCallback = (changeStateChange: ChannelStateChange) => void;
/**
* A callback invoked whenever there is a change in the presence set of a channel.
* This is used to listen to changes in the members and their states within a channel's presence set.
*
* @param presenceSetChange - The details of the presence set change event that occurred.
*/
export type PresenceSetChangeListener = (presenceSetChange: PresenceSetChange) => void;

/**
* The callback used for the events emitted by {@link Connection}.
*
Expand Down Expand Up @@ -1903,6 +1945,23 @@ export declare interface RealtimePresence {
* Indicates whether the presence set synchronization between Ably and the clients on the channel has been completed. Set to `true` when the sync is complete.
*/
syncComplete: boolean;
/**
* Registers a listener that is called each time the presence state for the channel changes. This includes when members join or leave the set, or when their metadata changes.
* The listener is provided with the current set of members,
* the current presence message that triggered the set update, and the previous presence message if applicable.
* The listener is also provided with a flag that indicates whether the presence set is synchronized.
*
* @param listener - A function of type {@link PresenceSetChangeListener}, which is invoked with details of the presence set change.
* @returns A promise which resolves upon success of the operation and rejects with an {@link ErrorInfo} object upon its failure.
*/
onPresenceSetChange(listener: PresenceSetChangeListener): Promise<void>
/**
* Deregisters a previously registered listener for presence state changes on the channel.
* This ensures that the provided listener will no longer be invoked when the presence state of the channel changes.
*
* @param listener - A function of type {@link PresenceSetChangeListener} that was previously registered using {@link onPresenceSetChange}.
*/
offPresenceSetChange(listener: PresenceSetChangeListener): void;
/**
* Deregisters a specific listener that is registered to receive {@link PresenceMessage} on the channel for a given {@link PresenceAction}.
*
Expand Down
50 changes: 49 additions & 1 deletion src/common/lib/client/realtimepresence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,25 @@ function waitAttached(channel: RealtimeChannel, callback: ErrCallback, action: (
}
}

type PresenceSetChangeListener = (event: {
members: PresenceMessage[];
current: PresenceMessage;
previous?: PresenceMessage;
syncInProgress: boolean;
}) => void;

class RealtimePresence extends EventEmitter {
channel: RealtimeChannel;
pendingPresence: { presence: WirePresenceMessage; callback: ErrCallback }[];
syncComplete: boolean;
private _current?: PresenceMessage;
private _previous?: PresenceMessage;
members: PresenceMap;
_myMembers: PresenceMap;
subscriptions: EventEmitter;
name?: string;
private _wrappedListenerMap: WeakMap<PresenceSetChangeListener, (...args: any[]) => void>;


constructor(channel: RealtimeChannel) {
super(channel.logger);
Expand All @@ -73,6 +84,8 @@ class RealtimePresence extends EventEmitter {
this._myMembers = new PresenceMap(this, (item) => item.clientId!);
this.subscriptions = new EventEmitter(this.logger);
this.pendingPresence = [];
this._wrappedListenerMap = new WeakMap();

}

async enter(data: unknown): Promise<void> {
Expand Down Expand Up @@ -101,7 +114,7 @@ class RealtimePresence extends EventEmitter {
id: string | undefined,
clientId: string | undefined,
data: unknown,
action: string,
action: string
): Promise<void> {
const channel = this.channel;
if (!channel.connectionManager.activeState()) {
Expand Down Expand Up @@ -443,6 +456,41 @@ class RealtimePresence extends EventEmitter {
});
}

async onPresenceSetChange(listener: PresenceSetChangeListener): Promise<void> {
const channel = this.channel;

const wrappedListener = (...args: any[]) => {
this._previous = this._current;
this._current = args[0];
listener({
members: this.members.values(),
current: args[0],
previous: this._previous,
syncInProgress: this.members.syncInProgress
});
};

// Store the listener so we can remove it later
this._wrappedListenerMap.set(listener, wrappedListener);

// Subscribe to the `subscriptions` Emitter
this.subscriptions.on(wrappedListener);

// TODO Add spec for Attach the channel if attachOnSubscribe is not set to false
if (channel.channelOptions.attachOnSubscribe !== false) {
await channel.attach();
}
}

offPresenceSetChange(listener: PresenceSetChangeListener): void {
const wrappedListener = this._wrappedListenerMap.get(listener);
if (wrappedListener) {
// Unsubscribe using the wrapped listener
this.subscriptions.off(wrappedListener);
this._wrappedListenerMap.delete(listener);
}
}

async subscribe(..._args: unknown[] /* [event], listener */): Promise<void> {
const args = RealtimeChannel.processListenerArgs(_args);
const event = args[0];
Expand Down

0 comments on commit bfe4a2d

Please sign in to comment.