Skip to content

Commit

Permalink
add subscribe since API (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Aug 28, 2021
1 parent 799076b commit 9f5570d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
10 changes: 7 additions & 3 deletions dist/centrifuge.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ declare class Centrifuge extends EventEmitter {
stopBatching(): void;
startSubscribeBatching(): void;
stopSubscribeBatching(): void;
subscribe(channel: string, events?: (...args: any[]) => void): Centrifuge.Subscription;
subscribe(channel: string, events?: Centrifuge.SubscriptionEvents): Centrifuge.Subscription;
subscribe(channel: string, events?: (...args: any[], opts?: Centrifuge.SubscribeOptions) => void): Centrifuge.Subscription;
subscribe(channel: string, events?: Centrifuge.SubscriptionEvents, opts?: Centrifuge.SubscribeOptions): Centrifuge.Subscription;
}

declare namespace Centrifuge {
Expand Down Expand Up @@ -93,7 +93,7 @@ declare namespace Centrifuge {
export class Subscription extends EventEmitter {
channel: string;
ready(callback: (ctx: SubscribeSuccessContext) => void, errback: (ctx: SubscribeErrorContext) => void): void;
subscribe(): void;
subscribe(opts?: SubscribeOptions): void;
unsubscribe(): void;
publish(data: any): Promise<PublishResult>;
presence(): Promise<PresenceResult>;
Expand Down Expand Up @@ -207,6 +207,10 @@ declare namespace Centrifuge {
reverse?: boolean;
}

export interface SubscribeOptions {
since?: StreamPosition;
}

export interface StreamPosition {
offset: number;
epoch: string;
Expand Down
11 changes: 10 additions & 1 deletion src/centrifuge.js
Original file line number Diff line number Diff line change
Expand Up @@ -1887,7 +1887,13 @@ export class Centrifuge extends EventEmitter {
}
};

subscribe(channel, events) {
_setSubscribeSince(sub, since) {
this._lastOffset[sub.channel] = since.offset;
this._lastEpoch[sub.channel] = since.epoch;
sub._setNeedRecover(true);
}

subscribe(channel, events, opts) {
const currentSub = this._getSub(channel);
if (currentSub !== null) {
currentSub._setEvents(events);
Expand All @@ -1898,6 +1904,9 @@ export class Centrifuge extends EventEmitter {
}
const sub = new Subscription(this, channel, events);
this._subs[channel] = sub;
if (opts && opts.since) {
this._setSubscribeSince(sub, opts.since);
}
sub.subscribe();
return sub;
};
Expand Down
10 changes: 9 additions & 1 deletion src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ export default class Subscription extends EventEmitter {
}).then(function () {}, function () {});
};

_setNeedRecover(enabled) {
this._recoverable = enabled;
this._recover = enabled;
}

_needRecover() {
return this._recoverable === true && this._recover === true;
};
Expand Down Expand Up @@ -203,11 +208,14 @@ export default class Subscription extends EventEmitter {
}
};

subscribe() {
subscribe(opts) {
if (this._status === _STATE_SUCCESS) {
return;
}
this._noResubscribe = false;
if (opts && opts.since) {
this._centrifuge._setSubscribeSince(this, opts.since);
}
this._centrifuge._subscribe(this);
};

Expand Down

0 comments on commit 9f5570d

Please sign in to comment.