diff --git a/src/link/batch-http/__tests__/batchHttpLink.ts b/src/link/batch-http/__tests__/batchHttpLink.ts index 2a711c95bd0..2a43ed1a25b 100644 --- a/src/link/batch-http/__tests__/batchHttpLink.ts +++ b/src/link/batch-http/__tests__/batchHttpLink.ts @@ -193,7 +193,7 @@ describe('BatchHttpLink', () => { }, batchInterval: 1, //if batchKey does not work, then the batch size would be 3 - batchMax: 3, + batchMax: 2, batchKey, }), ]); diff --git a/src/link/batch/__tests__/batchLink.ts b/src/link/batch/__tests__/batchLink.ts index 78cdefabbee..175c34c9126 100644 --- a/src/link/batch/__tests__/batchLink.ts +++ b/src/link/batch/__tests__/batchLink.ts @@ -1,17 +1,16 @@ import gql from 'graphql-tag'; import { print } from 'graphql'; -import { ApolloLink } from '../../core/ApolloLink'; -import { execute } from '../../core/execute'; +import { ApolloLink, execute } from '../../core'; import { Operation, FetchResult, GraphQLRequest } from '../../core/types'; -import { Observable } from '../../../utilities/observables/Observable'; +import { Observable } from '../../../utilities'; +import { itAsync } from '../../../testing'; import { BatchLink, OperationBatcher, BatchHandler, BatchableRequest, } from '../batchLink'; -import { itAsync } from '../../../testing'; interface MockedResponse { request: GraphQLRequest; @@ -27,7 +26,7 @@ function getKey(operation: GraphQLRequest) { return JSON.stringify([operationName, query, variables]); } -export function createOperation( +function createOperation( starting: any, operation: GraphQLRequest, ): Operation { @@ -158,11 +157,11 @@ describe('OperationBatcher', () => { batchKey: () => 'yo', }); - expect(batcher.queuedRequests.get('')).toBeUndefined(); - expect(batcher.queuedRequests.get('yo')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('yo')).toBeUndefined(); batcher.consumeQueue(); - expect(batcher.queuedRequests.get('')).toBeUndefined(); - expect(batcher.queuedRequests.get('yo')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('yo')).toBeUndefined(); }); it('should be able to add to the queue', () => { @@ -186,11 +185,11 @@ describe('OperationBatcher', () => { operation: createOperation({}, { query }), }; - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.length).toBe(1); + expect(batcher["batchesByKey"].get('')!.size).toBe(1); batcher.enqueueRequest(request).subscribe({}); - expect(batcher.queuedRequests.get('')!.length).toBe(2); + expect(batcher["batchesByKey"].get('')!.size).toBe(2); }); describe('request queue', () => { @@ -233,7 +232,7 @@ describe('OperationBatcher', () => { myBatcher.enqueueRequest({ operation }).subscribe( terminatingCheck(resolve, reject, (resultObj: any) => { - expect(myBatcher.queuedRequests.get('')).toBeUndefined(); + expect(myBatcher["batchesByKey"].get('')).toBeUndefined(); expect(resultObj).toEqual({ data }); }), ); @@ -304,11 +303,11 @@ describe('OperationBatcher', () => { }); try { - expect(myBatcher.queuedRequests.get('')!.length).toBe(2); + expect(myBatcher["batchesByKey"].get('')!.size).toBe(2); const observables: ( | Observable | undefined)[] = myBatcher.consumeQueue()!; - expect(myBatcher.queuedRequests.get('')).toBeUndefined(); + expect(myBatcher["batchesByKey"].get('')).toBeUndefined(); expect(observables.length).toBe(2); } catch (e) { reject(e); @@ -346,27 +345,27 @@ describe('OperationBatcher', () => { myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.length).toEqual(3); + expect(myBatcher["batchesByKey"].get('')!.size).toEqual(3); // 2. Run the timer halfway. jest.advanceTimersByTime(batchInterval / 2); - expect(myBatcher.queuedRequests.get('')!.length).toEqual(3); + expect(myBatcher["batchesByKey"].get('')!.size).toEqual(3); // 3. Queue a 4th request, causing the timer to reset. myBatcher.enqueueRequest({ operation }).subscribe({}); - expect(myBatcher.queuedRequests.get('')!.length).toEqual(4); + expect(myBatcher["batchesByKey"].get('')!.size).toEqual(4); // 4. Run the timer to batchInterval + 1, at this point, if debounce were // not set, the original 3 requests would have fired, but we expect // instead that the queries will instead fire at // (batchInterval + batchInterval / 2). jest.advanceTimersByTime(batchInterval / 2 + 1); - expect(myBatcher.queuedRequests.get('')!.length).toEqual(4); + expect(myBatcher["batchesByKey"].get('')!.size).toEqual(4); // 5. Finally, run the timer to (batchInterval + batchInterval / 2) +1, // and expect the queue to be empty. jest.advanceTimersByTime(batchInterval / 2); - expect(myBatcher.queuedRequests.size).toEqual(0); + expect(myBatcher["batchesByKey"].size).toEqual(0); resolve(); }); }); @@ -396,19 +395,130 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.length).toBe(1); + expect(batcher["batchesByKey"].get('')!.size).toBe(1); } catch (e) { reject(e); } setTimeout( terminatingCheck(resolve, reject, () => { - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); }), 20, ); }); + itAsync('should cancel single query in queue when unsubscribing', (resolve, reject) => { + const data = { + lastName: 'Ever', + firstName: 'Greatest', + }; + + const batcher = new OperationBatcher({ + batchInterval: 10, + batchHandler: () => + new Observable(observer => { + observer.next([{data}]); + setTimeout(observer.complete.bind(observer)); + }), + }); + + const query = gql` + query { + author { + firstName + lastName + } + } + `; + + batcher.enqueueRequest({ + operation: createOperation({}, { query }), + }).subscribe(() => reject('next should never be called')).unsubscribe(); + + expect(batcher["batchesByKey"].get('')).toBeUndefined(); + resolve(); + }); + + itAsync('should cancel single query in queue with multiple subscriptions', (resolve, reject) => { + const data = { + lastName: 'Ever', + firstName: 'Greatest', + }; + const batcher = new OperationBatcher({ + batchInterval: 10, + batchHandler: () => + new Observable(observer => { + observer.next([{data}]); + setTimeout(observer.complete.bind(observer)); + }), + }); + const query = gql` + query { + author { + firstName + lastName + } + } + `; + const operation: Operation = createOperation({}, {query}); + + const observable = batcher.enqueueRequest({operation}); + + const checkQueuedRequests = ( + expectedSubscriberCount: number, + ) => { + const batch = batcher["batchesByKey"].get(''); + expect(batch).not.toBeUndefined(); + expect(batch!.size).toBe(1); + batch!.forEach(request => { + expect(request.subscribers.size).toBe(expectedSubscriberCount); + }); + }; + + const sub1 = observable.subscribe(() => reject('next should never be called')); + checkQueuedRequests(1); + + const sub2 = observable.subscribe(() => reject('next should never be called')); + checkQueuedRequests(2); + + sub1.unsubscribe(); + checkQueuedRequests(1); + + sub2.unsubscribe(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); + resolve(); + }); + + itAsync('should cancel single query in flight when unsubscribing', (resolve, reject) => { + const batcher = new OperationBatcher({ + batchInterval: 10, + batchHandler: () => + new Observable(() => { + // Instead of typically starting an XHR, we trigger the unsubscription from outside + setTimeout(() => subscription?.unsubscribe(), 5); + + return () => { + expect(batcher["batchesByKey"].get('')).toBeUndefined(); + resolve(); + }; + }), + }); + + const query = gql` + query { + author { + firstName + lastName + } + } + `; + + const subscription = batcher.enqueueRequest({ + operation: createOperation({}, { query }), + }).subscribe(() => reject('next should never be called')); + }); + itAsync('should correctly batch multiple queries', (resolve, reject) => { const data = { lastName: 'Ever', @@ -441,7 +551,7 @@ describe('OperationBatcher', () => { batcher.enqueueRequest({ operation }).subscribe({}); batcher.enqueueRequest({ operation: operation2 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.length).toBe(2); + expect(batcher["batchesByKey"].get('')!.size).toBe(2); } catch (e) { reject(e); } @@ -450,7 +560,7 @@ describe('OperationBatcher', () => { // The batch shouldn't be fired yet, so we can add one more request. batcher.enqueueRequest({ operation: operation3 }).subscribe({}); try { - expect(batcher.queuedRequests.get('')!.length).toBe(3); + expect(batcher["batchesByKey"].get('')!.size).toBe(3); } catch (e) { reject(e); } @@ -459,12 +569,65 @@ describe('OperationBatcher', () => { setTimeout( terminatingCheck(resolve, reject, () => { // The batch should've been fired by now. - expect(batcher.queuedRequests.get('')).toBeUndefined(); + expect(batcher["batchesByKey"].get('')).toBeUndefined(); }), 20, ); }); + itAsync('should cancel multiple queries in queue when unsubscribing and let pass still subscribed one', (resolve, reject) => { + const data2 = { + lastName: 'Hauser', + firstName: 'Evans', + }; + + const batcher = new OperationBatcher({ + batchInterval: 10, + batchHandler: () => + new Observable(observer => { + observer.next([{ data: data2 }]); + setTimeout(observer.complete.bind(observer)); + }), + }); + + const query = gql` + query { + author { + firstName + lastName + } + } + `; + + const operation: Operation = createOperation({}, {query}); + const operation2: Operation = createOperation({}, {query}); + const operation3: Operation = createOperation({}, {query}); + + const sub1 = batcher.enqueueRequest({operation}).subscribe(() => reject('next should never be called')); + batcher.enqueueRequest({operation: operation2}).subscribe(result => { + expect(result.data).toBe(data2); + + // The batch should've been fired by now. + expect(batcher["batchesByKey"].get('')).toBeUndefined(); + + resolve(); + }); + + expect(batcher["batchesByKey"].get('')!.size).toBe(2); + + sub1.unsubscribe(); + expect(batcher["batchesByKey"].get('')!.size).toBe(1); + + setTimeout(() => { + // The batch shouldn't be fired yet, so we can add one more request. + const sub3 = batcher.enqueueRequest({operation: operation3}).subscribe(() => reject('next should never be called')); + expect(batcher["batchesByKey"].get('')!.size).toBe(2); + + sub3.unsubscribe(); + expect(batcher["batchesByKey"].get('')!.size).toBe(1); + }, 5); + }); + itAsync('should reject the promise if there is a network error', (resolve, reject) => { const query = gql` query { @@ -765,14 +928,14 @@ describe('BatchLink', () => { new BatchLink({ batchInterval: 1, //if batchKey does not work, then the batch size would be 3 - batchMax: 3, + batchMax: 2, batchHandler, batchKey, }), ]); let count = 0; - [1, 2, 3, 4].forEach(x => { + [1, 2, 3, 4].forEach(() => { execute(link, { query, }).subscribe({ diff --git a/src/link/batch/batching.ts b/src/link/batch/batching.ts index f2d7281bdec..549df9c9728 100644 --- a/src/link/batch/batching.ts +++ b/src/link/batch/batching.ts @@ -1,5 +1,5 @@ -import { Operation, FetchResult, NextLink } from '../core'; -import { Observable } from '../../utilities'; +import { FetchResult, NextLink, Operation } from '../core'; +import { Observable, ObservableSubscription } from '../../utilities'; export type BatchHandler = ( operations: Operation[], @@ -9,14 +9,20 @@ export type BatchHandler = ( export interface BatchableRequest { operation: Operation; forward?: NextLink; +} - // promise is created when the query fetch request is - // added to the queue and is resolved once the result is back - // from the server. +type QueuedRequest = BatchableRequest & { observable?: Observable; - next?: Array<(result: FetchResult) => void>; - error?: Array<(error: Error) => void>; - complete?: Array<() => void>; + next: Array<(result: FetchResult) => void>; + error: Array<(error: Error) => void>; + complete: Array<() => void>; + subscribers: Set; +} + +// Batches are primarily a Set, but may have other optional +// properties, such as batch.subscription. +type RequestBatch = Set & { + subscription?: ObservableSubscription; } // QueryBatcher doesn't fire requests immediately. Requests that were enqueued within @@ -24,8 +30,7 @@ export interface BatchableRequest { // into one query. export class OperationBatcher { // Queue on which the QueryBatcher will operate on a per-tick basis. - // Public only for testing - public queuedRequests: Map; + private batchesByKey = new Map(); private scheduledBatchTimer: ReturnType; private batchDebounce?: boolean; @@ -49,7 +54,6 @@ export class OperationBatcher { batchHandler: BatchHandler; batchKey?: (operation: Operation) => string; }) { - this.queuedRequests = new Map(); this.batchDebounce = batchDebounce; this.batchInterval = batchInterval; this.batchMax = batchMax || 0; @@ -58,38 +62,46 @@ export class OperationBatcher { } public enqueueRequest(request: BatchableRequest): Observable { - const requestCopy = { + const requestCopy: QueuedRequest = { ...request, + next: [], + error: [], + complete: [], + subscribers: new Set(), }; - let queued = false; const key = this.batchKey(request.operation); if (!requestCopy.observable) { requestCopy.observable = new Observable(observer => { - if (!this.queuedRequests.has(key)) { - this.queuedRequests.set(key, []); + let batch = this.batchesByKey.get(key)!; + if (!batch) this.batchesByKey.set(key, batch = new Set()); + + // These booleans seem to me (@benjamn) like they might always be the + // same (and thus we could do with only one of them), but I'm not 100% + // sure about that. + const isFirstEnqueuedRequest = batch.size === 0; + const isFirstSubscriber = requestCopy.subscribers.size === 0; + requestCopy.subscribers.add(observer); + if (isFirstSubscriber) { + batch.add(requestCopy); } - if (!queued) { - this.queuedRequests.get(key)!.push(requestCopy); - queued = true; + // called for each subscriber, so need to save all listeners (next, error, complete) + if (observer.next) { + requestCopy.next.push(observer.next.bind(observer)); } - //called for each subscriber, so need to save all listeners(next, error, complete) - requestCopy.next = requestCopy.next || []; - if (observer.next) requestCopy.next.push(observer.next.bind(observer)); - - requestCopy.error = requestCopy.error || []; - if (observer.error) + if (observer.error) { requestCopy.error.push(observer.error.bind(observer)); + } - requestCopy.complete = requestCopy.complete || []; - if (observer.complete) + if (observer.complete) { requestCopy.complete.push(observer.complete.bind(observer)); + } // The first enqueued request triggers the queue consumption after `batchInterval` milliseconds. - if (this.queuedRequests.get(key)!.length === 1) { + if (isFirstEnqueuedRequest) { this.scheduleQueueConsumption(key); } else if (this.batchDebounce) { clearTimeout(this.scheduledBatchTimer); @@ -97,9 +109,23 @@ export class OperationBatcher { } // When amount of requests reaches `batchMax`, trigger the queue consumption without waiting on the `batchInterval`. - if (this.queuedRequests.get(key)!.length === this.batchMax) { + if (batch.size === this.batchMax) { this.consumeQueue(key); } + + return () => { + // If this is last subscriber for this request, remove request from queue + if (requestCopy.subscribers.delete(observer) && + requestCopy.subscribers.size < 1) { + // If this is last request from queue, remove queue entirely + if (batch.delete(requestCopy) && batch.size < 1) { + clearTimeout(this.scheduledBatchTimer); + this.batchesByKey.delete(key); + // If queue was in flight, cancel it + batch.subscription?.unsubscribe(); + } + } + } }); } @@ -109,50 +135,49 @@ export class OperationBatcher { // Consumes the queue. // Returns a list of promises (one for each query). public consumeQueue( - key?: string, + key: string = '', ): (Observable | undefined)[] | undefined { - const requestKey = key || ''; - const queuedRequests = this.queuedRequests.get(requestKey); - - if (!queuedRequests) { + const batch = this.batchesByKey.get(key); + // Delete this batch and process it below. + this.batchesByKey.delete(key); + if (!batch || !batch.size) { + // No requests to be processed. return; } - this.queuedRequests.delete(requestKey); - - const requests: Operation[] = queuedRequests.map( - queuedRequest => queuedRequest.operation, - ); - - const forwards: (NextLink | undefined)[] = queuedRequests.map( - queuedRequest => queuedRequest.forward, - ); - - const observables: (Observable | undefined)[] = []; - const nexts: any[] = []; - const errors: any[] = []; - const completes: any[] = []; - queuedRequests.forEach((batchableRequest, index) => { - observables.push(batchableRequest.observable); - nexts.push(batchableRequest.next); - errors.push(batchableRequest.error); - completes.push(batchableRequest.complete); + const operations: QueuedRequest['operation'][] = []; + const forwards: QueuedRequest['forward'][] = []; + const observables: QueuedRequest['observable'][] = []; + const nexts: QueuedRequest['next'][] = []; + const errors: QueuedRequest['error'][] = []; + const completes: QueuedRequest['complete'][] = []; + + // Even though batch is a Set, it preserves the order of first insertion + // when iterating (per ECMAScript specification), so these requests will be + // handled in the order they were enqueued (minus any deleted ones). + batch.forEach(request => { + operations.push(request.operation); + forwards.push(request.forward); + observables.push(request.observable); + nexts.push(request.next); + errors.push(request.error); + completes.push(request.complete); }); const batchedObservable = - this.batchHandler(requests, forwards) || Observable.of(); + this.batchHandler(operations, forwards) || Observable.of(); - const onError = (error: any) => { + const onError = (error: Error) => { //each callback list in batch errors.forEach(rejecters => { if (rejecters) { //each subscriber to request - rejecters.forEach((e: any) => e(error)); + rejecters.forEach((e) => e(error)); } }); }; - batchedObservable.subscribe({ + batch.subscription = batchedObservable.subscribe({ next: results => { if (!Array.isArray(results)) { results = [results]; @@ -171,7 +196,7 @@ export class OperationBatcher { results.forEach((result, index) => { if (nexts[index]) { - nexts[index].forEach((next: any) => next(result)); + nexts[index].forEach((next) => next(result)); } }); }, @@ -180,7 +205,7 @@ export class OperationBatcher { completes.forEach(complete => { if (complete) { //each subscriber to request - complete.forEach((c: any) => c()); + complete.forEach((c) => c()); } }); }, @@ -189,15 +214,9 @@ export class OperationBatcher { return observables; } - private scheduleQueueConsumption(key?: string): void { - const requestKey = key || ''; - this.scheduledBatchTimer = (setTimeout(() => { - if ( - this.queuedRequests.get(requestKey) && - this.queuedRequests.get(requestKey)!.length - ) { - this.consumeQueue(requestKey); - } - }, this.batchInterval)); + private scheduleQueueConsumption(key: string): void { + this.scheduledBatchTimer = setTimeout(() => { + this.consumeQueue(key); + }, this.batchInterval); } }