From 0e68db1e64dc6996615c322b72a875ee797d0f71 Mon Sep 17 00:00:00 2001
From: Ben Newman
Date: Tue, 7 Jun 2022 14:00:29 -0400
Subject: [PATCH] Temporarily revert "Merge pull request #9248 from
 PowerKiKi/batch-cancel"

This reverts commit cefd24c69b6a5df251bb55547f2c4f3a1a9c5b91, reversing
changes made to d98f1dec2e2afcf32e686baf449cc941510975b3.

I plan to publish this version in the next v3.7.0 beta release, then
immediately reinstate most of this functionality, with a narrower
attempt to solve issue #9773 in a follow-up beta release, to allow
folks to compare the behavior of those two versions.
---
 .../batch-http/__tests__/batchHttpLink.ts |   2 +-
 src/link/batch/__tests__/batchLink.ts     | 217 +++---------------
 src/link/batch/batching.ts                | 155 ++++++-------
 3 files changed, 96 insertions(+), 278 deletions(-)

diff --git a/src/link/batch-http/__tests__/batchHttpLink.ts b/src/link/batch-http/__tests__/batchHttpLink.ts
index 2a43ed1a25b..2a711c95bd0 100644
--- a/src/link/batch-http/__tests__/batchHttpLink.ts
+++ b/src/link/batch-http/__tests__/batchHttpLink.ts
@@ -193,7 +193,7 @@ describe('BatchHttpLink', () => {
       },
       batchInterval: 1,
       //if batchKey does not work, then the batch size would be 3
-      batchMax: 2,
+      batchMax: 3,
       batchKey,
     }),
   ]);
diff --git a/src/link/batch/__tests__/batchLink.ts b/src/link/batch/__tests__/batchLink.ts
index 175c34c9126..78cdefabbee 100644
--- a/src/link/batch/__tests__/batchLink.ts
+++ b/src/link/batch/__tests__/batchLink.ts
@@ -1,16 +1,17 @@
 import gql from 'graphql-tag';
 import { print } from 'graphql';
 
-import { ApolloLink, execute } from '../../core';
+import { ApolloLink } from '../../core/ApolloLink';
+import { execute } from '../../core/execute';
 import { Operation, FetchResult, GraphQLRequest } from '../../core/types';
-import { Observable } from '../../../utilities';
-import { itAsync } from '../../../testing';
+import { Observable } from '../../../utilities/observables/Observable';
 import {
   BatchLink,
   OperationBatcher,
   BatchHandler,
   BatchableRequest,
 } from '../batchLink';
+import { itAsync } from '../../../testing';
 
 interface MockedResponse {
   request: GraphQLRequest;
@@ -26,7 +27,7 @@ function getKey(operation: GraphQLRequest) {
   return JSON.stringify([operationName, query, variables]);
 }
 
-function createOperation(
+export function createOperation(
   starting: any,
   operation: GraphQLRequest,
 ): Operation {
@@ -157,11 +158,11 @@ describe('OperationBatcher', () => {
       batchKey: () => 'yo',
     });
 
-    expect(batcher["batchesByKey"].get('')).toBeUndefined();
-    expect(batcher["batchesByKey"].get('yo')).toBeUndefined();
+    expect(batcher.queuedRequests.get('')).toBeUndefined();
+    expect(batcher.queuedRequests.get('yo')).toBeUndefined();
     batcher.consumeQueue();
-    expect(batcher["batchesByKey"].get('')).toBeUndefined();
-    expect(batcher["batchesByKey"].get('yo')).toBeUndefined();
+    expect(batcher.queuedRequests.get('')).toBeUndefined();
+    expect(batcher.queuedRequests.get('yo')).toBeUndefined();
   });
 
   it('should be able to add to the queue', () => {
@@ -185,11 +186,11 @@ describe('OperationBatcher', () => {
       operation: createOperation({}, { query }),
     };
 
-    expect(batcher["batchesByKey"].get('')).toBeUndefined();
+    expect(batcher.queuedRequests.get('')).toBeUndefined();
     batcher.enqueueRequest(request).subscribe({});
-    expect(batcher["batchesByKey"].get('')!.size).toBe(1);
+    expect(batcher.queuedRequests.get('')!.length).toBe(1);
     batcher.enqueueRequest(request).subscribe({});
-    expect(batcher["batchesByKey"].get('')!.size).toBe(2);
+    expect(batcher.queuedRequests.get('')!.length).toBe(2);
   });
 
   describe('request queue', () => {
@@ -232,7 +233,7 @@ describe('OperationBatcher', () => {
 
       myBatcher.enqueueRequest({ operation }).subscribe(
         terminatingCheck(resolve, reject, (resultObj: any) => {
-          expect(myBatcher["batchesByKey"].get('')).toBeUndefined();
+          expect(myBatcher.queuedRequests.get('')).toBeUndefined();
           expect(resultObj).toEqual({ data });
         }),
       );
@@ -303,11 +304,11 @@ describe('OperationBatcher', () => {
       });
 
       try {
-        expect(myBatcher["batchesByKey"].get('')!.size).toBe(2);
+        expect(myBatcher.queuedRequests.get('')!.length).toBe(2);
         const observables: (
          | Observable<FetchResult>
          | undefined
        )[] = myBatcher.consumeQueue()!;
-        expect(myBatcher["batchesByKey"].get('')).toBeUndefined();
+        expect(myBatcher.queuedRequests.get('')).toBeUndefined();
         expect(observables.length).toBe(2);
       } catch (e) {
         reject(e);
       }
@@ -345,27 +346,27 @@ describe('OperationBatcher', () => {
       myBatcher.enqueueRequest({ operation }).subscribe({});
       myBatcher.enqueueRequest({ operation }).subscribe({});
       myBatcher.enqueueRequest({ operation }).subscribe({});
-      expect(myBatcher["batchesByKey"].get('')!.size).toEqual(3);
+      expect(myBatcher.queuedRequests.get('')!.length).toEqual(3);
 
       // 2. Run the timer halfway.
      jest.advanceTimersByTime(batchInterval / 2);
-      expect(myBatcher["batchesByKey"].get('')!.size).toEqual(3);
+      expect(myBatcher.queuedRequests.get('')!.length).toEqual(3);
 
       // 3. Queue a 4th request, causing the timer to reset.
       myBatcher.enqueueRequest({ operation }).subscribe({});
-      expect(myBatcher["batchesByKey"].get('')!.size).toEqual(4);
+      expect(myBatcher.queuedRequests.get('')!.length).toEqual(4);
 
       // 4. Run the timer to batchInterval + 1, at this point, if debounce were
       // not set, the original 3 requests would have fired, but we expect
       // instead that the queries will instead fire at
       // (batchInterval + batchInterval / 2).
       jest.advanceTimersByTime(batchInterval / 2 + 1);
-      expect(myBatcher["batchesByKey"].get('')!.size).toEqual(4);
+      expect(myBatcher.queuedRequests.get('')!.length).toEqual(4);
 
       // 5. Finally, run the timer to (batchInterval + batchInterval / 2) +1,
       // and expect the queue to be empty.
       jest.advanceTimersByTime(batchInterval / 2);
-      expect(myBatcher["batchesByKey"].size).toEqual(0);
+      expect(myBatcher.queuedRequests.size).toEqual(0);
       resolve();
     });
   });
@@ -395,130 +396,19 @@ describe('OperationBatcher', () => {
     batcher.enqueueRequest({ operation }).subscribe({});
     try {
-      expect(batcher["batchesByKey"].get('')!.size).toBe(1);
+      expect(batcher.queuedRequests.get('')!.length).toBe(1);
     } catch (e) {
       reject(e);
     }
 
     setTimeout(
       terminatingCheck(resolve, reject, () => {
-        expect(batcher["batchesByKey"].get('')).toBeUndefined();
+        expect(batcher.queuedRequests.get('')).toBeUndefined();
       }),
       20,
     );
   });
 
-  itAsync('should cancel single query in queue when unsubscribing', (resolve, reject) => {
-    const data = {
-      lastName: 'Ever',
-      firstName: 'Greatest',
-    };
-
-    const batcher = new OperationBatcher({
-      batchInterval: 10,
-      batchHandler: () =>
-        new Observable(observer => {
-          observer.next([{data}]);
-          setTimeout(observer.complete.bind(observer));
-        }),
-    });
-
-    const query = gql`
-      query {
-        author {
-          firstName
-          lastName
-        }
-      }
-    `;
-
-    batcher.enqueueRequest({
-      operation: createOperation({}, { query }),
-    }).subscribe(() => reject('next should never be called')).unsubscribe();
-
-    expect(batcher["batchesByKey"].get('')).toBeUndefined();
-    resolve();
-  });
-
-  itAsync('should cancel single query in queue with multiple subscriptions', (resolve, reject) => {
-    const data = {
-      lastName: 'Ever',
-      firstName: 'Greatest',
-    };
-    const batcher = new OperationBatcher({
-      batchInterval: 10,
-      batchHandler: () =>
-        new Observable(observer => {
-          observer.next([{data}]);
-          setTimeout(observer.complete.bind(observer));
-        }),
-    });
-    const query = gql`
-      query {
-        author {
-          firstName
-          lastName
-        }
-      }
-    `;
-    const operation: Operation = createOperation({}, {query});
-
-    const observable = batcher.enqueueRequest({operation});
-
-    const checkQueuedRequests = (
-      expectedSubscriberCount: number,
-    ) => {
-      const batch = batcher["batchesByKey"].get('');
-      expect(batch).not.toBeUndefined();
-      expect(batch!.size).toBe(1);
-      batch!.forEach(request => {
-        expect(request.subscribers.size).toBe(expectedSubscriberCount);
-      });
-    };
-
-    const sub1 = observable.subscribe(() => reject('next should never be called'));
-    checkQueuedRequests(1);
-
-    const sub2 = observable.subscribe(() => reject('next should never be called'));
-    checkQueuedRequests(2);
-
-    sub1.unsubscribe();
-    checkQueuedRequests(1);
-
-    sub2.unsubscribe();
-    expect(batcher["batchesByKey"].get('')).toBeUndefined();
-    resolve();
-  });
-
-  itAsync('should cancel single query in flight when unsubscribing', (resolve, reject) => {
-    const batcher = new OperationBatcher({
-      batchInterval: 10,
-      batchHandler: () =>
-        new Observable(() => {
-          // Instead of typically starting an XHR, we trigger the unsubscription from outside
-          setTimeout(() => subscription?.unsubscribe(), 5);
-
-          return () => {
-            expect(batcher["batchesByKey"].get('')).toBeUndefined();
-            resolve();
-          };
-        }),
-    });
-
-    const query = gql`
-      query {
-        author {
-          firstName
-          lastName
-        }
-      }
-    `;
-
-    const subscription = batcher.enqueueRequest({
-      operation: createOperation({}, { query }),
-    }).subscribe(() => reject('next should never be called'));
-  });
-
   itAsync('should correctly batch multiple queries', (resolve, reject) => {
     const data = {
       lastName: 'Ever',
@@ -551,7 +441,7 @@ describe('OperationBatcher', () => {
     batcher.enqueueRequest({ operation }).subscribe({});
     batcher.enqueueRequest({ operation: operation2 }).subscribe({});
     try {
expect(batcher["batchesByKey"].get('')!.size).toBe(2); + expect(batcher.queuedRequests.get('')!.length).toBe(2); } catch (e) { reject(e); } @@ -560,7 +450,7 @@ describe('OperationBatcher', () => { // The batch shouldn't be fired yet, so we can add one more request. batcher.enqueueRequest({ operation: operation3 }).subscribe({}); try { - expect(batcher["batchesByKey"].get('')!.size).toBe(3); + expect(batcher.queuedRequests.get('')!.length).toBe(3); } catch (e) { reject(e); } @@ -569,65 +459,12 @@ describe('OperationBatcher', () => { setTimeout( terminatingCheck(resolve, reject, () => { // The batch should've been fired by now. - expect(batcher["batchesByKey"].get('')).toBeUndefined(); + expect(batcher.queuedRequests.get('')).toBeUndefined(); }), 20, ); }); - itAsync('should cancel multiple queries in queue when unsubscribing and let pass still subscribed one', (resolve, reject) => { - const data2 = { - lastName: 'Hauser', - firstName: 'Evans', - }; - - const batcher = new OperationBatcher({ - batchInterval: 10, - batchHandler: () => - new Observable(observer => { - observer.next([{ data: data2 }]); - setTimeout(observer.complete.bind(observer)); - }), - }); - - const query = gql` - query { - author { - firstName - lastName - } - } - `; - - const operation: Operation = createOperation({}, {query}); - const operation2: Operation = createOperation({}, {query}); - const operation3: Operation = createOperation({}, {query}); - - const sub1 = batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called')); - batcher.enqueueRequest({operation: operation2}).subscribe(result => { - expect(result.data).toBe(data2); - - // The batch should've been fired by now. - expect(batcher["batchesByKey"].get('')).toBeUndefined(); - - resolve(); - }); - - expect(batcher["batchesByKey"].get('')!.size).toBe(2); - - sub1.unsubscribe(); - expect(batcher["batchesByKey"].get('')!.size).toBe(1); - - setTimeout(() => { - // The batch shouldn't be fired yet, so we can add one more request. - const sub3 = batcher.enqueueRequest({operation: operation3}).subscribe(() => reject('next should never be called')); - expect(batcher["batchesByKey"].get('')!.size).toBe(2); - - sub3.unsubscribe(); - expect(batcher["batchesByKey"].get('')!.size).toBe(1); - }, 5); - }); - itAsync('should reject the promise if there is a network error', (resolve, reject) => { const query = gql` query { @@ -928,14 +765,14 @@ describe('BatchLink', () => { new BatchLink({ batchInterval: 1, //if batchKey does not work, then the batch size would be 3 - batchMax: 2, + batchMax: 3, batchHandler, batchKey, }), ]); let count = 0; - [1, 2, 3, 4].forEach(() => { + [1, 2, 3, 4].forEach(x => { execute(link, { query, }).subscribe({ diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index 549df9c9728..f2d7281bdec 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -1,5 +1,5 @@ -import { FetchResult, NextLink, Operation } from '../core'; -import { Observable, ObservableSubscription } from '../../utilities'; +import { Operation, FetchResult, NextLink } from '../core'; +import { Observable } from '../../utilities'; export type BatchHandler = ( operations: Operation[], @@ -9,20 +9,14 @@ export type BatchHandler = ( export interface BatchableRequest { operation: Operation; forward?: NextLink; -} -type QueuedRequest = BatchableRequest & { + // promise is created when the query fetch request is + // added to the queue and is resolved once the result is back + // from the server. 
   observable?: Observable<FetchResult>;
-  next: Array<(result: FetchResult) => void>;
-  error: Array<(error: Error) => void>;
-  complete: Array<() => void>;
-  subscribers: Set<object>;
-}
-
-// Batches are primarily a Set<QueuedRequest>, but may have other optional
-// properties, such as batch.subscription.
-type RequestBatch = Set<QueuedRequest> & {
-  subscription?: ObservableSubscription;
+  next?: Array<(result: FetchResult) => void>;
+  error?: Array<(error: Error) => void>;
+  complete?: Array<() => void>;
 }
 
 // QueryBatcher doesn't fire requests immediately. Requests that were enqueued within
@@ -30,7 +24,8 @@
 // a certain amount of time (configurable through `batchInterval`) will be batched together
 // into one query.
 export class OperationBatcher {
   // Queue on which the QueryBatcher will operate on a per-tick basis.
-  private batchesByKey = new Map<string, RequestBatch>();
+  // Public only for testing
+  public queuedRequests: Map<string, BatchableRequest[]>;
 
   private scheduledBatchTimer: ReturnType<typeof setTimeout>;
   private batchDebounce?: boolean;
@@ -54,6 +49,7 @@ export class OperationBatcher {
     batchHandler: BatchHandler;
     batchKey?: (operation: Operation) => string;
   }) {
+    this.queuedRequests = new Map();
     this.batchDebounce = batchDebounce;
     this.batchInterval = batchInterval;
     this.batchMax = batchMax || 0;
@@ -62,46 +58,38 @@ export class OperationBatcher {
   }
 
   public enqueueRequest(request: BatchableRequest): Observable<FetchResult> {
-    const requestCopy: QueuedRequest = {
+    const requestCopy = {
       ...request,
-      next: [],
-      error: [],
-      complete: [],
-      subscribers: new Set(),
     };
+    let queued = false;
 
     const key = this.batchKey(request.operation);
 
     if (!requestCopy.observable) {
      requestCopy.observable = new Observable<FetchResult>(observer => {
-        let batch = this.batchesByKey.get(key)!;
-        if (!batch) this.batchesByKey.set(key, batch = new Set());
-
-        // These booleans seem to me (@benjamn) like they might always be the
-        // same (and thus we could do with only one of them), but I'm not 100%
-        // sure about that.
-        const isFirstEnqueuedRequest = batch.size === 0;
-        const isFirstSubscriber = requestCopy.subscribers.size === 0;
-        requestCopy.subscribers.add(observer);
-        if (isFirstSubscriber) {
-          batch.add(requestCopy);
+        if (!this.queuedRequests.has(key)) {
+          this.queuedRequests.set(key, []);
         }
 
-        // called for each subscriber, so need to save all listeners (next, error, complete)
-        if (observer.next) {
-          requestCopy.next.push(observer.next.bind(observer));
+        if (!queued) {
+          this.queuedRequests.get(key)!.push(requestCopy);
+          queued = true;
         }
 
-        if (observer.error) {
+        //called for each subscriber, so need to save all listeners(next, error, complete)
+        requestCopy.next = requestCopy.next || [];
+        if (observer.next) requestCopy.next.push(observer.next.bind(observer));
+
+        requestCopy.error = requestCopy.error || [];
+        if (observer.error)
           requestCopy.error.push(observer.error.bind(observer));
-        }
 
-        if (observer.complete) {
+        requestCopy.complete = requestCopy.complete || [];
+        if (observer.complete)
           requestCopy.complete.push(observer.complete.bind(observer));
-        }
 
         // The first enqueued request triggers the queue consumption after `batchInterval` milliseconds.
-        if (isFirstEnqueuedRequest) {
+        if (this.queuedRequests.get(key)!.length === 1) {
           this.scheduleQueueConsumption(key);
         } else if (this.batchDebounce) {
           clearTimeout(this.scheduledBatchTimer);
@@ -109,23 +97,9 @@ export class OperationBatcher {
         }
 
         // When amount of requests reaches `batchMax`, trigger the queue consumption without waiting on the `batchInterval`.
-        if (batch.size === this.batchMax) {
+        if (this.queuedRequests.get(key)!.length === this.batchMax) {
           this.consumeQueue(key);
         }
-
-        return () => {
-          // If this is last subscriber for this request, remove request from queue
-          if (requestCopy.subscribers.delete(observer) &&
-              requestCopy.subscribers.size < 1) {
-            // If this is last request from queue, remove queue entirely
-            if (batch.delete(requestCopy) && batch.size < 1) {
-              clearTimeout(this.scheduledBatchTimer);
-              this.batchesByKey.delete(key);
-              // If queue was in flight, cancel it
-              batch.subscription?.unsubscribe();
-            }
-          }
-        }
       });
     }
@@ -135,49 +109,50 @@ export class OperationBatcher {
     return requestCopy.observable;
   }
 
   // Consumes the queue.
   // Returns a list of promises (one for each query).
   public consumeQueue(
-    key: string = '',
+    key?: string,
   ): (Observable<FetchResult> | undefined)[] | undefined {
-    const batch = this.batchesByKey.get(key);
-    // Delete this batch and process it below.
-    this.batchesByKey.delete(key);
-    if (!batch || !batch.size) {
-      // No requests to be processed.
+    const requestKey = key || '';
+    const queuedRequests = this.queuedRequests.get(requestKey);
+
+    if (!queuedRequests) {
       return;
     }
 
-    const operations: QueuedRequest['operation'][] = [];
-    const forwards: QueuedRequest['forward'][] = [];
-    const observables: QueuedRequest['observable'][] = [];
-    const nexts: QueuedRequest['next'][] = [];
-    const errors: QueuedRequest['error'][] = [];
-    const completes: QueuedRequest['complete'][] = [];
-
-    // Even though batch is a Set, it preserves the order of first insertion
-    // when iterating (per ECMAScript specification), so these requests will be
-    // handled in the order they were enqueued (minus any deleted ones).
-    batch.forEach(request => {
-      operations.push(request.operation);
-      forwards.push(request.forward);
-      observables.push(request.observable);
-      nexts.push(request.next);
-      errors.push(request.error);
-      completes.push(request.complete);
+    this.queuedRequests.delete(requestKey);
+
+    const requests: Operation[] = queuedRequests.map(
+      queuedRequest => queuedRequest.operation,
+    );
+
+    const forwards: (NextLink | undefined)[] = queuedRequests.map(
+      queuedRequest => queuedRequest.forward,
+    );
+
+    const observables: (Observable<FetchResult> | undefined)[] = [];
+    const nexts: any[] = [];
+    const errors: any[] = [];
+    const completes: any[] = [];
+    queuedRequests.forEach((batchableRequest, index) => {
+      observables.push(batchableRequest.observable);
+      nexts.push(batchableRequest.next);
+      errors.push(batchableRequest.error);
+      completes.push(batchableRequest.complete);
     });
 
     const batchedObservable =
-      this.batchHandler(operations, forwards) || Observable.of();
+      this.batchHandler(requests, forwards) || Observable.of();
 
-    const onError = (error: Error) => {
+    const onError = (error: any) => {
       //each callback list in batch
       errors.forEach(rejecters => {
         if (rejecters) {
           //each subscriber to request
-          rejecters.forEach((e) => e(error));
+          rejecters.forEach((e: any) => e(error));
         }
       });
     };
 
-    batch.subscription = batchedObservable.subscribe({
+    batchedObservable.subscribe({
      next: results => {
         if (!Array.isArray(results)) {
           results = [results];
         }
@@ -196,7 +171,7 @@ export class OperationBatcher {
         results.forEach((result, index) => {
           if (nexts[index]) {
-            nexts[index].forEach((next) => next(result));
+            nexts[index].forEach((next: any) => next(result));
           }
         });
       },
@@ -205,7 +180,7 @@ export class OperationBatcher {
       error: onError,
       complete: () => {
         completes.forEach(complete => {
           if (complete) {
             //each subscriber to request
-            complete.forEach((c) => c());
+            complete.forEach((c: any) => c());
           }
         });
       },
@@ -214,9 +189,15 @@ export class OperationBatcher {
     return observables;
   }
 
-  private scheduleQueueConsumption(key: string): void {
-    this.scheduledBatchTimer = setTimeout(() => {
-      this.consumeQueue(key);
-    }, this.batchInterval);
+  private scheduleQueueConsumption(key?: string): void {
+    const requestKey = key || '';
+    this.scheduledBatchTimer = (setTimeout(() => {
+      if (
+        this.queuedRequests.get(requestKey) &&
+        this.queuedRequests.get(requestKey)!.length
+      ) {
+        this.consumeQueue(requestKey);
+      }
+    }, this.batchInterval));
  }
 }
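
For readers comparing the two beta releases, here is a minimal sketch of how the restored (pre-#9248) batching behaves from the caller's side. It exercises only the options that appear in this patch (batchInterval, batchMax, batchKey, batchHandler); the handler below is a hypothetical stand-in that answers each batched operation with an empty result rather than a real transport, and the import paths assume the standard Apollo Client v3 package layout.

import gql from 'graphql-tag';
import { ApolloLink, execute, Operation, FetchResult } from '@apollo/client/core';
import { Observable } from '@apollo/client/utilities';
import { BatchLink } from '@apollo/client/link/batch';

const query = gql`
  query {
    author {
      firstName
      lastName
    }
  }
`;

// Hypothetical stand-in transport: emits one FetchResult per operation
// in the batch, then completes.
const batchHandler = (operations: Operation[]) =>
  new Observable<FetchResult[]>(observer => {
    observer.next(operations.map(() => ({ data: { author: null } })));
    observer.complete();
  });

const link = ApolloLink.from([
  new BatchLink({
    batchInterval: 10, // flush a key's queue 10ms after its first request...
    batchMax: 3,       // ...or as soon as 3 requests are queued under that key
    batchKey: operation => operation.getContext().batchKey || '',
    batchHandler,
  }),
]);

// All four requests share the default '' key: the first three are sent
// together as soon as batchMax is reached, and the fourth is sent on its
// own after batchInterval elapses.
for (let i = 0; i < 4; ++i) {
  execute(link, { query }).subscribe({
    next: result => console.log('result', i, result),
  });
}

Note the behavioral difference this revert introduces: with the batch-cancel code, unsubscribing every subscriber of a queued request removed it from batchesByKey (and could cancel an in-flight batch); after the revert, entries in queuedRequests are only ever drained by the batchInterval timer or the batchMax threshold.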