From 188123bf22ad2183ebbf16de5ea5352dd163f0db Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Thu, 28 Mar 2019 14:22:06 +0800 Subject: [PATCH] [Code] Implement the index checkpointing (#32682) * [Code] Persist index checkpoint into index progress in ES * [Code] apply checkpoint to lsp indexer * [Code] Add unit tests for index checkpointing * [Code] move checkpoint from text to object --- x-pack/plugins/code/model/repository.ts | 15 ++ .../{lsp_indexer_test.ts => lsp_indexer.ts} | 64 +++++- .../code/server/indexer/abstract_indexer.ts | 42 +++- x-pack/plugins/code/server/indexer/indexer.ts | 12 +- .../code/server/indexer/lsp_indexer.ts | 4 +- .../code/server/indexer/schema/document.ts | 22 ++ .../code/server/queue/index_worker.test.ts | 200 +++++++++++++++++- .../plugins/code/server/queue/index_worker.ts | 69 +++++- 8 files changed, 387 insertions(+), 41 deletions(-) rename x-pack/plugins/code/server/__tests__/{lsp_indexer_test.ts => lsp_indexer.ts} (80%) diff --git a/x-pack/plugins/code/model/repository.ts b/x-pack/plugins/code/model/repository.ts index a209fe3bbe220..378b93ba93f91 100644 --- a/x-pack/plugins/code/model/repository.ts +++ b/x-pack/plugins/code/model/repository.ts @@ -4,6 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ +import { IndexRequest } from './search'; + export type RepositoryUri = string; export interface Repository { @@ -125,3 +127,16 @@ export interface CloneProgress { export interface CloneWorkerProgress extends WorkerProgress { cloneProgress?: CloneProgress; } + +export interface IndexProgress { + type: string; + total: number; + success: number; + fail: number; + percentage: number; + checkpoint?: IndexRequest; +} + +export interface IndexWorkerProgress extends WorkerProgress { + indexProgress?: IndexProgress; +} diff --git a/x-pack/plugins/code/server/__tests__/lsp_indexer_test.ts b/x-pack/plugins/code/server/__tests__/lsp_indexer.ts similarity index 80% rename from x-pack/plugins/code/server/__tests__/lsp_indexer_test.ts rename to x-pack/plugins/code/server/__tests__/lsp_indexer.ts index 6f9fc3771c1ea..d8d634fb9b88c 100644 --- a/x-pack/plugins/code/server/__tests__/lsp_indexer_test.ts +++ b/x-pack/plugins/code/server/__tests__/lsp_indexer.ts @@ -152,7 +152,6 @@ describe('lsp_indexer', () => { it('Normal LSP index process.', async () => { // Setup the esClient spies const { - getSpy, existsAliasSpy, createSpy, putAliasSpy, @@ -181,9 +180,6 @@ describe('lsp_indexer', () => { ); await indexer.start(); - // Expect EsClient get called once to get the repo git status. - assert.ok(getSpy.calledOnce); - // Expect EsClient deleteByQuery called 3 times for repository cleaning before // the index for document, symbol and reference, respectively. assert.strictEqual(deleteByQuerySpy.callCount, 3); @@ -204,7 +200,6 @@ describe('lsp_indexer', () => { it('Cancel LSP index process.', async () => { // Setup the esClient spies const { - getSpy, existsAliasSpy, createSpy, putAliasSpy, @@ -235,9 +230,6 @@ describe('lsp_indexer', () => { indexer.cancel(); await indexer.start(); - // Expect EsClient get called once to get the repo git status. - assert.ok(getSpy.calledOnce); - // Expect EsClient deleteByQuery called 3 times for repository cleaning before // the index for document, symbol and reference, respectively. assert.strictEqual(deleteByQuerySpy.callCount, 3); @@ -251,5 +243,61 @@ describe('lsp_indexer', () => { // indexed and thus bulk won't be called. assert.ok(bulkSpy.notCalled); }); + + it('Index continues from checkpoint', async () => { + // Setup the esClient spies + const { + existsAliasSpy, + createSpy, + putAliasSpy, + deleteByQuerySpy, + bulkSpy, + } = setupEsClientSpy(); + + const lspservice = new LspService( + '127.0.0.1', + serverOptions, + esClient as EsClient, + {} as InstallManager, + new ConsoleLoggerFactory(), + new RepositoryConfigController(esClient as EsClient) + ); + + lspservice.sendRequest = setupLsServiceSendRequestSpy(); + + const indexer = new LspIndexer( + 'github.com/Microsoft/TypeScript-Node-Starter', + '46971a8', + lspservice, + serverOptions, + esClient as EsClient, + log + ); + + // Apply a checkpoint in here. + await indexer.start(undefined, { + repoUri: '', + filePath: 'src/public/js/main.ts', + revision: '46971a8', + localRepoPath: '', + }); + + // Expect EsClient deleteByQuery called 0 times for repository cleaning while + // dealing with repository checkpoint. + assert.strictEqual(deleteByQuerySpy.callCount, 0); + + // Ditto for index and alias creation + assert.strictEqual(existsAliasSpy.callCount, 0); + assert.strictEqual(createSpy.callCount, 0); + assert.strictEqual(putAliasSpy.callCount, 0); + + // There are 22 files in the repo, but only 11 files after the checkpoint. + // 1 file + 1 symbol + 1 reference = 3 objects to index for each file. + // Total doc indexed should be 3 * 11 = 33, which can be fitted into a + // single batch index. + assert.ok(bulkSpy.calledOnce); + assert.strictEqual(bulkSpy.getCall(0).args[0].body.length, 33 * 2); + // @ts-ignore + }).timeout(20000); // @ts-ignore }).timeout(20000); diff --git a/x-pack/plugins/code/server/indexer/abstract_indexer.ts b/x-pack/plugins/code/server/indexer/abstract_indexer.ts index 6fce9123d0879..6d28c43cb6726 100644 --- a/x-pack/plugins/code/server/indexer/abstract_indexer.ts +++ b/x-pack/plugins/code/server/indexer/abstract_indexer.ts @@ -6,8 +6,8 @@ import moment from 'moment'; -import { Indexer, IndexProgress, ProgressReporter } from '.'; -import { IndexRequest, IndexStats, IndexStatsKey, RepositoryUri } from '../../model'; +import { Indexer, ProgressReporter } from '.'; +import { IndexProgress, IndexRequest, IndexStats, IndexStatsKey, RepositoryUri } from '../../model'; import { EsClient } from '../lib/esqueue'; import { Logger } from '../log'; import { aggregateIndexStats } from '../utils/index_stats_aggregator'; @@ -29,21 +29,23 @@ export abstract class AbstractIndexer implements Indexer { this.indexCreator = new IndexCreator(client); } - public async start(progressReporter?: ProgressReporter) { + public async start(progressReporter?: ProgressReporter, checkpointReq?: IndexRequest) { this.log.info( `Indexer ${this.type} started for repo ${this.repoUri} with revision ${this.revision}` ); this.cancelled = false; - // Prepare the ES index - const res = await this.prepareIndex(); - if (!res) { - this.log.error(`Prepare index for ${this.repoUri} error. Skip indexing.`); - return new Map(); - } + if (!checkpointReq) { + // Prepare the ES index + const res = await this.prepareIndex(); + if (!res) { + this.log.error(`Prepare index for ${this.repoUri} error. Skip indexing.`); + return new Map(); + } - // Clean up the index if necessary - await this.cleanIndex(); + // Clean up the index if necessary + await this.cleanIndex(); + } // Prepare all the index requests let totalCount = 0; @@ -60,6 +62,7 @@ export abstract class AbstractIndexer implements Indexer { throw error; } + let meetCheckpoint = false; const reqsIterator = await this.getIndexRequestIterator(); for await (const req of reqsIterator) { if (this.isCancelled()) { @@ -67,6 +70,22 @@ export abstract class AbstractIndexer implements Indexer { break; } + // If checkpoint is not undefined and not empty + if (checkpointReq) { + // Assume for the same revision, everything we iterate the repository, + // the order of the files is definite. + // @ts-ignore + if (req.filePath === checkpointReq.filePath && req.revision === checkpointReq.revision) { + this.log.info(`The index checkpoint has been found ${JSON.stringify(checkpointReq)}.`); + meetCheckpoint = true; + } + + if (!meetCheckpoint) { + // If the checkpoint has not been met yet, skip current request. + continue; + } + } + try { const stats = await this.processRequest(req); statsBuffer.push(stats); @@ -89,6 +108,7 @@ export abstract class AbstractIndexer implements Indexer { success: successCount, fail: failCount, percentage: Math.floor((100 * (successCount + failCount)) / totalCount), + checkpoint: req, }; if (moment().diff(prevTimestamp) > this.INDEXER_PROGRESS_UPDATE_INTERVAL_MS) { progressReporter(progress); diff --git a/x-pack/plugins/code/server/indexer/indexer.ts b/x-pack/plugins/code/server/indexer/indexer.ts index 01ecc884ed10b..fa20449255520 100644 --- a/x-pack/plugins/code/server/indexer/indexer.ts +++ b/x-pack/plugins/code/server/indexer/indexer.ts @@ -4,20 +4,12 @@ * you may not use this file except in compliance with the Elastic License. */ -import { IndexStats, RepositoryUri } from '../../model'; - -export interface IndexProgress { - type: string; - total: number; - success: number; - fail: number; - percentage: number; -} +import { IndexProgress, IndexRequest, IndexStats, RepositoryUri } from '../../model'; export type ProgressReporter = (progress: IndexProgress) => void; export interface Indexer { - start(ProgressReporter?: ProgressReporter): Promise; + start(ProgressReporter?: ProgressReporter, checkpointReq?: IndexRequest): Promise; cancel(): void; } diff --git a/x-pack/plugins/code/server/indexer/lsp_indexer.ts b/x-pack/plugins/code/server/indexer/lsp_indexer.ts index a0d7f496a8f8b..3f9d252953b42 100644 --- a/x-pack/plugins/code/server/indexer/lsp_indexer.ts +++ b/x-pack/plugins/code/server/indexer/lsp_indexer.ts @@ -42,9 +42,9 @@ export class LspIndexer extends AbstractIndexer { this.batchIndexHelper = new BatchIndexHelper(client, log); } - public async start(progressReporter?: ProgressReporter) { + public async start(progressReporter?: ProgressReporter, checkpointReq?: LspIndexRequest) { try { - return await super.start(progressReporter); + return await super.start(progressReporter, checkpointReq); } finally { if (!this.isCancelled()) { // Flush all the index request still in the cache for bulk index. diff --git a/x-pack/plugins/code/server/indexer/schema/document.ts b/x-pack/plugins/code/server/indexer/schema/document.ts index 08a6954819e74..97d706e922294 100644 --- a/x-pack/plugins/code/server/indexer/schema/document.ts +++ b/x-pack/plugins/code/server/indexer/schema/document.ts @@ -159,6 +159,28 @@ export const DocumentSchema = { revision: { type: 'keyword', }, + indexProgress: { + properties: { + type: { + type: 'keyword', + }, + total: { + type: 'integer', + }, + success: { + type: 'integer', + }, + fail: { + type: 'integer', + }, + percentage: { + type: 'integer', + }, + checkpoint: { + type: 'object', + }, + }, + }, }, }, }; diff --git a/x-pack/plugins/code/server/queue/index_worker.test.ts b/x-pack/plugins/code/server/queue/index_worker.test.ts index 760fc3a312565..0c3f55f46d3bc 100644 --- a/x-pack/plugins/code/server/queue/index_worker.test.ts +++ b/x-pack/plugins/code/server/queue/index_worker.test.ts @@ -6,7 +6,9 @@ import sinon from 'sinon'; +import { WorkerReservedProgress } from '../../model'; import { IndexerFactory } from '../indexer'; +import { RepositoryLspIndexStatusReservedField } from '../indexer/schema'; import { CancellationToken, EsClient, Esqueue } from '../lib/esqueue'; import { Logger } from '../log'; import { ServerOptions } from '../server_options'; @@ -34,6 +36,24 @@ test('Execute index job.', async () => { cancellationService.cancelIndexJob = cancelIndexJobSpy; cancellationService.registerIndexJobToken = registerIndexJobTokenSpy; + // Setup EsClient + const getSpy = sinon.fake.returns( + Promise.resolve({ + _source: { + [RepositoryLspIndexStatusReservedField]: { + uri: 'github.com/Microsoft/TypeScript-Node-Starter', + progress: WorkerReservedProgress.COMPLETED, + timestamp: new Date(), + revision: 'abcdefg', + }, + }, + }) + ); + const esClient = { + get: emptyAsyncFunc, + }; + esClient.get = getSpy; + // Setup IndexerFactory const cancelSpy = sinon.spy(); const startSpy = sinon.fake.returns(new Map()); @@ -54,7 +74,7 @@ test('Execute index job.', async () => { const indexWorker = new IndexWorker( esQueue as Esqueue, log, - {} as EsClient, + esClient as EsClient, [(indexerFactory as any) as IndexerFactory], {} as ServerOptions, (cancellationService as any) as CancellationSerivce @@ -70,7 +90,7 @@ test('Execute index job.', async () => { }); expect(cancelIndexJobSpy.calledOnce).toBeTruthy(); - + expect(getSpy.calledOnce).toBeTruthy(); expect(createSpy.calledOnce).toBeTruthy(); expect(startSpy.calledOnce).toBeTruthy(); expect(cancelSpy.notCalled).toBeTruthy(); @@ -87,6 +107,24 @@ test('Execute index job and then cancel.', async () => { cancellationService.cancelIndexJob = cancelIndexJobSpy; cancellationService.registerIndexJobToken = registerIndexJobTokenSpy; + // Setup EsClient + const getSpy = sinon.fake.returns( + Promise.resolve({ + _source: { + [RepositoryLspIndexStatusReservedField]: { + uri: 'github.com/Microsoft/TypeScript-Node-Starter', + progress: WorkerReservedProgress.COMPLETED, + timestamp: new Date(), + revision: 'abcdefg', + }, + }, + }) + ); + const esClient = { + get: emptyAsyncFunc, + }; + esClient.get = getSpy; + // Setup IndexerFactory const cancelSpy = sinon.spy(); const startSpy = sinon.fake.returns(new Map()); @@ -107,7 +145,7 @@ test('Execute index job and then cancel.', async () => { const indexWorker = new IndexWorker( esQueue as Esqueue, log, - {} as EsClient, + esClient as EsClient, [(indexerFactory as any) as IndexerFactory], {} as ServerOptions, (cancellationService as any) as CancellationSerivce @@ -126,13 +164,167 @@ test('Execute index job and then cancel.', async () => { cToken.cancel(); expect(cancelIndexJobSpy.calledOnce).toBeTruthy(); - + expect(getSpy.calledOnce).toBeTruthy(); expect(createSpy.calledOnce).toBeTruthy(); expect(startSpy.calledOnce).toBeTruthy(); // Then the the cancel function of the indexer should be called. expect(cancelSpy.calledOnce).toBeTruthy(); }); +test('Index job skipped/deduplicated if revision matches', async () => { + // Setup CancellationService + const cancelIndexJobSpy = sinon.spy(); + const registerIndexJobTokenSpy = sinon.spy(); + const cancellationService = { + cancelIndexJob: emptyAsyncFunc, + registerIndexJobToken: emptyAsyncFunc, + }; + cancellationService.cancelIndexJob = cancelIndexJobSpy; + cancellationService.registerIndexJobToken = registerIndexJobTokenSpy; + + // Setup EsClient + const getSpy = sinon.fake.returns( + Promise.resolve({ + _source: { + [RepositoryLspIndexStatusReservedField]: { + uri: 'github.com/elastic/kibana', + progress: 50, + timestamp: new Date(), + revision: 'abcdefg', + indexProgress: {}, + }, + }, + }) + ); + const esClient = { + get: emptyAsyncFunc, + }; + esClient.get = getSpy; + + // Setup IndexerFactory + const cancelSpy = sinon.spy(); + const startSpy = sinon.fake.returns(new Map()); + const indexer = { + cancel: emptyAsyncFunc, + start: emptyAsyncFunc, + }; + indexer.cancel = cancelSpy; + indexer.start = startSpy; + const createSpy = sinon.fake.returns(indexer); + const indexerFactory = { + create: emptyAsyncFunc, + }; + indexerFactory.create = createSpy; + + const cToken = new CancellationToken(); + + const indexWorker = new IndexWorker( + esQueue as Esqueue, + log, + esClient as EsClient, + [(indexerFactory as any) as IndexerFactory], + {} as ServerOptions, + (cancellationService as any) as CancellationSerivce + ); + + await indexWorker.executeJob({ + payload: { + uri: 'github.com/elastic/kibana', + revision: 'abcdefg', + }, + options: {}, + cancellationToken: cToken, + timestamp: 0, + }); + + expect(getSpy.calledOnce).toBeTruthy(); + expect(cancelIndexJobSpy.notCalled).toBeTruthy(); + expect(createSpy.notCalled).toBeTruthy(); + expect(startSpy.notCalled).toBeTruthy(); + expect(cancelSpy.notCalled).toBeTruthy(); +}); + +test('Index job continue if revision matches and checkpoint found', async () => { + // Setup CancellationService + const cancelIndexJobSpy = sinon.spy(); + const registerIndexJobTokenSpy = sinon.spy(); + const cancellationService = { + cancelIndexJob: emptyAsyncFunc, + registerIndexJobToken: emptyAsyncFunc, + }; + cancellationService.cancelIndexJob = cancelIndexJobSpy; + cancellationService.registerIndexJobToken = registerIndexJobTokenSpy; + + // Setup EsClient + const getSpy = sinon.fake.returns( + Promise.resolve({ + _source: { + [RepositoryLspIndexStatusReservedField]: { + uri: 'github.com/elastic/kibana', + progress: 50, + timestamp: new Date(), + revision: 'abcdefg', + indexProgress: { + checkpoint: { + repoUri: 'github.com/elastic/kibana', + filePath: 'foo/bar.js', + revision: 'abcdefg', + }, + }, + }, + }, + }) + ); + const esClient = { + get: emptyAsyncFunc, + }; + esClient.get = getSpy; + + // Setup IndexerFactory + const cancelSpy = sinon.spy(); + const startSpy = sinon.fake.returns(new Map()); + const indexer = { + cancel: emptyAsyncFunc, + start: emptyAsyncFunc, + }; + indexer.cancel = cancelSpy; + indexer.start = startSpy; + const createSpy = sinon.fake.returns(indexer); + const indexerFactory = { + create: emptyAsyncFunc, + }; + indexerFactory.create = createSpy; + + const cToken = new CancellationToken(); + + const indexWorker = new IndexWorker( + esQueue as Esqueue, + log, + esClient as EsClient, + [(indexerFactory as any) as IndexerFactory], + {} as ServerOptions, + (cancellationService as any) as CancellationSerivce + ); + + await indexWorker.executeJob({ + payload: { + uri: 'github.com/elastic/kibana', + revision: 'abcdefg', + }, + options: {}, + cancellationToken: cToken, + timestamp: 0, + }); + + expect(getSpy.calledOnce).toBeTruthy(); + // the rest of the index worker logic after the checkpoint handling + // should be executed. + expect(cancelIndexJobSpy.calledOnce).toBeTruthy(); + expect(createSpy.calledOnce).toBeTruthy(); + expect(startSpy.calledOnce).toBeTruthy(); + expect(cancelSpy.notCalled).toBeTruthy(); +}); + test('On index job enqueued.', async () => { // Setup EsClient const indexSpy = sinon.fake.returns(Promise.resolve()); diff --git a/x-pack/plugins/code/server/queue/index_worker.ts b/x-pack/plugins/code/server/queue/index_worker.ts index fe483305becc0..a85da3f78ba81 100644 --- a/x-pack/plugins/code/server/queue/index_worker.ts +++ b/x-pack/plugins/code/server/queue/index_worker.ts @@ -6,9 +6,18 @@ import moment from 'moment'; -import { IndexStats, IndexWorkerResult, RepositoryUri, WorkerProgress } from '../../model'; +import { + IndexProgress, + IndexRequest, + IndexStats, + IndexWorkerProgress, + IndexWorkerResult, + RepositoryUri, + WorkerProgress, + WorkerReservedProgress, +} from '../../model'; import { GitOperations } from '../git_operations'; -import { IndexerFactory, IndexProgress } from '../indexer'; +import { IndexerFactory } from '../indexer'; import { EsClient, Esqueue } from '../lib/esqueue'; import { Logger } from '../log'; import { RepositoryObjectClient } from '../search'; @@ -40,6 +49,40 @@ export class IndexWorker extends AbstractWorker { const { uri, revision } = payload; const indexerNumber = this.indexerFactories.length; + const workerProgress = (await this.objectClient.getRepositoryLspIndexStatus( + uri + )) as IndexWorkerProgress; + let checkpointReq: IndexRequest | undefined; + if (workerProgress) { + // There exist an ongoing index process + const { + uri: currentUri, + revision: currentRevision, + indexProgress: currentIndexProgress, + progress, + } = workerProgress; + + checkpointReq = currentIndexProgress && currentIndexProgress.checkpoint; + if ( + !checkpointReq && + progress > WorkerReservedProgress.INIT && + progress < WorkerReservedProgress.COMPLETED && + currentUri === uri && + currentRevision === revision + ) { + // If + // * no checkpoint exist (undefined or empty string) + // * index progress is ongoing + // * the uri and revision match the current job + // Then we can safely dedup this index job request. + this.log.info(`Index job skipped for ${uri} at revision ${revision}`); + return { + uri, + revision, + }; + } + } + // Binding the index cancellation logic this.cancellationService.cancelIndexJob(uri); const indexPromises: Array> = this.indexerFactories.map( @@ -52,7 +95,7 @@ export class IndexWorker extends AbstractWorker { this.cancellationService.registerIndexJobToken(uri, cancellationToken); } const progressReporter = this.getProgressReporter(uri, revision, index, indexerNumber); - return indexer.start(progressReporter); + return indexer.start(progressReporter, checkpointReq); } ); const stats: IndexStats[] = await Promise.all(indexPromises); @@ -69,7 +112,7 @@ export class IndexWorker extends AbstractWorker { const { uri, revision } = job.payload; const progress: WorkerProgress = { uri, - progress: 0, + progress: WorkerReservedProgress.INIT, timestamp: new Date(), revision, }; @@ -77,11 +120,24 @@ export class IndexWorker extends AbstractWorker { } public async updateProgress(uri: RepositoryUri, progress: number) { - const p: WorkerProgress = { + let p: any = { uri, progress, timestamp: new Date(), }; + if ( + progress === WorkerReservedProgress.COMPLETED || + progress === WorkerReservedProgress.ERROR || + progress === WorkerReservedProgress.TIMEOUT + ) { + // Reset the checkpoint if necessary. + p = { + ...p, + indexProgress: { + checkpoint: null, + }, + }; + } try { return await this.objectClient.updateRepositoryLspIndexStatus(uri, p); } catch (error) { @@ -116,11 +172,12 @@ export class IndexWorker extends AbstractWorker { total: number ) { return async (progress: IndexProgress) => { - const p: WorkerProgress = { + const p: IndexWorkerProgress = { uri: repoUri, progress: progress.percentage, timestamp: new Date(), revision, + indexProgress: progress, }; return await this.objectClient.setRepositoryLspIndexStatus(repoUri, p); };