From 5cda7275a06b5551a657da2025f0ec91ff642bcd Mon Sep 17 00:00:00 2001 From: Adrien Crivelli Date: Mon, 27 Dec 2021 14:49:59 +0100 Subject: [PATCH 01/11] Batch link can cancel operations that are in queue or in flight After an operation has been subscribed to, and so queued, it is possible to unsubscribe from it, and it will be removed from the queue. Unsubscribing will not impact the debounce, so other operations, if any, will not be delayed by an unsubscription. If a batch of operation is already in flight, and all operations are unsubscribed then the entire XHR will be cancelled. If only some operations are unsubscribed the XHR will be left untouched. --- .../batch-http/__tests__/batchHttpLink.ts | 2 +- src/link/batch/__tests__/batchLink.ts | 201 +++++++++++++++--- src/link/batch/batching.ts | 134 +++++++----- 3 files changed, 255 insertions(+), 82 deletions(-) diff --git a/src/link/batch-http/__tests__/batchHttpLink.ts b/src/link/batch-http/__tests__/batchHttpLink.ts index 2a711c95bd0..2a43ed1a25b 100644 --- a/src/link/batch-http/__tests__/batchHttpLink.ts +++ b/src/link/batch-http/__tests__/batchHttpLink.ts @@ -193,7 +193,7 @@ describe('BatchHttpLink', () => { }, batchInterval: 1, //if batchKey does not work, then the batch size would be 3 - batchMax: 3, + batchMax: 2, batchKey, }), ]); diff --git a/src/link/batch/__tests__/batchLink.ts b/src/link/batch/__tests__/batchLink.ts index 78cdefabbee..165ed0db115 100644 --- a/src/link/batch/__tests__/batchLink.ts +++ b/src/link/batch/__tests__/batchLink.ts @@ -1,17 +1,12 @@ import gql from 'graphql-tag'; -import { print } from 'graphql'; - -import { ApolloLink } from '../../core/ApolloLink'; -import { execute } from '../../core/execute'; -import { Operation, FetchResult, GraphQLRequest } from '../../core/types'; -import { Observable } from '../../../utilities/observables/Observable'; -import { - BatchLink, - OperationBatcher, - BatchHandler, - BatchableRequest, -} from '../batchLink'; -import { itAsync } from 
'../../../testing'; +import {print} from 'graphql'; + +import {ApolloLink} from '../../core/ApolloLink'; +import {execute} from '../../core/execute'; +import {FetchResult, GraphQLRequest, Operation} from '../../core/types'; +import {Observable, ObservableSubscription} from '../../../utilities/observables/Observable'; +import {BatchableRequest, BatchHandler, BatchLink, OperationBatcher,} from '../batchLink'; +import {itAsync} from '../../../testing'; interface MockedResponse { request: GraphQLRequest; @@ -27,7 +22,7 @@ function getKey(operation: GraphQLRequest) { return JSON.stringify([operationName, query, variables]); } -export function createOperation( +function createOperation( starting: any, operation: GraphQLRequest, ): Operation { @@ -188,9 +183,9 @@ describe('OperationBatcher', () => { expect(batcher.queuedRequests.get('')).toBeUndefined(); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.length).toBe(1); + expect(batcher.queuedRequests.get('')!.requests.length).toBe(1); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.length).toBe(2); + expect(batcher.queuedRequests.get('')!.requests.length).toBe(2); }); describe('request queue', () => { @@ -304,7 +299,7 @@ describe('OperationBatcher', () => { }); try { - expect(myBatcher.queuedRequests.get('')!.length).toBe(2); + expect(myBatcher.queuedRequests.get('')!.requests.length).toBe(2); const observables: ( | Observable | undefined)[] = myBatcher.consumeQueue()!; @@ -346,22 +341,22 @@ describe('OperationBatcher', () => { myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.length).toEqual(3); + expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(3); // 2. Run the timer halfway. 
jest.advanceTimersByTime(batchInterval / 2); - expect(myBatcher.queuedRequests.get('')!.length).toEqual(3); + expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(3); // 3. Queue a 4th request, causing the timer to reset. myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.length).toEqual(4); + expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(4); // 4. Run the timer to batchInterval + 1, at this point, if debounce were // not set, the original 3 requests would have fired, but we expect // instead that the queries will instead fire at // (batchInterval + batchInterval / 2). jest.advanceTimersByTime(batchInterval / 2 + 1); - expect(myBatcher.queuedRequests.get('')!.length).toEqual(4); + expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(4); // 5. Finally, run the timer to (batchInterval + batchInterval / 2) +1, // and expect the queue to be empty. @@ -396,7 +391,7 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.length).toBe(1); + expect(batcher.queuedRequests.get('')!.requests.length).toBe(1); } catch (e) { reject(e); } @@ -409,6 +404,107 @@ describe('OperationBatcher', () => { ); }); + itAsync('should cancel single query in queue when unsubscribing', (resolve, reject) => { + const data = { + lastName: 'Ever', + firstName: 'Greatest', + }; + const batcher = new OperationBatcher({ + batchInterval: 10, + batchHandler: () => + new Observable(observer => { + observer.next([{data}]); + setTimeout(observer.complete.bind(observer)); + }), + }); + const query = gql` + query { + author { + firstName + lastName + } + } + `; + const operation: Operation = createOperation({}, {query}); + + batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called')).unsubscribe(); + + expect(batcher.queuedRequests.get('')).toBeUndefined(); + resolve(); + }); + + itAsync('should cancel 
single query in queue with multiple subscriptions', (resolve, reject) => { + const data = { + lastName: 'Ever', + firstName: 'Greatest', + }; + const batcher = new OperationBatcher({ + batchInterval: 10, + batchHandler: () => + new Observable(observer => { + observer.next([{data}]); + setTimeout(observer.complete.bind(observer)); + }), + }); + const query = gql` + query { + author { + firstName + lastName + } + } + `; + const operation: Operation = createOperation({}, {query}); + + const observable = batcher.enqueueRequest({operation}); + const sub1 = observable.subscribe(() => reject('next should never be called')); + expect(batcher.queuedRequests.get('')).not.toBeUndefined(); + expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(1); + + const sub2 = observable.subscribe(() => reject('next should never be called')); + expect(batcher.queuedRequests.get('')).not.toBeUndefined(); + expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(2); + + sub1.unsubscribe(); + expect(batcher.queuedRequests.get('')).not.toBeUndefined(); + expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(1); + + sub2.unsubscribe(); + expect(batcher.queuedRequests.get('')).toBeUndefined(); + resolve(); + }); + + itAsync('should cancel single query in flight when unsubscribing', (resolve, reject) => { + let subscription: ObservableSubscription | undefined; + + const batcher = new OperationBatcher({ + batchInterval: 10, + batchHandler: () => + new Observable(() => { + // Instead of typically starting an XHR, we trigger the unsubscription from outside + setTimeout(() => subscription?.unsubscribe(), 5); + + return () => { + expect(batcher.queuedRequests.get('')).toBeUndefined(); + resolve(); + } + }), + }); + + const query = gql` + query { + author { + firstName + lastName + } + } + `; + + const operation: Operation = createOperation({}, {query}); + + subscription = batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be 
called')); + }); + itAsync('should correctly batch multiple queries', (resolve, reject) => { const data = { lastName: 'Ever', @@ -441,7 +537,7 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); batcher.enqueueRequest({ operation: operation2 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.length).toBe(2); + expect(batcher.queuedRequests.get('')!.requests.length).toBe(2); } catch (e) { reject(e); } @@ -450,7 +546,7 @@ describe('OperationBatcher', () => { // The batch shouldn't be fired yet, so we can add one more request. batcher.enqueueRequest({ operation: operation3 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.length).toBe(3); + expect(batcher.queuedRequests.get('')!.requests.length).toBe(3); } catch (e) { reject(e); } @@ -465,6 +561,59 @@ describe('OperationBatcher', () => { ); }); + itAsync('should cancel multiples queries in queue when unsubscribing and let pass still subscribed one', (resolve, reject) => { + const data2 = { + lastName: 'Hauser', + firstName: 'Evans', + }; + + const batcher = new OperationBatcher({ + batchInterval: 10, + batchHandler: () => + new Observable(observer => { + observer.next([{data: data2}]); + setTimeout(observer.complete.bind(observer)); + }), + }); + + const query = gql` + query { + author { + firstName + lastName + } + } + `; + + const operation: Operation = createOperation({}, {query}); + const operation2: Operation = createOperation({}, {query}); + const operation3: Operation = createOperation({}, {query}); + + const sub1 = batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called')); + batcher.enqueueRequest({operation: operation2}).subscribe(result => { + expect(result.data).toBe(data2); + + // The batch should've been fired by now. 
+ expect(batcher.queuedRequests.get('')).toBeUndefined(); + + resolve(); + }); + + expect(batcher.queuedRequests.get('')!.requests.length).toBe(2); + + sub1.unsubscribe(); + expect(batcher.queuedRequests.get('')!.requests.length).toBe(1); + + setTimeout(() => { + // The batch shouldn't be fired yet, so we can add one more request. + const sub3 = batcher.enqueueRequest({operation: operation3}).subscribe(() => reject('next should never be called')); + expect(batcher.queuedRequests.get('')!.requests.length).toBe(2); + + sub3.unsubscribe(); + expect(batcher.queuedRequests.get('')!.requests.length).toBe(1); + }, 5); + }); + itAsync('should reject the promise if there is a network error', (resolve, reject) => { const query = gql` query { @@ -765,14 +914,14 @@ describe('BatchLink', () => { new BatchLink({ batchInterval: 1, //if batchKey does not work, then the batch size would be 3 - batchMax: 3, + batchMax: 2, batchHandler, batchKey, }), ]); let count = 0; - [1, 2, 3, 4].forEach(x => { + [1, 2, 3, 4].forEach(() => { execute(link, { query, }).subscribe({ diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index f2d7281bdec..de68ac53c56 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -1,5 +1,5 @@ -import { Operation, FetchResult, NextLink } from '../core'; -import { Observable } from '../../utilities'; +import {FetchResult, NextLink, Operation} from '../core'; +import {Observable, ObservableSubscription} from '../../utilities'; export type BatchHandler = ( operations: Operation[], @@ -9,14 +9,17 @@ export type BatchHandler = ( export interface BatchableRequest { operation: Operation; forward?: NextLink; +} +interface QueuedRequest extends BatchableRequest { // promise is created when the query fetch request is // added to the queue and is resolved once the result is back // from the server. 
observable?: Observable; - next?: Array<(result: FetchResult) => void>; - error?: Array<(error: Error) => void>; - complete?: Array<() => void>; + next: Array<(result: FetchResult) => void>; + error: Array<(error: Error) => void>; + complete: Array<() => void>; + subscribers: number; } // QueryBatcher doesn't fire requests immediately. Requests that were enqueued within @@ -25,7 +28,10 @@ export interface BatchableRequest { export class OperationBatcher { // Queue on which the QueryBatcher will operate on a per-tick basis. // Public only for testing - public queuedRequests: Map; + public readonly queuedRequests = new Map(); private scheduledBatchTimer: ReturnType; private batchDebounce?: boolean; @@ -37,19 +43,18 @@ export class OperationBatcher { private batchKey: (operation: Operation) => string; constructor({ - batchDebounce, - batchInterval, - batchMax, - batchHandler, - batchKey, - }: { + batchDebounce, + batchInterval, + batchMax, + batchHandler, + batchKey, + }: { batchDebounce?: boolean; batchInterval?: number; batchMax?: number; batchHandler: BatchHandler; batchKey?: (operation: Operation) => string; }) { - this.queuedRequests = new Map(); this.batchDebounce = batchDebounce; this.batchInterval = batchInterval; this.batchMax = batchMax || 0; @@ -58,38 +63,43 @@ export class OperationBatcher { } public enqueueRequest(request: BatchableRequest): Observable { - const requestCopy = { + const requestCopy: QueuedRequest = { ...request, + next: [], + error: [], + complete: [], + subscribers: 0, }; - let queued = false; const key = this.batchKey(request.operation); if (!requestCopy.observable) { requestCopy.observable = new Observable(observer => { if (!this.queuedRequests.has(key)) { - this.queuedRequests.set(key, []); + this.queuedRequests.set(key, {requests: []}); } + const queuedRequests = this.queuedRequests.get(key)!; - if (!queued) { - this.queuedRequests.get(key)!.push(requestCopy); - queued = true; + requestCopy.subscribers++; + if (requestCopy.subscribers 
=== 1) { + queuedRequests.requests.push(requestCopy); } - //called for each subscriber, so need to save all listeners(next, error, complete) - requestCopy.next = requestCopy.next || []; - if (observer.next) requestCopy.next.push(observer.next.bind(observer)); + // called for each subscriber, so need to save all listeners (next, error, complete) + if (observer.next) { + requestCopy.next.push(observer.next.bind(observer)); + } - requestCopy.error = requestCopy.error || []; - if (observer.error) + if (observer.error) { requestCopy.error.push(observer.error.bind(observer)); + } - requestCopy.complete = requestCopy.complete || []; - if (observer.complete) + if (observer.complete) { requestCopy.complete.push(observer.complete.bind(observer)); + } // The first enqueued request triggers the queue consumption after `batchInterval` milliseconds. - if (this.queuedRequests.get(key)!.length === 1) { + if (queuedRequests.requests.length === 1) { this.scheduleQueueConsumption(key); } else if (this.batchDebounce) { clearTimeout(this.scheduledBatchTimer); @@ -97,9 +107,30 @@ export class OperationBatcher { } // When amount of requests reaches `batchMax`, trigger the queue consumption without waiting on the `batchInterval`. 
- if (this.queuedRequests.get(key)!.length === this.batchMax) { + if (queuedRequests.requests.length === this.batchMax) { this.consumeQueue(key); } + + return () => { + requestCopy.subscribers--; + + // If this is last subscriber for this request, remove request from queue + if (requestCopy.subscribers < 1) { + const index = queuedRequests.requests.indexOf(requestCopy); + if (index !== undefined && index > -1) { + queuedRequests.requests.splice(index, 1); + + // If this is last request from queue, remove queue entirely + if (queuedRequests.requests.length === 0) { + clearTimeout(this.scheduledBatchTimer); + this.queuedRequests.delete(key); + + // If queue was in flight, cancel it + queuedRequests.batchedSubscription?.unsubscribe(); + } + } + } + } }); } @@ -120,27 +151,24 @@ export class OperationBatcher { this.queuedRequests.delete(requestKey); - const requests: Operation[] = queuedRequests.map( - queuedRequest => queuedRequest.operation, - ); - - const forwards: (NextLink | undefined)[] = queuedRequests.map( - queuedRequest => queuedRequest.forward, - ); - + const operations: Operation[] = []; + const forwards: (NextLink | undefined)[] = []; const observables: (Observable | undefined)[] = []; - const nexts: any[] = []; - const errors: any[] = []; - const completes: any[] = []; - queuedRequests.forEach((batchableRequest, index) => { - observables.push(batchableRequest.observable); - nexts.push(batchableRequest.next); - errors.push(batchableRequest.error); - completes.push(batchableRequest.complete); + const nexts: Array<(result: FetchResult) => void>[] = []; + const errors: Array<(error: Error) => void>[] = []; + const completes: Array<() => void>[] = []; + + queuedRequests.requests.forEach(request => { + operations.push(request.operation); + forwards.push(request.forward); + observables.push(request.observable); + nexts.push(request.next); + errors.push(request.error); + completes.push(request.complete); }); const batchedObservable = - this.batchHandler(requests, 
forwards) || Observable.of(); + this.batchHandler(operations, forwards) || Observable.of(); const onError = (error: any) => { //each callback list in batch @@ -152,7 +180,7 @@ export class OperationBatcher { }); }; - batchedObservable.subscribe({ + queuedRequests.batchedSubscription = batchedObservable.subscribe({ next: results => { if (!Array.isArray(results)) { results = [results]; @@ -189,15 +217,11 @@ export class OperationBatcher { return observables; } - private scheduleQueueConsumption(key?: string): void { - const requestKey = key || ''; - this.scheduledBatchTimer = (setTimeout(() => { - if ( - this.queuedRequests.get(requestKey) && - this.queuedRequests.get(requestKey)!.length - ) { - this.consumeQueue(requestKey); + private scheduleQueueConsumption(key: string): void { + this.scheduledBatchTimer = setTimeout(() => { + if (this.queuedRequests.get(key)?.requests.length) { + this.consumeQueue(key); } - }, this.batchInterval)); + }, this.batchInterval); } } From 2d098ea43f278c6f715fa7a217cb04081969f5b3 Mon Sep 17 00:00:00 2001 From: Ben Newman Date: Tue, 11 Jan 2022 16:11:59 -0500 Subject: [PATCH 02/11] Assorted minor formatting/style tweaks. 
--- src/link/batch/__tests__/batchLink.ts | 99 ++++++++++++++------------- src/link/batch/batching.ts | 16 ++--- 2 files changed, 60 insertions(+), 55 deletions(-) diff --git a/src/link/batch/__tests__/batchLink.ts b/src/link/batch/__tests__/batchLink.ts index 165ed0db115..ad883bee745 100644 --- a/src/link/batch/__tests__/batchLink.ts +++ b/src/link/batch/__tests__/batchLink.ts @@ -1,12 +1,16 @@ import gql from 'graphql-tag'; -import {print} from 'graphql'; - -import {ApolloLink} from '../../core/ApolloLink'; -import {execute} from '../../core/execute'; -import {FetchResult, GraphQLRequest, Operation} from '../../core/types'; -import {Observable, ObservableSubscription} from '../../../utilities/observables/Observable'; -import {BatchableRequest, BatchHandler, BatchLink, OperationBatcher,} from '../batchLink'; -import {itAsync} from '../../../testing'; +import { print } from 'graphql'; + +import { ApolloLink, execute } from '../../core'; +import { Operation, FetchResult, GraphQLRequest } from '../../core/types'; +import { Observable } from '../../../utilities'; +import { itAsync } from '../../../testing'; +import { + BatchLink, + OperationBatcher, + BatchHandler, + BatchableRequest, +} from '../batchLink'; interface MockedResponse { request: GraphQLRequest; @@ -409,6 +413,7 @@ describe('OperationBatcher', () => { lastName: 'Ever', firstName: 'Greatest', }; + const batcher = new OperationBatcher({ batchInterval: 10, batchHandler: () => @@ -417,17 +422,19 @@ describe('OperationBatcher', () => { setTimeout(observer.complete.bind(observer)); }), }); - const query = gql` - query { - author { - firstName - lastName - } - } - `; - const operation: Operation = createOperation({}, {query}); - batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called')).unsubscribe(); + const query = gql` + query { + author { + firstName + lastName + } + } + `; + + batcher.enqueueRequest({ + operation: createOperation({}, { query }), + }).subscribe(() => 
reject('next should never be called')).unsubscribe(); expect(batcher.queuedRequests.get('')).toBeUndefined(); resolve(); @@ -446,14 +453,14 @@ describe('OperationBatcher', () => { setTimeout(observer.complete.bind(observer)); }), }); - const query = gql` - query { - author { - firstName - lastName - } - } - `; + const query = gql` + query { + author { + firstName + lastName + } + } + `; const operation: Operation = createOperation({}, {query}); const observable = batcher.enqueueRequest({operation}); @@ -475,8 +482,6 @@ describe('OperationBatcher', () => { }); itAsync('should cancel single query in flight when unsubscribing', (resolve, reject) => { - let subscription: ObservableSubscription | undefined; - const batcher = new OperationBatcher({ batchInterval: 10, batchHandler: () => @@ -487,22 +492,22 @@ describe('OperationBatcher', () => { return () => { expect(batcher.queuedRequests.get('')).toBeUndefined(); resolve(); - } + }; }), }); const query = gql` - query { - author { - firstName - lastName - } + query { + author { + firstName + lastName } + } `; - const operation: Operation = createOperation({}, {query}); - - subscription = batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called')); + const subscription = batcher.enqueueRequest({ + operation: createOperation({}, { query }), + }).subscribe(() => reject('next should never be called')); }); itAsync('should correctly batch multiple queries', (resolve, reject) => { @@ -561,7 +566,7 @@ describe('OperationBatcher', () => { ); }); - itAsync('should cancel multiples queries in queue when unsubscribing and let pass still subscribed one', (resolve, reject) => { + itAsync('should cancel multiple queries in queue when unsubscribing and let pass still subscribed one', (resolve, reject) => { const data2 = { lastName: 'Hauser', firstName: 'Evans', @@ -571,19 +576,19 @@ describe('OperationBatcher', () => { batchInterval: 10, batchHandler: () => new Observable(observer => { - 
observer.next([{data: data2}]); + observer.next([{ data: data2 }]); setTimeout(observer.complete.bind(observer)); }), }); - const query = gql` - query { - author { - firstName - lastName - } - } - `; + const query = gql` + query { + author { + firstName + lastName + } + } + `; const operation: Operation = createOperation({}, {query}); const operation2: Operation = createOperation({}, {query}); diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index de68ac53c56..e5bc14b9e21 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -1,5 +1,5 @@ -import {FetchResult, NextLink, Operation} from '../core'; -import {Observable, ObservableSubscription} from '../../utilities'; +import { Operation, FetchResult, NextLink } from '../core'; +import { Observable, ObservableSubscription } from '../../utilities'; export type BatchHandler = ( operations: Operation[], @@ -43,12 +43,12 @@ export class OperationBatcher { private batchKey: (operation: Operation) => string; constructor({ - batchDebounce, - batchInterval, - batchMax, - batchHandler, - batchKey, - }: { + batchDebounce, + batchInterval, + batchMax, + batchHandler, + batchKey, + }: { batchDebounce?: boolean; batchInterval?: number; batchMax?: number; From c9477a35186e8e364a1c935697f6861073335c81 Mon Sep 17 00:00:00 2001 From: Ben Newman Date: Tue, 11 Jan 2022 16:57:22 -0500 Subject: [PATCH 03/11] Restore BatchHandler field types. Since BatchHandler is an exported type, it seems appropriate to preserve its optional fields (with their expected types), then override those fields (next, error, complete) with non-optional versions in the QueuedRequest subtype. The field/types of QueuedRequest will end up the same either way. 
--- src/link/batch/batching.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index e5bc14b9e21..b6f7f5d6ad7 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -9,13 +9,13 @@ export type BatchHandler = ( export interface BatchableRequest { operation: Operation; forward?: NextLink; + observable?: Observable; + next?: Array<(result: FetchResult) => void>; + error?: Array<(error: Error) => void>; + complete?: Array<() => void>; } interface QueuedRequest extends BatchableRequest { - // promise is created when the query fetch request is - // added to the queue and is resolved once the result is back - // from the server. - observable?: Observable; next: Array<(result: FetchResult) => void>; error: Array<(error: Error) => void>; complete: Array<() => void>; From 33a340f235a8f2c5a2572a8f5d866199fe709b04 Mon Sep 17 00:00:00 2001 From: Ben Newman Date: Tue, 11 Jan 2022 17:44:43 -0500 Subject: [PATCH 04/11] Use Sets to track batched requests and their subscribers. Since ECMAScript Set and Map preserve the order of their keys (by order of first insertion), they can often be used to keep track of queues or LRU usage chains, while also supporting constant-time deletion of keys (no need for indexOf/splice). 
--- src/link/batch/__tests__/batchLink.ts | 49 ++++++++++------- src/link/batch/batching.ts | 77 ++++++++++++++------------- 2 files changed, 68 insertions(+), 58 deletions(-) diff --git a/src/link/batch/__tests__/batchLink.ts b/src/link/batch/__tests__/batchLink.ts index ad883bee745..06e37d6853c 100644 --- a/src/link/batch/__tests__/batchLink.ts +++ b/src/link/batch/__tests__/batchLink.ts @@ -187,9 +187,9 @@ describe('OperationBatcher', () => { expect(batcher.queuedRequests.get('')).toBeUndefined(); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.requests.length).toBe(1); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(1); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.requests.length).toBe(2); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(2); }); describe('request queue', () => { @@ -303,7 +303,7 @@ describe('OperationBatcher', () => { }); try { - expect(myBatcher.queuedRequests.get('')!.requests.length).toBe(2); + expect(myBatcher.queuedRequests.get('')!.requests.size).toBe(2); const observables: ( | Observable | undefined)[] = myBatcher.consumeQueue()!; @@ -345,22 +345,22 @@ describe('OperationBatcher', () => { myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(3); + expect(myBatcher.queuedRequests.get('')!.requests.size).toEqual(3); // 2. Run the timer halfway. jest.advanceTimersByTime(batchInterval / 2); - expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(3); + expect(myBatcher.queuedRequests.get('')!.requests.size).toEqual(3); // 3. Queue a 4th request, causing the timer to reset. 
myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(4); + expect(myBatcher.queuedRequests.get('')!.requests.size).toEqual(4); // 4. Run the timer to batchInterval + 1, at this point, if debounce were // not set, the original 3 requests would have fired, but we expect // instead that the queries will instead fire at // (batchInterval + batchInterval / 2). jest.advanceTimersByTime(batchInterval / 2 + 1); - expect(myBatcher.queuedRequests.get('')!.requests.length).toEqual(4); + expect(myBatcher.queuedRequests.get('')!.requests.size).toEqual(4); // 5. Finally, run the timer to (batchInterval + batchInterval / 2) +1, // and expect the queue to be empty. @@ -395,7 +395,7 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.requests.length).toBe(1); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(1); } catch (e) { reject(e); } @@ -464,17 +464,26 @@ describe('OperationBatcher', () => { const operation: Operation = createOperation({}, {query}); const observable = batcher.enqueueRequest({operation}); + + const checkQueuedRequests = ( + expectedSubscriberCount: number, + ) => { + const queued = batcher.queuedRequests.get(''); + expect(queued).not.toBeUndefined(); + expect(queued!.requests.size).toBe(1); + queued!.requests.forEach(request => { + expect(request.subscribers.size).toBe(expectedSubscriberCount); + }); + }; + const sub1 = observable.subscribe(() => reject('next should never be called')); - expect(batcher.queuedRequests.get('')).not.toBeUndefined(); - expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(1); + checkQueuedRequests(1); const sub2 = observable.subscribe(() => reject('next should never be called')); - expect(batcher.queuedRequests.get('')).not.toBeUndefined(); - expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(2); + checkQueuedRequests(2); 
sub1.unsubscribe(); - expect(batcher.queuedRequests.get('')).not.toBeUndefined(); - expect(batcher.queuedRequests.get('')?.requests[0].subscribers).toBe(1); + checkQueuedRequests(1); sub2.unsubscribe(); expect(batcher.queuedRequests.get('')).toBeUndefined(); @@ -542,7 +551,7 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); batcher.enqueueRequest({ operation: operation2 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.requests.length).toBe(2); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(2); } catch (e) { reject(e); } @@ -551,7 +560,7 @@ describe('OperationBatcher', () => { // The batch shouldn't be fired yet, so we can add one more request. batcher.enqueueRequest({ operation: operation3 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.requests.length).toBe(3); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(3); } catch (e) { reject(e); } @@ -604,18 +613,18 @@ describe('OperationBatcher', () => { resolve(); }); - expect(batcher.queuedRequests.get('')!.requests.length).toBe(2); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(2); sub1.unsubscribe(); - expect(batcher.queuedRequests.get('')!.requests.length).toBe(1); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(1); setTimeout(() => { // The batch shouldn't be fired yet, so we can add one more request. 
const sub3 = batcher.enqueueRequest({operation: operation3}).subscribe(() => reject('next should never be called')); - expect(batcher.queuedRequests.get('')!.requests.length).toBe(2); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(2); sub3.unsubscribe(); - expect(batcher.queuedRequests.get('')!.requests.length).toBe(1); + expect(batcher.queuedRequests.get('')!.requests.size).toBe(1); }, 5); }); diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index b6f7f5d6ad7..edbbcd57d4b 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -19,7 +19,7 @@ interface QueuedRequest extends BatchableRequest { next: Array<(result: FetchResult) => void>; error: Array<(error: Error) => void>; complete: Array<() => void>; - subscribers: number; + subscribers: Set; } // QueryBatcher doesn't fire requests immediately. Requests that were enqueued within @@ -29,8 +29,8 @@ export class OperationBatcher { // Queue on which the QueryBatcher will operate on a per-tick basis. // Public only for testing public readonly queuedRequests = new Map; + batchedSubscription?: ObservableSubscription; }>(); private scheduledBatchTimer: ReturnType; @@ -68,21 +68,28 @@ export class OperationBatcher { next: [], error: [], complete: [], - subscribers: 0, + subscribers: new Set, }; const key = this.batchKey(request.operation); if (!requestCopy.observable) { requestCopy.observable = new Observable(observer => { - if (!this.queuedRequests.has(key)) { - this.queuedRequests.set(key, {requests: []}); + let queued = this.queuedRequests.get(key)!; + if (!queued) { + this.queuedRequests.set(key, queued = { + requests: new Set, + }); } - const queuedRequests = this.queuedRequests.get(key)!; - requestCopy.subscribers++; - if (requestCopy.subscribers === 1) { - queuedRequests.requests.push(requestCopy); + // These booleans seem to me (@benjamn) like they might always be the + // same (and thus we could do with only one of them), but I'm not 100% + // sure about that. 
+ const isFirstEnqueuedRequest = queued.requests.size === 0; + const isFirstSubscriber = requestCopy.subscribers.size === 0; + requestCopy.subscribers.add(observer); + if (isFirstSubscriber) { + queued.requests.add(requestCopy); } // called for each subscriber, so need to save all listeners (next, error, complete) @@ -99,7 +106,7 @@ export class OperationBatcher { } // The first enqueued request triggers the queue consumption after `batchInterval` milliseconds. - if (queuedRequests.requests.length === 1) { + if (isFirstEnqueuedRequest) { this.scheduleQueueConsumption(key); } else if (this.batchDebounce) { clearTimeout(this.scheduledBatchTimer); @@ -107,27 +114,21 @@ export class OperationBatcher { } // When amount of requests reaches `batchMax`, trigger the queue consumption without waiting on the `batchInterval`. - if (queuedRequests.requests.length === this.batchMax) { + if (queued.requests.size === this.batchMax) { this.consumeQueue(key); } return () => { - requestCopy.subscribers--; - // If this is last subscriber for this request, remove request from queue - if (requestCopy.subscribers < 1) { - const index = queuedRequests.requests.indexOf(requestCopy); - if (index !== undefined && index > -1) { - queuedRequests.requests.splice(index, 1); - - // If this is last request from queue, remove queue entirely - if (queuedRequests.requests.length === 0) { - clearTimeout(this.scheduledBatchTimer); - this.queuedRequests.delete(key); - - // If queue was in flight, cancel it - queuedRequests.batchedSubscription?.unsubscribe(); - } + if (requestCopy.subscribers.delete(observer) && + requestCopy.subscribers.size < 1) { + // If this is last request from queue, remove queue entirely + if (queued.requests.delete(requestCopy) && + queued.requests.size < 1) { + clearTimeout(this.scheduledBatchTimer); + this.queuedRequests.delete(key); + // If queue was in flight, cancel it + queued.batchedSubscription?.unsubscribe(); } } } @@ -140,16 +141,12 @@ export class OperationBatcher { // 
Consumes the queue. // Returns a list of promises (one for each query). public consumeQueue( - key?: string, + key: string = '', ): (Observable | undefined)[] | undefined { - const requestKey = key || ''; - const queuedRequests = this.queuedRequests.get(requestKey); - - if (!queuedRequests) { - return; - } + const queued = this.queuedRequests.get(key); + if (!queued) return; - this.queuedRequests.delete(requestKey); + this.queuedRequests.delete(key); const operations: Operation[] = []; const forwards: (NextLink | undefined)[] = []; @@ -158,7 +155,11 @@ export class OperationBatcher { const errors: Array<(error: Error) => void>[] = []; const completes: Array<() => void>[] = []; - queuedRequests.requests.forEach(request => { + // Even though queued.requests is a Set, it preserves the order of first + // insertion when iterating (per ECMAScript specification), so these + // requests will be handled in the order they were enqueued (minus any + // deleted ones). + queued.requests.forEach(request => { operations.push(request.operation); forwards.push(request.forward); observables.push(request.observable); @@ -180,7 +181,7 @@ export class OperationBatcher { }); }; - queuedRequests.batchedSubscription = batchedObservable.subscribe({ + queued.batchedSubscription = batchedObservable.subscribe({ next: results => { if (!Array.isArray(results)) { results = [results]; @@ -219,7 +220,7 @@ export class OperationBatcher { private scheduleQueueConsumption(key: string): void { this.scheduledBatchTimer = setTimeout(() => { - if (this.queuedRequests.get(key)?.requests.length) { + if (this.queuedRequests.get(key)?.requests.size) { this.consumeQueue(key); } }, this.batchInterval); From e17ccab05e71a3215dabf200192cfc9f913ebe0f Mon Sep 17 00:00:00 2001 From: Ben Newman Date: Tue, 11 Jan 2022 18:04:51 -0500 Subject: [PATCH 05/11] Use RequestBatch as value type of queuedRequests Map. 
--- src/link/batch/__tests__/batchLink.ts | 36 ++++++++++---------- src/link/batch/batching.ts | 47 +++++++++++++-------------- 2 files changed, 40 insertions(+), 43 deletions(-) diff --git a/src/link/batch/__tests__/batchLink.ts b/src/link/batch/__tests__/batchLink.ts index 06e37d6853c..d4203381156 100644 --- a/src/link/batch/__tests__/batchLink.ts +++ b/src/link/batch/__tests__/batchLink.ts @@ -187,9 +187,9 @@ describe('OperationBatcher', () => { expect(batcher.queuedRequests.get('')).toBeUndefined(); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.requests.size).toBe(1); + expect(batcher.queuedRequests.get('')!.size).toBe(1); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.requests.size).toBe(2); + expect(batcher.queuedRequests.get('')!.size).toBe(2); }); describe('request queue', () => { @@ -303,7 +303,7 @@ describe('OperationBatcher', () => { }); try { - expect(myBatcher.queuedRequests.get('')!.requests.size).toBe(2); + expect(myBatcher.queuedRequests.get('')!.size).toBe(2); const observables: ( | Observable | undefined)[] = myBatcher.consumeQueue()!; @@ -345,22 +345,22 @@ describe('OperationBatcher', () => { myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.requests.size).toEqual(3); + expect(myBatcher.queuedRequests.get('')!.size).toEqual(3); // 2. Run the timer halfway. jest.advanceTimersByTime(batchInterval / 2); - expect(myBatcher.queuedRequests.get('')!.requests.size).toEqual(3); + expect(myBatcher.queuedRequests.get('')!.size).toEqual(3); // 3. Queue a 4th request, causing the timer to reset. myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.requests.size).toEqual(4); + expect(myBatcher.queuedRequests.get('')!.size).toEqual(4); // 4. 
Run the timer to batchInterval + 1, at this point, if debounce were // not set, the original 3 requests would have fired, but we expect // instead that the queries will instead fire at // (batchInterval + batchInterval / 2). jest.advanceTimersByTime(batchInterval / 2 + 1); - expect(myBatcher.queuedRequests.get('')!.requests.size).toEqual(4); + expect(myBatcher.queuedRequests.get('')!.size).toEqual(4); // 5. Finally, run the timer to (batchInterval + batchInterval / 2) +1, // and expect the queue to be empty. @@ -395,7 +395,7 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.requests.size).toBe(1); + expect(batcher.queuedRequests.get('')!.size).toBe(1); } catch (e) { reject(e); } @@ -468,10 +468,10 @@ describe('OperationBatcher', () => { const checkQueuedRequests = ( expectedSubscriberCount: number, ) => { - const queued = batcher.queuedRequests.get(''); - expect(queued).not.toBeUndefined(); - expect(queued!.requests.size).toBe(1); - queued!.requests.forEach(request => { + const batch = batcher.queuedRequests.get(''); + expect(batch).not.toBeUndefined(); + expect(batch!.size).toBe(1); + batch!.forEach(request => { expect(request.subscribers.size).toBe(expectedSubscriberCount); }); }; @@ -551,7 +551,7 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); batcher.enqueueRequest({ operation: operation2 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.requests.size).toBe(2); + expect(batcher.queuedRequests.get('')!.size).toBe(2); } catch (e) { reject(e); } @@ -560,7 +560,7 @@ describe('OperationBatcher', () => { // The batch shouldn't be fired yet, so we can add one more request. 
batcher.enqueueRequest({ operation: operation3 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.requests.size).toBe(3); + expect(batcher.queuedRequests.get('')!.size).toBe(3); } catch (e) { reject(e); } @@ -613,18 +613,18 @@ describe('OperationBatcher', () => { resolve(); }); - expect(batcher.queuedRequests.get('')!.requests.size).toBe(2); + expect(batcher.queuedRequests.get('')!.size).toBe(2); sub1.unsubscribe(); - expect(batcher.queuedRequests.get('')!.requests.size).toBe(1); + expect(batcher.queuedRequests.get('')!.size).toBe(1); setTimeout(() => { // The batch shouldn't be fired yet, so we can add one more request. const sub3 = batcher.enqueueRequest({operation: operation3}).subscribe(() => reject('next should never be called')); - expect(batcher.queuedRequests.get('')!.requests.size).toBe(2); + expect(batcher.queuedRequests.get('')!.size).toBe(2); sub3.unsubscribe(); - expect(batcher.queuedRequests.get('')!.requests.size).toBe(1); + expect(batcher.queuedRequests.get('')!.size).toBe(1); }, 5); }); diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index edbbcd57d4b..dfc8661d045 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -22,16 +22,19 @@ interface QueuedRequest extends BatchableRequest { subscribers: Set; } +// Batches are primarily a Set, but may have other optional +// properties, such as batch.subscription. +type RequestBatch = Set & { + subscription?: ObservableSubscription; +} + // QueryBatcher doesn't fire requests immediately. Requests that were enqueued within // a certain amount of time (configurable through `batchInterval`) will be batched together // into one query. export class OperationBatcher { // Queue on which the QueryBatcher will operate on a per-tick basis. 
// Public only for testing - public readonly queuedRequests = new Map; - batchedSubscription?: ObservableSubscription; - }>(); + public readonly queuedRequests = new Map(); private scheduledBatchTimer: ReturnType; private batchDebounce?: boolean; @@ -75,21 +78,17 @@ export class OperationBatcher { if (!requestCopy.observable) { requestCopy.observable = new Observable(observer => { - let queued = this.queuedRequests.get(key)!; - if (!queued) { - this.queuedRequests.set(key, queued = { - requests: new Set, - }); - } + let batch = this.queuedRequests.get(key)!; + if (!batch) this.queuedRequests.set(key, batch = new Set); // These booleans seem to me (@benjamn) like they might always be the // same (and thus we could do with only one of them), but I'm not 100% // sure about that. - const isFirstEnqueuedRequest = queued.requests.size === 0; + const isFirstEnqueuedRequest = batch.size === 0; const isFirstSubscriber = requestCopy.subscribers.size === 0; requestCopy.subscribers.add(observer); if (isFirstSubscriber) { - queued.requests.add(requestCopy); + batch.add(requestCopy); } // called for each subscriber, so need to save all listeners (next, error, complete) @@ -114,7 +113,7 @@ export class OperationBatcher { } // When amount of requests reaches `batchMax`, trigger the queue consumption without waiting on the `batchInterval`. 
- if (queued.requests.size === this.batchMax) { + if (batch.size === this.batchMax) { this.consumeQueue(key); } @@ -123,12 +122,11 @@ export class OperationBatcher { if (requestCopy.subscribers.delete(observer) && requestCopy.subscribers.size < 1) { // If this is last request from queue, remove queue entirely - if (queued.requests.delete(requestCopy) && - queued.requests.size < 1) { + if (batch.delete(requestCopy) && batch.size < 1) { clearTimeout(this.scheduledBatchTimer); this.queuedRequests.delete(key); // If queue was in flight, cancel it - queued.batchedSubscription?.unsubscribe(); + batch.subscription?.unsubscribe(); } } } @@ -143,8 +141,8 @@ export class OperationBatcher { public consumeQueue( key: string = '', ): (Observable | undefined)[] | undefined { - const queued = this.queuedRequests.get(key); - if (!queued) return; + const batch = this.queuedRequests.get(key); + if (!batch) return; this.queuedRequests.delete(key); @@ -155,11 +153,10 @@ export class OperationBatcher { const errors: Array<(error: Error) => void>[] = []; const completes: Array<() => void>[] = []; - // Even though queued.requests is a Set, it preserves the order of first - // insertion when iterating (per ECMAScript specification), so these - // requests will be handled in the order they were enqueued (minus any - // deleted ones). - queued.requests.forEach(request => { + // Even though batch is a Set, it preserves the order of first insertion + // when iterating (per ECMAScript specification), so these requests will be + // handled in the order they were enqueued (minus any deleted ones). 
+ batch.forEach(request => { operations.push(request.operation); forwards.push(request.forward); observables.push(request.observable); @@ -181,7 +178,7 @@ export class OperationBatcher { }); }; - queued.batchedSubscription = batchedObservable.subscribe({ + batch.subscription = batchedObservable.subscribe({ next: results => { if (!Array.isArray(results)) { results = [results]; @@ -220,7 +217,7 @@ export class OperationBatcher { private scheduleQueueConsumption(key: string): void { this.scheduledBatchTimer = setTimeout(() => { - if (this.queuedRequests.get(key)?.requests.size) { + if (this.queuedRequests.get(key)?.size) { this.consumeQueue(key); } }, this.batchInterval); From 3def764fe3be355bb5b098738c09a7684a4ee626 Mon Sep 17 00:00:00 2001 From: Ben Newman Date: Tue, 11 Jan 2022 18:09:20 -0500 Subject: [PATCH 06/11] Privatize and rename batcher.queuedRequests property. Since we're already changing the type of this pseudo-private property, it seems like a good idea also to privatize/rename it for real, so we can test the hypothesis "nobody is using this field directly" by trialing these changes in beta releases of Apollo Client v3.6. 
--- src/link/batch/__tests__/batchLink.ts | 58 +++++++++++++-------------- src/link/batch/batching.ts | 15 ++++--- 2 files changed, 36 insertions(+), 37 deletions(-) diff --git a/src/link/batch/__tests__/batchLink.ts b/src/link/batch/__tests__/batchLink.ts index d4203381156..175c34c9126 100644 --- a/src/link/batch/__tests__/batchLink.ts +++ b/src/link/batch/__tests__/batchLink.ts @@ -157,11 +157,11 @@ describe('OperationBatcher', () => { batchKey: () => 'yo', }); - expect(batcher.queuedRequests.get('')).toBeUndefined(); - expect(batcher.queuedRequests.get('yo')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('yo')).toBeUndefined(); batcher.consumeQueue(); - expect(batcher.queuedRequests.get('')).toBeUndefined(); - expect(batcher.queuedRequests.get('yo')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('yo')).toBeUndefined(); }); it('should be able to add to the queue', () => { @@ -185,11 +185,11 @@ describe('OperationBatcher', () => { operation: createOperation({}, { query }), }; - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.size).toBe(1); + expect(batcher["batchesByKey"].get('')!.size).toBe(1); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.size).toBe(2); + expect(batcher["batchesByKey"].get('')!.size).toBe(2); }); describe('request queue', () => { @@ -232,7 +232,7 @@ describe('OperationBatcher', () => { myBatcher.enqueueRequest({ operation }).subscribe( terminatingCheck(resolve, reject, (resultObj: any) => { - expect(myBatcher.queuedRequests.get('')).toBeUndefined(); + expect(myBatcher["batchesByKey"].get('')).toBeUndefined(); expect(resultObj).toEqual({ data }); }), ); @@ -303,11 +303,11 @@ describe('OperationBatcher', () => { }); try { 
- expect(myBatcher.queuedRequests.get('')!.size).toBe(2); + expect(myBatcher["batchesByKey"].get('')!.size).toBe(2); const observables: ( | Observable | undefined)[] = myBatcher.consumeQueue()!; - expect(myBatcher.queuedRequests.get('')).toBeUndefined(); + expect(myBatcher["batchesByKey"].get('')).toBeUndefined(); expect(observables.length).toBe(2); } catch (e) { reject(e); @@ -345,27 +345,27 @@ describe('OperationBatcher', () => { myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.size).toEqual(3); + expect(myBatcher["batchesByKey"].get('')!.size).toEqual(3); // 2. Run the timer halfway. jest.advanceTimersByTime(batchInterval / 2); - expect(myBatcher.queuedRequests.get('')!.size).toEqual(3); + expect(myBatcher["batchesByKey"].get('')!.size).toEqual(3); // 3. Queue a 4th request, causing the timer to reset. myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.size).toEqual(4); + expect(myBatcher["batchesByKey"].get('')!.size).toEqual(4); // 4. Run the timer to batchInterval + 1, at this point, if debounce were // not set, the original 3 requests would have fired, but we expect // instead that the queries will instead fire at // (batchInterval + batchInterval / 2). jest.advanceTimersByTime(batchInterval / 2 + 1); - expect(myBatcher.queuedRequests.get('')!.size).toEqual(4); + expect(myBatcher["batchesByKey"].get('')!.size).toEqual(4); // 5. Finally, run the timer to (batchInterval + batchInterval / 2) +1, // and expect the queue to be empty. 
jest.advanceTimersByTime(batchInterval / 2); - expect(myBatcher.queuedRequests.size).toEqual(0); + expect(myBatcher["batchesByKey"].size).toEqual(0); resolve(); }); }); @@ -395,14 +395,14 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.size).toBe(1); + expect(batcher["batchesByKey"].get('')!.size).toBe(1); } catch (e) { reject(e); } setTimeout( terminatingCheck(resolve, reject, () => { - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); }), 20, ); @@ -436,7 +436,7 @@ describe('OperationBatcher', () => { operation: createOperation({}, { query }), }).subscribe(() => reject('next should never be called')).unsubscribe(); - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); resolve(); }); @@ -468,7 +468,7 @@ describe('OperationBatcher', () => { const checkQueuedRequests = ( expectedSubscriberCount: number, ) => { - const batch = batcher.queuedRequests.get(''); + const batch = batcher["batchesByKey"].get(''); expect(batch).not.toBeUndefined(); expect(batch!.size).toBe(1); batch!.forEach(request => { @@ -486,7 +486,7 @@ describe('OperationBatcher', () => { checkQueuedRequests(1); sub2.unsubscribe(); - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); resolve(); }); @@ -499,7 +499,7 @@ describe('OperationBatcher', () => { setTimeout(() => subscription?.unsubscribe(), 5); return () => { - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); resolve(); }; }), @@ -551,7 +551,7 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); batcher.enqueueRequest({ operation: operation2 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.size).toBe(2); + 
expect(batcher["batchesByKey"].get('')!.size).toBe(2); } catch (e) { reject(e); } @@ -560,7 +560,7 @@ describe('OperationBatcher', () => { // The batch shouldn't be fired yet, so we can add one more request. batcher.enqueueRequest({ operation: operation3 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.size).toBe(3); + expect(batcher["batchesByKey"].get('')!.size).toBe(3); } catch (e) { reject(e); } @@ -569,7 +569,7 @@ describe('OperationBatcher', () => { setTimeout( terminatingCheck(resolve, reject, () => { // The batch should've been fired by now. - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); }), 20, ); @@ -608,23 +608,23 @@ describe('OperationBatcher', () => { expect(result.data).toBe(data2); // The batch should've been fired by now. - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); resolve(); }); - expect(batcher.queuedRequests.get('')!.size).toBe(2); + expect(batcher["batchesByKey"].get('')!.size).toBe(2); sub1.unsubscribe(); - expect(batcher.queuedRequests.get('')!.size).toBe(1); + expect(batcher["batchesByKey"].get('')!.size).toBe(1); setTimeout(() => { // The batch shouldn't be fired yet, so we can add one more request. const sub3 = batcher.enqueueRequest({operation: operation3}).subscribe(() => reject('next should never be called')); - expect(batcher.queuedRequests.get('')!.size).toBe(2); + expect(batcher["batchesByKey"].get('')!.size).toBe(2); sub3.unsubscribe(); - expect(batcher.queuedRequests.get('')!.size).toBe(1); + expect(batcher["batchesByKey"].get('')!.size).toBe(1); }, 5); }); diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index dfc8661d045..b630d6abe20 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -33,8 +33,7 @@ type RequestBatch = Set & { // into one query. 
export class OperationBatcher { // Queue on which the QueryBatcher will operate on a per-tick basis. - // Public only for testing - public readonly queuedRequests = new Map(); + private batchesByKey = new Map(); private scheduledBatchTimer: ReturnType; private batchDebounce?: boolean; @@ -78,8 +77,8 @@ export class OperationBatcher { if (!requestCopy.observable) { requestCopy.observable = new Observable(observer => { - let batch = this.queuedRequests.get(key)!; - if (!batch) this.queuedRequests.set(key, batch = new Set); + let batch = this.batchesByKey.get(key)!; + if (!batch) this.batchesByKey.set(key, batch = new Set); // These booleans seem to me (@benjamn) like they might always be the // same (and thus we could do with only one of them), but I'm not 100% @@ -124,7 +123,7 @@ export class OperationBatcher { // If this is last request from queue, remove queue entirely if (batch.delete(requestCopy) && batch.size < 1) { clearTimeout(this.scheduledBatchTimer); - this.queuedRequests.delete(key); + this.batchesByKey.delete(key); // If queue was in flight, cancel it batch.subscription?.unsubscribe(); } @@ -141,10 +140,10 @@ export class OperationBatcher { public consumeQueue( key: string = '', ): (Observable | undefined)[] | undefined { - const batch = this.queuedRequests.get(key); + const batch = this.batchesByKey.get(key); if (!batch) return; - this.queuedRequests.delete(key); + this.batchesByKey.delete(key); const operations: Operation[] = []; const forwards: (NextLink | undefined)[] = []; @@ -217,7 +216,7 @@ export class OperationBatcher { private scheduleQueueConsumption(key: string): void { this.scheduledBatchTimer = setTimeout(() => { - if (this.queuedRequests.get(key)?.size) { + if (this.batchesByKey.get(key)?.size) { this.consumeQueue(key); } }, this.batchInterval); From 51881d6f215ccb224e2273b51453fe4f3f0da7e6 Mon Sep 17 00:00:00 2001 From: Ben Newman Date: Tue, 11 Jan 2022 18:52:52 -0500 Subject: [PATCH 07/11] Return early in consumeQueue if no requests to 
process. --- src/link/batch/batching.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index b630d6abe20..b31b0be0892 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -141,9 +141,12 @@ export class OperationBatcher { key: string = '', ): (Observable | undefined)[] | undefined { const batch = this.batchesByKey.get(key); - if (!batch) return; - + // Delete this batch and process it below. this.batchesByKey.delete(key); + if (!batch || !batch.size) { + // No requests to be processed. + return; + } const operations: Operation[] = []; const forwards: (NextLink | undefined)[] = []; @@ -216,9 +219,7 @@ export class OperationBatcher { private scheduleQueueConsumption(key: string): void { this.scheduledBatchTimer = setTimeout(() => { - if (this.batchesByKey.get(key)?.size) { - this.consumeQueue(key); - } + this.consumeQueue(key); }, this.batchInterval); } } From e70e072e51cb2d7479dcb3f7b71c6fcf91fec16c Mon Sep 17 00:00:00 2001 From: Adrien Crivelli Date: Wed, 19 Jan 2022 11:48:28 +0100 Subject: [PATCH 08/11] Add parens for `new` for consistency --- src/link/batch/batching.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index b31b0be0892..2f570e53123 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -70,7 +70,7 @@ export class OperationBatcher { next: [], error: [], complete: [], - subscribers: new Set, + subscribers: new Set(), }; const key = this.batchKey(request.operation); @@ -78,7 +78,7 @@ export class OperationBatcher { if (!requestCopy.observable) { requestCopy.observable = new Observable(observer => { let batch = this.batchesByKey.get(key)!; - if (!batch) this.batchesByKey.set(key, batch = new Set); + if (!batch) this.batchesByKey.set(key, batch = new Set()); // These booleans seem to me (@benjamn) like they might always be the // same (and thus we 
 could do with only one of them), but I'm not 100% From 29fb39ed033614aa2d441034acda32c51545171b Mon Sep 17 00:00:00 2001 From: Adrien Crivelli Date: Wed, 19 Jan 2022 11:54:59 +0100 Subject: [PATCH 09/11] Remove unused BatchHandler properties Because those are never used inside this project, and they are very likely to break our indices-based callback calls and throw a `server returned results with length...`. --- src/link/batch/batching.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index 2f570e53123..90979255621 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -9,13 +9,10 @@ export type BatchHandler = ( export interface BatchableRequest { operation: Operation; forward?: NextLink; - observable?: Observable; - next?: Array<(result: FetchResult) => void>; - error?: Array<(error: Error) => void>; - complete?: Array<() => void>; } interface QueuedRequest extends BatchableRequest { + observable?: Observable; next: Array<(result: FetchResult) => void>; error: Array<(error: Error) => void>; complete: Array<() => void>; From fb04ca484fa152f0a333c2c68ddbba4377d2b6b4 Mon Sep 17 00:00:00 2001 From: Adrien Crivelli Date: Wed, 19 Jan 2022 11:59:55 +0100 Subject: [PATCH 10/11] Reduce usage of `any` --- src/link/batch/batching.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index 90979255621..3f76afdb7fa 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -167,12 +167,12 @@ export class OperationBatcher { const batchedObservable = this.batchHandler(operations, forwards) || Observable.of(); - const onError = (error: any) => { + const onError = (error: Error) => { //each callback list in batch errors.forEach(rejecters => { if (rejecters) { //each subscriber to request - rejecters.forEach((e: any) => e(error)); + rejecters.forEach((e) => e(error)); } }); }; @@ -196,7 
+196,7 @@ export class OperationBatcher { results.forEach((result, index) => { if (nexts[index]) { - nexts[index].forEach((next: any) => next(result)); + nexts[index].forEach((next) => next(result)); } }); }, @@ -205,7 +205,7 @@ export class OperationBatcher { completes.forEach(complete => { if (complete) { //each subscriber to request - complete.forEach((c: any) => c()); + complete.forEach((c) => c()); } }); }, From 98f9bc2f4167bd60a315b65ab69d2379c98b22a0 Mon Sep 17 00:00:00 2001 From: Adrien Crivelli Date: Wed, 19 Jan 2022 12:07:43 +0100 Subject: [PATCH 11/11] Avoid duplication in types --- src/link/batch/batching.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index 3f76afdb7fa..549df9c9728 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -1,4 +1,4 @@ -import { Operation, FetchResult, NextLink } from '../core'; +import { FetchResult, NextLink, Operation } from '../core'; import { Observable, ObservableSubscription } from '../../utilities'; export type BatchHandler = ( @@ -11,7 +11,7 @@ export interface BatchableRequest { forward?: NextLink; } -interface QueuedRequest extends BatchableRequest { +type QueuedRequest = BatchableRequest & { observable?: Observable; next: Array<(result: FetchResult) => void>; error: Array<(error: Error) => void>; @@ -145,12 +145,12 @@ export class OperationBatcher { return; } - const operations: Operation[] = []; - const forwards: (NextLink | undefined)[] = []; - const observables: (Observable | undefined)[] = []; - const nexts: Array<(result: FetchResult) => void>[] = []; - const errors: Array<(error: Error) => void>[] = []; - const completes: Array<() => void>[] = []; + const operations: QueuedRequest['operation'][] = []; + const forwards: QueuedRequest['forward'][] = []; + const observables: QueuedRequest['observable'][] = []; + const nexts: QueuedRequest['next'][] = []; + const errors: QueuedRequest['error'][] 
= []; + const completes: QueuedRequest['complete'][] = []; // Even though batch is a Set, it preserves the order of first insertion // when iterating (per ECMAScript specification), so these requests will be