From dcc2312754fc3dfad686e5a2bd94bb4736ee0ef8 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Mon, 13 Jul 2020 19:38:52 +0100 Subject: [PATCH] added missing unit tests --- .../server/buffered_task_store.test.ts | 82 +++++++++++++++++++ .../server/buffered_task_store.ts | 31 ++++--- .../server/lib/bulk_operation_buffer.test.ts | 22 +++-- .../server/lib/bulk_operation_buffer.ts | 35 ++++---- .../task_manager/server/lib/result_type.ts | 19 +++-- .../task_manager/server/task_store.mock.ts | 31 +++++++ 6 files changed, 179 insertions(+), 41 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/buffered_task_store.test.ts create mode 100644 x-pack/plugins/task_manager/server/task_store.mock.ts diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.test.ts b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts new file mode 100644 index 0000000000000..1397d7ee6ee07 --- /dev/null +++ b/x-pack/plugins/task_manager/server/buffered_task_store.test.ts @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import uuid from 'uuid'; +import { taskStoreMock } from './task_store.mock'; +import { BufferedTaskStore } from './buffered_task_store'; +import { asErr, asOk } from './lib/result_type'; +import { TaskStatus } from './task'; + +describe('Buffered Task Store', () => { + test('proxies the TaskStore for `maxAttempts` and `remove`', async () => { + const taskStore = taskStoreMock.create({ maxAttempts: 10 }); + taskStore.bulkUpdate.mockResolvedValue([]); + const bufferedStore = new BufferedTaskStore(taskStore); + + expect(bufferedStore.maxAttempts).toEqual(10); + + bufferedStore.remove('1'); + expect(taskStore.remove).toHaveBeenCalledWith('1'); + }); + + describe('update', () => { + test("proxies the TaskStore's `bulkUpdate`", async () => { + const taskStore = taskStoreMock.create({ maxAttempts: 10 }); + const bufferedStore = new BufferedTaskStore(taskStore); + + const task = mockTask(); + + taskStore.bulkUpdate.mockResolvedValue([asOk(task)]); + + expect(await bufferedStore.update(task)).toMatchObject(task); + expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]); + }); + + test('handles partially successfull bulkUpdates resolving each call appropriately', async () => { + const taskStore = taskStoreMock.create({ maxAttempts: 10 }); + const bufferedStore = new BufferedTaskStore(taskStore); + + const tasks = [mockTask(), mockTask(), mockTask()]; + + taskStore.bulkUpdate.mockResolvedValueOnce([ + asOk(tasks[0]), + asErr(new Error('Oh no, something went terribly wrong')), + asOk(tasks[2]), + ]); + + const results = [ + bufferedStore.update(tasks[0]), + bufferedStore.update(tasks[1]), + bufferedStore.update(tasks[2]), + ]; + expect(await results[0]).toMatchObject(tasks[0]); + expect(results[1]).rejects.toMatchInlineSnapshot( + `[Error: Oh no, something went terribly wrong]` + ); + expect(await results[2]).toMatchObject(tasks[2]); + }); + }); +}); + +function mockTask() { + return { + id: `task_${uuid.v4()}`, + attempts: 0, + schedule: undefined, + params: { hello: 'world' }, + retryAt: null, + runAt: new Date(), + scheduledAt: new Date(), + scope: undefined, + startedAt: null, + state: { foo: 'bar' }, + status: TaskStatus.Idle, + taskType: 'report', + user: undefined, + version: '123', + ownerId: '123', + }; +} diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.ts b/x-pack/plugins/task_manager/server/buffered_task_store.ts index 9836e8cda592b..4633b261ffed7 100644 --- a/x-pack/plugins/task_manager/server/buffered_task_store.ts +++ b/x-pack/plugins/task_manager/server/buffered_task_store.ts @@ -8,22 +8,27 @@ import { TaskStore } from './task_store'; import { ConcreteTaskInstance } from './task'; import { Updatable } from './task_runner'; import { createBuffer, Operation } from './lib/bulk_operation_buffer'; -import { unwrapPromise, mapErr } from './lib/result_type'; +import { unwrapPromise, asErr, mapErr } from './lib/result_type'; export class BufferedTaskStore implements Updatable { - private bufferedUpdate: Operation; + private bufferedUpdate: Operation; constructor(private readonly taskStore: TaskStore) { - this.bufferedUpdate = createBuffer(async (docs) => { - return (await taskStore.bulkUpdate(docs)).map((entityOrError, index) => - mapErr( - (error: Error) => ({ - entity: docs[index], - error, - }), - entityOrError - ) - ); - }); + this.bufferedUpdate = createBuffer( + async (docs) => { + return (await taskStore.bulkUpdate(docs)).map((entityOrError, index) => + mapErr( + (error: Error) => + asErr({ + // TaskStore's bulkUpdate maintains the order of the docs + // so we can rely on the index in the `docs` to match an entity with an index + entity: docs[index], + error, + }), + entityOrError + ) + ); + } + ); } public get maxAttempts(): number { diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts index 87e383b311a86..6e7ae97ea294e 100644 --- a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts @@ -33,10 +33,10 @@ function errorAttempts(task: TaskInstance): Err { +describe('Bulk Operation Buffer', () => { describe('createBuffer()', () => { test('batches up multiple Operation calls', async () => { - const bulkUpdate: jest.Mocked> = jest.fn( + const bulkUpdate: jest.Mocked> = jest.fn( ([task1, task2]) => { return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]); } @@ -55,9 +55,11 @@ describe('Task Store Buffer', () => { }); test('batch updates are executed at most by the next Event Loop tick', async () => { - const bulkUpdate: jest.Mocked> = jest.fn((tasks) => { - return Promise.resolve(tasks.map(incrementAttempts)); - }); + const bulkUpdate: jest.Mocked> = jest.fn( + (tasks) => { + return Promise.resolve(tasks.map(incrementAttempts)); + } + ); const bufferedUpdate = createBuffer(bulkUpdate); @@ -97,7 +99,7 @@ describe('Task Store Buffer', () => { }); test('handles both resolutions and rejections at individual task level', async (done) => { - const bulkUpdate: jest.Mocked> = jest.fn( + const bulkUpdate: jest.Mocked> = jest.fn( ([task1, task2, task3]) => { return Promise.resolve([ incrementAttempts(task1), @@ -129,9 +131,11 @@ describe('Task Store Buffer', () => { }); test('handles bulkUpdate failure', async (done) => { - const bulkUpdate: jest.Mocked> = jest.fn(() => { - return Promise.reject(new Error('bulkUpdate is an illusion')); - }); + const bulkUpdate: jest.Mocked> = jest.fn( + () => { + return Promise.reject(new Error('bulkUpdate is an illusion')); + } + ); const bufferedUpdate = createBuffer(bulkUpdate); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts index a02a96607f4ec..3f5354e621555 100644 --- a/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts +++ b/x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts @@ -13,24 +13,31 @@ export interface Entity { id: string; } -export interface OperationError { - entity: H; - error: E; +export interface OperationError { + entity: Input; + error: ErrorOutput; } -export type OperationResult = Result>; +export type OperationResult = Result< + Output, + OperationError +>; -export type Operation = (entity: H) => Promise>; -export type BulkOperation = (entities: H[]) => Promise>>; +export type Operation = ( + entity: Input +) => Promise>; +export type BulkOperation = ( + entities: Input[] +) => Promise>>; -export function createBuffer( - bulkOperation: BulkOperation -): Operation { +export function createBuffer( + bulkOperation: BulkOperation +): Operation { const flushBuffer = new Subject(); const storeUpdateBuffer = new Subject<{ - entity: H; - onSuccess: (entity: Ok) => void; - onFailure: (error: Err) => void; + entity: Input; + onSuccess: (entity: Ok) => void; + onFailure: (error: Err) => void; }>(); storeUpdateBuffer @@ -48,7 +55,7 @@ export function createBuffer( (entity) => { entityById[entity.id].onSuccess(asOk(entity)); }, - ({ entity, error }: OperationError) => { + ({ entity, error }: OperationError) => { entityById[entity.id].onFailure(asErr(error)); } ) @@ -59,7 +66,7 @@ export function createBuffer( }); }); - return async function (entity: H) { + return async function (entity: Input) { return new Promise((resolve, reject) => { // ensure we flush by the end of the "current" event loop tick setImmediate(() => flushBuffer.next()); diff --git a/x-pack/plugins/task_manager/server/lib/result_type.ts b/x-pack/plugins/task_manager/server/lib/result_type.ts index a7fbdf80679d7..b0eee7a10fc56 100644 --- a/x-pack/plugins/task_manager/server/lib/result_type.ts +++ b/x-pack/plugins/task_manager/server/lib/result_type.ts @@ -31,25 +31,34 @@ export function asErr(error: T): Err { }; } +export function isResult(maybeResult: unknown): maybeResult is Result { + return ( + (maybeResult as Result)?.tag === 'ok' || (maybeResult as Result)?.tag === 'err' + ); +} + export function isOk(result: Result): result is Ok { - return result.tag === 'ok'; + return result?.tag === 'ok'; } export function isErr(result: Result): result is Err { return !isOk(result); } -export async function promiseResult(future: Promise): Promise> { +export async function promiseResult( + future: Promise> +): Promise> { try { - return asOk(await future); + const result = await future; + return isResult(result) ? result : asOk(result); } catch (e) { - return asErr(e); + return isResult(e) ? e : asErr(e); } } export async function unwrapPromise(future: Promise>): Promise { return map( - await future, + await promiseResult(future), (value: T) => Promise.resolve(value), (err: E) => Promise.reject(err) ); diff --git a/x-pack/plugins/task_manager/server/task_store.mock.ts b/x-pack/plugins/task_manager/server/task_store.mock.ts new file mode 100644 index 0000000000000..86db695bc5e2c --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_store.mock.ts @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { TaskStore } from './task_store'; + +interface TaskStoreOptions { + maxAttempts?: number; + index?: string; + taskManagerId?: string; +} +export const taskStoreMock = { + create({ maxAttempts = 0, index = '', taskManagerId = '' }: TaskStoreOptions) { + const mocked = ({ + update: jest.fn(), + remove: jest.fn(), + schedule: jest.fn(), + claimAvailableTasks: jest.fn(), + bulkUpdate: jest.fn(), + get: jest.fn(), + getLifecycle: jest.fn(), + fetch: jest.fn(), + maxAttempts, + index, + taskManagerId, + } as unknown) as jest.Mocked; + return mocked; + }, +};