Skip to content

Commit

Permalink
Support arguments to invoke a rate limited function with
Browse files Browse the repository at this point in the history
  • Loading branch information
afshin committed Jul 20, 2022
1 parent 3185c80 commit e60c936
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 12 deletions.
7 changes: 5 additions & 2 deletions packages/polling/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,11 @@ export namespace IPoll {
* @typeparam T - The resolved type of the underlying function. Defaults to any.
*
* @typeparam U - The rejected type of the underlying function. Defaults to any.
*
* @typeparam V - Arguments for the underlying function. Defaults to any[].
*/
export interface IRateLimiter<T = any, U = any> extends IDisposable {
export interface IRateLimiter<T = any, U = any, V extends any[] = any[]>
extends IDisposable {
/**
* The rate limit in milliseconds.
*/
Expand All @@ -169,7 +172,7 @@ export interface IRateLimiter<T = any, U = any> extends IDisposable {
/**
* Invoke the rate limited function.
*/
invoke(): Promise<T>;
invoke(...args: V): Promise<T>;

/**
* Stop the function if it is mid-flight.
Expand Down
49 changes: 40 additions & 9 deletions packages/polling/src/ratelimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,23 @@ import { Poll } from './poll';
* @typeparam T - The resolved type of the underlying function.
*
* @typeparam U - The rejected type of the underlying function.
*
* @typeparam V - Arguments for the underlying function.
*/
export abstract class RateLimiter<T, U> implements IRateLimiter<T, U> {
export abstract class RateLimiter<T, U, V extends any[]>
implements IRateLimiter<T, U, V> {
/**
* Instantiate a rate limiter.
*
* @param fn - The function to rate limit.
*
* @param limit - The rate limit; defaults to 500ms.
*/
constructor(fn: () => T | Promise<T>, limit = 500) {
constructor(fn: RateLimiter.Function<T, V>, limit = 500) {
this.limit = limit;
this.poll = new Poll({
auto: false,
factory: async () => await fn(),
factory: async () => (this.args ? fn(...this.args!) : fn()),
frequency: { backoff: false, interval: Poll.NEVER, max: Poll.NEVER },
standby: 'never'
});
Expand Down Expand Up @@ -75,7 +78,7 @@ export abstract class RateLimiter<T, U> implements IRateLimiter<T, U> {
/**
* Invoke the rate limited function.
*/
abstract invoke(): Promise<T>;
abstract invoke(...args: V): Promise<T>;

/**
* Stop the function if it is mid-flight.
Expand All @@ -84,6 +87,11 @@ export abstract class RateLimiter<T, U> implements IRateLimiter<T, U> {
return this.poll.stop();
}

/**
* Arguments for the underlying function.
*/
protected args: V | undefined = undefined;

/**
* A promise that resolves on each successful invocation.
*/
Expand All @@ -95,20 +103,33 @@ export abstract class RateLimiter<T, U> implements IRateLimiter<T, U> {
protected poll: Poll<T, U, 'invoked'>;
}

export namespace RateLimiter {
export type Function<T, V extends any[]> =
| ((...args: V) => T | Promise<T>)
| (() => T | Promise<T>);
}

/**
* Wraps and debounces a function that can be called multiple times and only
* executes the underlying function one `interval` after the last invocation.
*
* @typeparam T - The resolved type of the underlying function. Defaults to any.
*
* @typeparam U - The rejected type of the underlying function. Defaults to any.
*
* @typeparam V - Arguments for the underlying function. Defaults to any[].
*/
export class Debouncer<T = any, U = any> extends RateLimiter<T, U> {
export class Debouncer<
T = any,
U = any,
V extends any[] = any[]
> extends RateLimiter<T, U, V> {
/**
* Invokes the function and only executes after rate limit has elapsed.
* Each invocation resets the timer.
*/
invoke(): Promise<T> {
invoke(...args: V): Promise<T> {
this.args = args;
void this.poll.schedule({ interval: this.limit, phase: 'invoked' });
return this.payload!.promise;
}
Expand All @@ -121,8 +142,14 @@ export class Debouncer<T = any, U = any> extends RateLimiter<T, U> {
* @typeparam T - The resolved type of the underlying function. Defaults to any.
*
* @typeparam U - The rejected type of the underlying function. Defaults to any.
*
* @typeparam V - Arguments for the underlying function. Defaults to any[].
*/
export class Throttler<T = any, U = any> extends RateLimiter<T, U> {
export class Throttler<
T = any,
U = any,
V extends any[] = any[]
> extends RateLimiter<T, U, V> {
/**
* Instantiate a throttler.
*
Expand All @@ -133,7 +160,10 @@ export class Throttler<T = any, U = any> extends RateLimiter<T, U> {
* #### Notes
* The `edge` defaults to `leading`; the `limit` defaults to `500`.
*/
constructor(fn: () => T | Promise<T>, options?: Throttler.IOptions | number) {
constructor(
fn: RateLimiter.Function<T, V>,
options?: Throttler.IOptions | number
) {
super(fn, typeof options === 'number' ? options : options && options.limit);
let edge: 'leading' | 'trailing' = 'leading';
if (typeof options !== 'number') {
Expand All @@ -146,8 +176,9 @@ export class Throttler<T = any, U = any> extends RateLimiter<T, U> {
/**
* Throttles function invocations if one is currently in flight.
*/
invoke(): Promise<T> {
invoke(...args: V): Promise<T> {
if (this.poll.state.phase !== 'invoked') {
this.args = args;
void this.poll.schedule({ interval: this._interval, phase: 'invoked' });
}
return this.payload!.promise;
Expand Down
36 changes: 35 additions & 1 deletion packages/polling/tests/src/ratelimiter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ describe('Debouncer', () => {
await debouncer.invoke();
expect(counter).to.equal(2);
});

it('should accept arguments', async () => {
let sum = 0;
const fn = (value = 0) => {
sum += value;
};
debouncer = new Debouncer<void, any, [number]>(fn);
expect(sum).to.equal(0);
await debouncer.invoke(1);
expect(sum).to.equal(1);
void debouncer.invoke(10);
await debouncer.invoke(1);
expect(sum).to.equal(2);
void debouncer.invoke(10);
await debouncer.invoke(1);
expect(sum).to.equal(3);
});
});
});

Expand All @@ -44,7 +61,7 @@ describe('Throttler', () => {
});

describe('#constructor()', () => {
it('should create a debouncer', () => {
it('should create a throttler', () => {
throttler = new Throttler(async () => undefined);
expect(throttler).to.be.an.instanceof(Throttler);
});
Expand All @@ -64,6 +81,23 @@ describe('Throttler', () => {
expect(counter).to.equal(2);
});

it('should accept arguments', async () => {
let sum = 0;
const fn = (value = 0) => {
sum += value;
};
throttler = new Throttler<void, any, [number]>(fn);
expect(sum).to.equal(0);
await throttler.invoke(1);
expect(sum).to.equal(1);
void throttler.invoke(10);
await throttler.invoke(1);
expect(sum).to.equal(11); // add 10 to sum NOT 1
void throttler.invoke(10);
await throttler.invoke(1);
expect(sum).to.equal(21); // add 10 to sum NOT 1
});

it('should collapse invocations into one promise per cycle', async () => {
throttler = new Throttler(() => undefined, limit);
const first = throttler.invoke();
Expand Down

0 comments on commit e60c936

Please sign in to comment.