Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PiT for outdated document search #98797

Merged
merged 17 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export class DocumentMigrator implements VersionedTransformer {
}

/**
* Gets the latest version of each migratable property.
* Gets the latest version of each migrate-able property.
*
* @readonly
* @type {SavedObjectsMigrationVersion}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export interface FullIndexInfo {

// When migrating from the outdated index we use a read query which excludes
// saved objects which are no longer used. These saved objects will still be
// kept in the outdated index for backup purposes, but won't be availble in
// kept in the outdated index for backup purposes, but won't be available in
// the upgraded index.
export const excludeUnusedTypesQuery: estypes.QueryContainer = {
bool: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,13 @@ const mockV2MigrationOptions = () => {

options.client.openPointInTime = jest
.fn()
.mockImplementationOnce(() =>
.mockImplementation(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ id: 'pit_id' })
);

options.client.closePointInTime = jest
.fn()
.mockImplementationOnce(() =>
.mockImplementation(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ succeeded: true })
);

Expand Down
19 changes: 16 additions & 3 deletions src/core/server/saved_objects/migrationsv2/actions/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('actions', () => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 statuscode
// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
Expand Down Expand Up @@ -92,7 +92,7 @@ describe('actions', () => {

describe('readWithPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.readWithPit(client, 'pitId', Option.none, 10_000);
const task = Actions.readWithPit(client, 'pitId', { match_all: {} }, 10_000);
try {
await task();
} catch (e) {
Expand Down Expand Up @@ -134,7 +134,7 @@ describe('actions', () => {
'my_target_index',
Option.none,
false,
Option.none
{}
);
try {
await task();
Expand Down Expand Up @@ -263,4 +263,17 @@ describe('actions', () => {
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('refreshIndex', () => {
  it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
    // Build the task first, then run it; any failure thrown by the run is
    // irrelevant here because only the error-handler invocation is asserted.
    const refreshTask = Actions.refreshIndex(client, 'target_index');
    try {
      await refreshTask();
    } catch (ignored) {
      /** ignore */
    }

    // The rejection must have been routed through the shared error handler.
    expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
  });
});
});
40 changes: 26 additions & 14 deletions src/core/server/saved_objects/migrationsv2/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -452,21 +452,18 @@ export interface ReadWithPit {

/*
* Requests documents from the index using PIT mechanism.
* Filter unusedTypesToExclude documents out to exclude them from being migrated.
* */
export const readWithPit = (
client: ElasticsearchClient,
pitId: string,
/* When reading we use a source query to exclude saved objects types which
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
unusedTypesQuery: Option.Option<estypes.QueryContainer>,
query: estypes.QueryContainer,
batchSize: number,
searchAfter?: number[]
searchAfter?: number[],
seqNoPrimaryTerm?: boolean
): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> => () => {
return client
.search<SavedObjectsRawDoc>({
seq_no_primary_term: seqNoPrimaryTerm,
body: {
// Sort fields are required to use searchAfter
sort: {
Expand All @@ -479,8 +476,7 @@ export const readWithPit = (
// Improve performance by not calculating the total number of hits
// matching the query.
track_total_hits: false,
// Exclude saved object types
query: Option.isSome(unusedTypesQuery) ? unusedTypesQuery.value : undefined,
query,
},
})
.then((response) => {
Expand Down Expand Up @@ -531,6 +527,7 @@ export const transformDocs = (
transformRawDocs: TransformRawDocs,
outdatedDocuments: SavedObjectsRawDoc[],
index: string,
// used for testing purposes only
refresh: estypes.Refresh
): TaskEither.TaskEither<
RetryableEsClientError | IndexNotFound | TargetIndexHadWriteBlock,
Expand All @@ -551,6 +548,22 @@ export interface ReindexResponse {
taskId: string;
}

/**
 * Issues a refresh request against `targetIndex` and waits for it to finish.
 *
 * The returned task resolves to `Either.right({ refreshed: true })` once the
 * refresh call succeeds. A rejected request is handed to
 * `catchRetryableEsClientErrors`.
 */
export const refreshIndex = (
  client: ElasticsearchClient,
  targetIndex: string
): TaskEither.TaskEither<RetryableEsClientError, { refreshed: boolean }> => () =>
  client.indices
    .refresh({ index: targetIndex })
    .then(() => Either.right({ refreshed: true }))
    .catch(catchRetryableEsClientErrors);
/**
* Reindex documents from the `sourceIndex` into the `targetIndex`. Returns a
* task ID which can be tracked for progress.
Expand All @@ -569,7 +582,7 @@ export const reindex = (
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
unusedTypesQuery: Option.Option<estypes.QueryContainer>
unusedTypesQuery: estypes.QueryContainer
): TaskEither.TaskEither<RetryableEsClientError, ReindexResponse> => () => {
return client
.reindex({
Expand All @@ -584,10 +597,7 @@ export const reindex = (
// Set reindex batch size
size: BATCH_SIZE,
// Exclude saved object types
query: Option.fold<estypes.QueryContainer, estypes.QueryContainer | undefined>(
() => undefined,
(query) => query
)(unusedTypesQuery),
query: unusedTypesQuery,
},
dest: {
index: targetIndex,
Expand Down Expand Up @@ -997,6 +1007,8 @@ interface SearchForOutdatedDocumentsOptions {
* Search for outdated saved object documents with the provided query. Will
* return one batch of documents. Searching should be repeated until no more
* outdated documents can be found.
*
* Used for testing only
Copy link
Contributor Author

@mshustov mshustov May 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move it to the testing tools in a follow-up.

*/
export const searchForOutdatedDocuments = (
client: ElasticsearchClient,
Expand Down
Loading