Skip to content

Commit

Permalink
[Code] Implement the index checkpointing (#32682)
Browse files Browse the repository at this point in the history
* [Code] Persist index checkpoint into index progress in ES

* [Code] apply checkpoint to lsp indexer

* [Code] Add unit tests for index checkpointing

* [Code] move checkpoint from text to object
  • Loading branch information
mw-ding authored Mar 28, 2019
1 parent b650f2d commit 188123b
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 41 deletions.
15 changes: 15 additions & 0 deletions x-pack/plugins/code/model/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { IndexRequest } from './search';

export type RepositoryUri = string;

export interface Repository {
Expand Down Expand Up @@ -125,3 +127,16 @@ export interface CloneProgress {
/**
 * Worker progress record for a clone job: the common WorkerProgress fields
 * plus optional clone-specific progress details.
 */
export interface CloneWorkerProgress extends WorkerProgress {
/** Clone-specific progress details; optional, so it may be absent. */
cloneProgress?: CloneProgress;
}

/**
 * Snapshot of an indexer's progress, handed to a ProgressReporter while
 * indexing runs. It optionally carries a checkpoint request from which a
 * later indexing run can be resumed.
 */
export interface IndexProgress {
/** Presumably the type of the indexer reporting this progress; confirm against callers. */
type: string;
/** Presumably the total number of index requests for this run; confirm against the indexer. */
total: number;
/** Count of index requests that succeeded. */
success: number;
/** Count of index requests that failed. */
fail: number;
/** The indexer computes this as floor(100 * (success + fail) / total). */
percentage: number;
/**
 * The index request the indexer was handling when this progress was built.
 * Passing it back to Indexer.start() as the checkpoint makes the indexer skip
 * requests until one with the same filePath and revision is seen.
 */
checkpoint?: IndexRequest;
}

/**
 * Worker progress record for an index job: the common WorkerProgress fields
 * plus optional index-specific progress details (including the checkpoint).
 */
export interface IndexWorkerProgress extends WorkerProgress {
/** Index-specific progress details; optional, so it may be absent. */
indexProgress?: IndexProgress;
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ describe('lsp_indexer', () => {
it('Normal LSP index process.', async () => {
// Setup the esClient spies
const {
getSpy,
existsAliasSpy,
createSpy,
putAliasSpy,
Expand Down Expand Up @@ -181,9 +180,6 @@ describe('lsp_indexer', () => {
);
await indexer.start();

// Expect EsClient get called once to get the repo git status.
assert.ok(getSpy.calledOnce);

// Expect EsClient deleteByQuery called 3 times for repository cleaning before
// the index for document, symbol and reference, respectively.
assert.strictEqual(deleteByQuerySpy.callCount, 3);
Expand All @@ -204,7 +200,6 @@ describe('lsp_indexer', () => {
it('Cancel LSP index process.', async () => {
// Setup the esClient spies
const {
getSpy,
existsAliasSpy,
createSpy,
putAliasSpy,
Expand Down Expand Up @@ -235,9 +230,6 @@ describe('lsp_indexer', () => {
indexer.cancel();
await indexer.start();

// Expect EsClient get called once to get the repo git status.
assert.ok(getSpy.calledOnce);

// Expect EsClient deleteByQuery called 3 times for repository cleaning before
// the index for document, symbol and reference, respectively.
assert.strictEqual(deleteByQuerySpy.callCount, 3);
Expand All @@ -251,5 +243,61 @@ describe('lsp_indexer', () => {
// indexed and thus bulk won't be called.
assert.ok(bulkSpy.notCalled);
});

// Verifies that starting the indexer with a checkpoint request resumes
// indexing from that request instead of rebuilding the index from scratch.
it('Index continues from checkpoint', async () => {
// Set up the esClient spies.
const {
existsAliasSpy,
createSpy,
putAliasSpy,
deleteByQuerySpy,
bulkSpy,
} = setupEsClientSpy();

const lspservice = new LspService(
'127.0.0.1',
serverOptions,
esClient as EsClient,
{} as InstallManager,
new ConsoleLoggerFactory(),
new RepositoryConfigController(esClient as EsClient)
);

// Swap sendRequest for the value returned by setupLsServiceSendRequestSpy().
lspservice.sendRequest = setupLsServiceSendRequestSpy();

const indexer = new LspIndexer(
'github.com/Microsoft/TypeScript-Node-Starter',
'46971a8',
lspservice,
serverOptions,
esClient as EsClient,
log
);

// Start with a checkpoint request and no progress reporter. The indexer only
// compares filePath and revision against each incoming request, so repoUri
// and localRepoPath are left empty here.
await indexer.start(undefined, {
repoUri: '',
filePath: 'src/public/js/main.ts',
revision: '46971a8',
localRepoPath: '',
});

// When a checkpoint is supplied the index cleanup step is skipped, so
// EsClient deleteByQuery must not be called at all.
assert.strictEqual(deleteByQuerySpy.callCount, 0);

// Likewise, index preparation (index and alias creation) is skipped.
assert.strictEqual(existsAliasSpy.callCount, 0);
assert.strictEqual(createSpy.callCount, 0);
assert.strictEqual(putAliasSpy.callCount, 0);

// There are 22 files in the repo, but only 11 of them are indexed when
// resuming from the checkpoint.
// 1 file + 1 symbol + 1 reference = 3 objects to index for each file.
// The total number of indexed docs should be 3 * 11 = 33, which fits into a
// single bulk request.
assert.ok(bulkSpy.calledOnce);
// The bulk body is expected to hold two entries per indexed doc (presumably
// an action entry followed by the document source — confirm against the
// batch index helper).
assert.strictEqual(bulkSpy.getCall(0).args[0].body.length, 33 * 2);
// @ts-ignore
}).timeout(20000);
// @ts-ignore
}).timeout(20000);
42 changes: 31 additions & 11 deletions x-pack/plugins/code/server/indexer/abstract_indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import moment from 'moment';

import { Indexer, IndexProgress, ProgressReporter } from '.';
import { IndexRequest, IndexStats, IndexStatsKey, RepositoryUri } from '../../model';
import { Indexer, ProgressReporter } from '.';
import { IndexProgress, IndexRequest, IndexStats, IndexStatsKey, RepositoryUri } from '../../model';
import { EsClient } from '../lib/esqueue';
import { Logger } from '../log';
import { aggregateIndexStats } from '../utils/index_stats_aggregator';
Expand All @@ -29,21 +29,23 @@ export abstract class AbstractIndexer implements Indexer {
this.indexCreator = new IndexCreator(client);
}

public async start(progressReporter?: ProgressReporter) {
public async start(progressReporter?: ProgressReporter, checkpointReq?: IndexRequest) {
this.log.info(
`Indexer ${this.type} started for repo ${this.repoUri} with revision ${this.revision}`
);
this.cancelled = false;

// Prepare the ES index
const res = await this.prepareIndex();
if (!res) {
this.log.error(`Prepare index for ${this.repoUri} error. Skip indexing.`);
return new Map<IndexStatsKey, number>();
}
if (!checkpointReq) {
// Prepare the ES index
const res = await this.prepareIndex();
if (!res) {
this.log.error(`Prepare index for ${this.repoUri} error. Skip indexing.`);
return new Map<IndexStatsKey, number>();
}

// Clean up the index if necessary
await this.cleanIndex();
// Clean up the index if necessary
await this.cleanIndex();
}

// Prepare all the index requests
let totalCount = 0;
Expand All @@ -60,13 +62,30 @@ export abstract class AbstractIndexer implements Indexer {
throw error;
}

let meetCheckpoint = false;
const reqsIterator = await this.getIndexRequestIterator();
for await (const req of reqsIterator) {
if (this.isCancelled()) {
this.log.info(`Indexer cancelled. Stop right now.`);
break;
}

// If a checkpoint request was provided, skip requests until it is reached.
if (checkpointReq) {
// Assume that, for the same revision, the files are visited in the same
// order every time we iterate over the repository.
// @ts-ignore
if (req.filePath === checkpointReq.filePath && req.revision === checkpointReq.revision) {
this.log.info(`The index checkpoint has been found ${JSON.stringify(checkpointReq)}.`);
meetCheckpoint = true;
}

if (!meetCheckpoint) {
// If the checkpoint has not been met yet, skip current request.
continue;
}
}

try {
const stats = await this.processRequest(req);
statsBuffer.push(stats);
Expand All @@ -89,6 +108,7 @@ export abstract class AbstractIndexer implements Indexer {
success: successCount,
fail: failCount,
percentage: Math.floor((100 * (successCount + failCount)) / totalCount),
checkpoint: req,
};
if (moment().diff(prevTimestamp) > this.INDEXER_PROGRESS_UPDATE_INTERVAL_MS) {
progressReporter(progress);
Expand Down
12 changes: 2 additions & 10 deletions x-pack/plugins/code/server/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,12 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { IndexStats, RepositoryUri } from '../../model';

export interface IndexProgress {
type: string;
total: number;
success: number;
fail: number;
percentage: number;
}
import { IndexProgress, IndexRequest, IndexStats, RepositoryUri } from '../../model';

export type ProgressReporter = (progress: IndexProgress) => void;

export interface Indexer {
start(ProgressReporter?: ProgressReporter): Promise<IndexStats>;
start(ProgressReporter?: ProgressReporter, checkpointReq?: IndexRequest): Promise<IndexStats>;
cancel(): void;
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/code/server/indexer/lsp_indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ export class LspIndexer extends AbstractIndexer {
this.batchIndexHelper = new BatchIndexHelper(client, log);
}

public async start(progressReporter?: ProgressReporter) {
public async start(progressReporter?: ProgressReporter, checkpointReq?: LspIndexRequest) {
try {
return await super.start(progressReporter);
return await super.start(progressReporter, checkpointReq);
} finally {
if (!this.isCancelled()) {
// Flush all the index request still in the cache for bulk index.
Expand Down
22 changes: 22 additions & 0 deletions x-pack/plugins/code/server/indexer/schema/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,28 @@ export const DocumentSchema = {
revision: {
type: 'keyword',
},
indexProgress: {
properties: {
type: {
type: 'keyword',
},
total: {
type: 'integer',
},
success: {
type: 'integer',
},
fail: {
type: 'integer',
},
percentage: {
type: 'integer',
},
checkpoint: {
type: 'object',
},
},
},
},
},
};
Expand Down
Loading

0 comments on commit 188123b

Please sign in to comment.