Skip to content

Commit

Permalink
Use PiT for outdated document search (#98797)
Browse files Browse the repository at this point in the history
* use PiT for OUTDATED_DOCUEMENT_SEARCH step

* update tests

* fix typo

* fix so migrations unit tests

* TEMP: use wait_for when transformin docs

* add a step to refresh target index if transformed outdated docs

* add unit tests

* refresh before searching outdated docs

* add integration test for outdated docs migration

* add a step to refresh target index if transformed outdated docs

* make query required

* address comments
  • Loading branch information
mshustov authored May 4, 2021
1 parent d03176e commit c815e4c
Show file tree
Hide file tree
Showing 13 changed files with 691 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export class DocumentMigrator implements VersionedTransformer {
}

/**
* Gets the latest version of each migratable property.
* Gets the latest version of each migrate-able property.
*
* @readonly
* @type {SavedObjectsMigrationVersion}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export interface FullIndexInfo {

// When migrating from the outdated index we use a read query which excludes
// saved objects which are no longer used. These saved objects will still be
// kept in the outdated index for backup purposes, but won't be availble in
// kept in the outdated index for backup purposes, but won't be available in
// the upgraded index.
export const excludeUnusedTypesQuery: estypes.QueryContainer = {
bool: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,13 @@ const mockV2MigrationOptions = () => {

options.client.openPointInTime = jest
.fn()
.mockImplementationOnce(() =>
.mockImplementation(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ id: 'pit_id' })
);

options.client.closePointInTime = jest
.fn()
.mockImplementationOnce(() =>
.mockImplementation(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ succeeded: true })
);

Expand Down
19 changes: 16 additions & 3 deletions src/core/server/saved_objects/migrationsv2/actions/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('actions', () => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 statuscode
// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
Expand Down Expand Up @@ -92,7 +92,7 @@ describe('actions', () => {

describe('readWithPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.readWithPit(client, 'pitId', Option.none, 10_000);
const task = Actions.readWithPit(client, 'pitId', { match_all: {} }, 10_000);
try {
await task();
} catch (e) {
Expand Down Expand Up @@ -134,7 +134,7 @@ describe('actions', () => {
'my_target_index',
Option.none,
false,
Option.none
{}
);
try {
await task();
Expand Down Expand Up @@ -263,4 +263,17 @@ describe('actions', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('refreshIndex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.refreshIndex(client, 'target_index');
try {
await task();
} catch (e) {
/** ignore */
}

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});
});
40 changes: 26 additions & 14 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -452,21 +452,18 @@ export interface ReadWithPit {

/*
* Requests documents from the index using PIT mechanism.
* Filter unusedTypesToExclude documents out to exclude them from being migrated.
* */
export const readWithPit = (
client: ElasticsearchClient,
pitId: string,
/* When reading we use a source query to exclude saved objects types which
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
unusedTypesQuery: Option.Option<estypes.QueryContainer>,
query: estypes.QueryContainer,
batchSize: number,
searchAfter?: number[]
searchAfter?: number[],
seqNoPrimaryTerm?: boolean
): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> => () => {
return client
.search<SavedObjectsRawDoc>({
seq_no_primary_term: seqNoPrimaryTerm,
body: {
// Sort fields are required to use searchAfter
sort: {
Expand All @@ -479,8 +476,7 @@ export const readWithPit = (
// Improve performance by not calculating the total number of hits
// matching the query.
track_total_hits: false,
// Exclude saved object types
query: Option.isSome(unusedTypesQuery) ? unusedTypesQuery.value : undefined,
query,
},
})
.then((response) => {
Expand Down Expand Up @@ -531,6 +527,7 @@ export const transformDocs = (
transformRawDocs: TransformRawDocs,
outdatedDocuments: SavedObjectsRawDoc[],
index: string,
// used for testing purposes only
refresh: estypes.Refresh
): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | TargetIndexHadWriteBlock,
Expand All @@ -551,6 +548,22 @@ export interface ReindexResponse {
taskId: string;
}

/**
* Wait for Elasticsearch to reindex all the changes.
*/
export const refreshIndex = (
client: ElasticsearchClient,
targetIndex: string
): TaskEither.TaskEither<RetryableEsClientError, { refreshed: boolean }> => () => {
return client.indices
.refresh({
index: targetIndex,
})
.then(() => {
return Either.right({ refreshed: true });
})
.catch(catchRetryableEsClientErrors);
};
/**
* Reindex documents from the `sourceIndex` into the `targetIndex`. Returns a
* task ID which can be tracked for progress.
Expand All @@ -569,7 +582,7 @@ export const reindex = (
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
unusedTypesQuery: Option.Option<estypes.QueryContainer>
unusedTypesQuery: estypes.QueryContainer
): TaskEither.TaskEither<RetryableEsClientError, ReindexResponse> => () => {
return client
.reindex({
Expand All @@ -584,10 +597,7 @@ export const reindex = (
// Set reindex batch size
size: BATCH_SIZE,
// Exclude saved object types
query: Option.fold<estypes.QueryContainer, estypes.QueryContainer | undefined>(
() => undefined,
(query) => query
)(unusedTypesQuery),
query: unusedTypesQuery,
},
dest: {
index: targetIndex,
Expand Down Expand Up @@ -997,6 +1007,8 @@ interface SearchForOutdatedDocumentsOptions {
* Search for outdated saved object documents with the provided query. Will
* return one batch of documents. Searching should be repeated until no more
* outdated documents can be found.
*
* Used for testing only
*/
export const searchForOutdatedDocuments = (
client: ElasticsearchClient,
Expand Down
Loading

0 comments on commit c815e4c

Please sign in to comment.