diff --git a/src/core/server/saved_objects/migrations/core/document_migrator.ts b/src/core/server/saved_objects/migrations/core/document_migrator.ts index cccd38bf5cc9e..8e538f6e12384 100644 --- a/src/core/server/saved_objects/migrations/core/document_migrator.ts +++ b/src/core/server/saved_objects/migrations/core/document_migrator.ts @@ -850,7 +850,8 @@ function assertNoDowngrades( * that we can later regenerate any inbound object references to match. * * @note This is only intended to be used when single-namespace object types are converted into multi-namespace object types. + * @internal */ -function deterministicallyRegenerateObjectId(namespace: string, type: string, id: string) { +export function deterministicallyRegenerateObjectId(namespace: string, type: string, id: string) { return uuidv5(`${namespace}:${type}:${id}`, uuidv5.DNS); // the uuidv5 namespace constant (uuidv5.DNS) is arbitrary } diff --git a/src/core/server/saved_objects/migrations/core/elastic_index.ts b/src/core/server/saved_objects/migrations/core/elastic_index.ts index 460aabbc77415..44dd60097f1cd 100644 --- a/src/core/server/saved_objects/migrations/core/elastic_index.ts +++ b/src/core/server/saved_objects/migrations/core/elastic_index.ts @@ -14,7 +14,6 @@ import _ from 'lodash'; import { estypes } from '@elastic/elasticsearch'; import { MigrationEsClient } from './migration_es_client'; -import { CountResponse, SearchResponse } from '../../../elasticsearch'; import { IndexMapping } from '../../mappings'; import { SavedObjectsMigrationVersion } from '../../types'; import { AliasAction, RawDoc } from './call_cluster'; @@ -95,11 +94,11 @@ export async function fetchInfo(client: MigrationEsClient, index: string): Promi * Creates a reader function that serves up batches of documents from the index. We aren't using * an async generator, as that feature currently breaks Kibana's tooling. * - * @param {CallCluster} callCluster - The elastic search connection - * @param {string} - The index to be read from + * @param client - The Elasticsearch connection + * @param index - The index to be read from * @param {opts} - * @prop {number} batchSize - The number of documents to read at a time - * @prop {string} scrollDuration - The scroll duration used for scrolling through the index + * @prop batchSize - The number of documents to read at a time + * @prop scrollDuration - The scroll duration used for scrolling through the index */ export function reader( client: MigrationEsClient, @@ -111,11 +110,11 @@ export function reader( const nextBatch = () => scrollId !== undefined - ? client.scroll<SearchResponse<SavedObjectsRawDocSource>>({ + ? client.scroll({ scroll, scroll_id: scrollId, }) - : client.search<SearchResponse<SavedObjectsRawDocSource>>({ + : client.search({ body: { size: batchSize, query: excludeUnusedTypesQuery, @@ -143,10 +142,6 @@ export function reader( /** * Writes the specified documents to the index, throws an exception * if any of the documents fail to save. - * - * @param {CallCluster} callCluster - * @param {string} index - * @param {RawDoc[]} docs */ export async function write(client: MigrationEsClient, index: string, docs: RawDoc[]) { const { body } = await client.bulk({ @@ -184,9 +179,9 @@ export async function write(client: MigrationEsClient, index: string, docs: RawD * it performs the check *each* time it is called, rather than memoizing itself, * as this is used to determine if migrations are complete. 
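For readers unfamiliar with the scroll-based reader that the hunk above touches, here is a minimal self-contained sketch of the same pattern, assuming a 7.x `@elastic/elasticsearch` client (the helper name `makeReader` and defaults are illustrative, not from the PR):

```ts
import { Client } from '@elastic/elasticsearch';

// The first call opens a scroll context via search; later calls page through
// it with the scroll id. An empty batch signals that the index is exhausted.
function makeReader(client: Client, index: string, batchSize = 100, scroll = '15m') {
  let scrollId: string | undefined;
  return async () => {
    const { body } = scrollId
      ? await client.scroll({ scroll, scroll_id: scrollId })
      : await client.search({ index, scroll, body: { size: batchSize } });
    scrollId = body._scroll_id;
    return body.hits.hits;
  };
}
```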
* - * @param {CallCluster} callCluster - * @param {string} index - * @param {SavedObjectsMigrationVersion} migrationVersion - The latest versions of the migrations + * @param client - The connection to Elasticsearch + * @param index + * @param migrationVersion - The latest versions of the migrations */ export async function migrationsUpToDate( client: MigrationEsClient, @@ -207,7 +202,7 @@ export async function migrationsUpToDate( return true; } - const { body } = await client.count<CountResponse>({ + const { body } = await client.count({ body: { query: { bool: { @@ -271,9 +266,9 @@ export async function createIndex( * is a concrete index. This function will reindex `alias` into a new index, delete the `alias` * index, and then create an alias `alias` that points to the new index. * - * @param {CallCluster} callCluster - The connection to ElasticSearch - * @param {FullIndexInfo} info - Information about the mappings and name of the new index - * @param {string} alias - The name of the index being converted to an alias + * @param client - The Elasticsearch connection + * @param info - Information about the mappings and name of the new index + * @param alias - The name of the index being converted to an alias */ export async function convertToAlias( client: MigrationEsClient, @@ -297,7 +292,7 @@ export async function convertToAlias( * alias, meaning that it will only point to one index at a time, so we * remove any other indices from the alias. * - * @param {CallCluster} callCluster + * @param {CallCluster} client * @param {string} index * @param {string} alias * @param {AliasAction[]} aliasActions - Optional actions to be added to the updateAliases call @@ -377,7 +372,7 @@ async function reindex( ) { // We poll instead of having the request wait for completion, as for large indices, // the request times out on the Elasticsearch side of things. We have a relatively tight - // polling interval, as the request is fairly efficent, and we don't + // polling interval, as the request is fairly efficient, and we don't // want to block index migrations for too long on this. const pollInterval = 250; const { body: reindexBody } = await client.reindex({ diff --git a/src/core/server/saved_objects/migrations/core/index_migrator.ts b/src/core/server/saved_objects/migrations/core/index_migrator.ts index 5bf5ae26f6a0a..472fb4f8d1a39 100644 --- a/src/core/server/saved_objects/migrations/core/index_migrator.ts +++ b/src/core/server/saved_objects/migrations/core/index_migrator.ts @@ -189,8 +189,7 @@ async function migrateSourceToDest(context: Context) { serializer, documentMigrator.migrateAndConvert, // @ts-expect-error @elastic/elasticsearch `Hit._id` may be a string | number in ES, but we always expect strings in the SO index. 
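The polling comment above describes the pattern sketched below. This is a hedged illustration, not the PR's implementation; the task-id field and response shape are assumed from the 7.x client:

```ts
import { Client } from '@elastic/elasticsearch';

// Start the reindex without waiting for completion (large indices would
// otherwise time out the request), then poll the Tasks API on a tight interval.
async function reindexAndPoll(client: Client, source: string, dest: string) {
  const pollInterval = 250; // ms; the status check is cheap, so poll often
  const { body: started } = await client.reindex({
    wait_for_completion: false,
    body: { source: { index: source }, dest: { index: dest } },
  });
  while (true) {
    const { body: task } = await client.tasks.get({ task_id: String(started.task) });
    if (task.completed) return task;
    await new Promise((resolve) => setTimeout(resolve, pollInterval));
  }
}
```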
- docs, - log + docs ) ); } diff --git a/src/core/server/saved_objects/migrations/core/migrate_raw_docs.test.ts b/src/core/server/saved_objects/migrations/core/migrate_raw_docs.test.ts index 66750a8abf1db..45e73f7dfae30 100644 --- a/src/core/server/saved_objects/migrations/core/migrate_raw_docs.test.ts +++ b/src/core/server/saved_objects/migrations/core/migrate_raw_docs.test.ts @@ -11,7 +11,6 @@ import _ from 'lodash'; import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry'; import { SavedObjectsSerializer } from '../../serialization'; import { migrateRawDocs } from './migrate_raw_docs'; -import { createSavedObjectsMigrationLoggerMock } from '../../migrations/mocks'; describe('migrateRawDocs', () => { test('converts raw docs to saved objects', async () => { @@ -24,8 +23,7 @@ describe('migrateRawDocs', () => { [ { _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }, { _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } }, - ], - createSavedObjectsMigrationLoggerMock() + ] ); expect(result).toEqual([ @@ -59,7 +57,6 @@ describe('migrateRawDocs', () => { }); test('throws when encountering a corrupt saved object document', async () => { - const logger = createSavedObjectsMigrationLoggerMock(); const transform = jest.fn((doc: any) => [ set(_.cloneDeep(doc), 'attributes.name', 'TADA'), ]); @@ -69,8 +66,7 @@ describe('migrateRawDocs', () => { [ { _id: 'foo:b', _source: { type: 'a', a: { name: 'AAA' } } }, { _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } }, - ], - logger + ] ); expect(result).rejects.toMatchInlineSnapshot( @@ -88,8 +84,7 @@ describe('migrateRawDocs', () => { const result = await migrateRawDocs( new SavedObjectsSerializer(new SavedObjectTypeRegistry()), transform, - [{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }], - createSavedObjectsMigrationLoggerMock() + [{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }] ); expect(result).toEqual([ @@ -119,12 +114,9 @@ describe('migrateRawDocs', () => { throw new Error('error during transform'); }); await expect( - migrateRawDocs( - new SavedObjectsSerializer(new SavedObjectTypeRegistry()), - transform, - [{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }], - createSavedObjectsMigrationLoggerMock() - ) + migrateRawDocs(new SavedObjectsSerializer(new SavedObjectTypeRegistry()), transform, [ + { _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }, + ]) ).rejects.toThrowErrorMatchingInlineSnapshot(`"error during transform"`); }); }); diff --git a/src/core/server/saved_objects/migrations/core/migrate_raw_docs.ts b/src/core/server/saved_objects/migrations/core/migrate_raw_docs.ts index e75f29e54c876..102ec81646a92 100644 --- a/src/core/server/saved_objects/migrations/core/migrate_raw_docs.ts +++ b/src/core/server/saved_objects/migrations/core/migrate_raw_docs.ts @@ -16,7 +16,6 @@ import { SavedObjectUnsanitizedDoc, } from '../../serialization'; import { MigrateAndConvertFn } from './document_migrator'; -import { SavedObjectsMigrationLogger } from '.'; /** * Error thrown when saved object migrations encounter a corrupt saved object. 
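The class whose doc comment appears above is not shown in full by the diff; as context, a sketch of the shape such an error class typically has (constructor and field names assumed, message text taken from the integration test later in this patch):

```ts
// Carrying the raw document _id makes the offending saved object easy to
// identify in logs and error messages when a migration fails.
export class CorruptSavedObjectError extends Error {
  constructor(public readonly rawId: string) {
    super(`Unable to migrate the corrupt saved object document with _id: '${rawId}'.`);
    // Keep `instanceof CorruptSavedObjectError` working after transpilation.
    Object.setPrototypeOf(this, CorruptSavedObjectError.prototype);
  }
}
```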
@@ -46,8 +45,7 @@ export class CorruptSavedObjectError extends Error { export async function migrateRawDocs( serializer: SavedObjectsSerializer, migrateDoc: MigrateAndConvertFn, - rawDocs: SavedObjectsRawDoc[], - log: SavedObjectsMigrationLogger + rawDocs: SavedObjectsRawDoc[] ): Promise<SavedObjectsRawDoc[]> { const migrateDocWithoutBlocking = transformNonBlocking(migrateDoc); const processedDocs = []; diff --git a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.test.ts b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.test.ts index 221e78e3e12e2..c6dfd2c2d1809 100644 --- a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.test.ts +++ b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.test.ts @@ -229,48 +229,6 @@ describe('KibanaMigrator', () => { jest.clearAllMocks(); }); - it('creates a V2 migrator that initializes a new index and migrates an existing index', async () => { - const options = mockV2MigrationOptions(); - const migrator = new KibanaMigrator(options); - const migratorStatus = migrator.getStatus$().pipe(take(3)).toPromise(); - migrator.prepareMigrations(); - await migrator.runMigrations(); - - // Basic assertions that we're creating and reindexing the expected indices - expect(options.client.indices.create).toHaveBeenCalledTimes(3); - expect(options.client.indices.create.mock.calls).toEqual( - expect.arrayContaining([ - // LEGACY_CREATE_REINDEX_TARGET - expect.arrayContaining([expect.objectContaining({ index: '.my-index_pre8.2.3_001' })]), - // CREATE_REINDEX_TEMP - expect.arrayContaining([ - expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }), - ]), - // CREATE_NEW_TARGET - expect.arrayContaining([expect.objectContaining({ index: 'other-index_8.2.3_001' })]), - ]) - ); - // LEGACY_REINDEX - expect(options.client.reindex.mock.calls[0][0]).toEqual( - expect.objectContaining({ - body: expect.objectContaining({ - source: expect.objectContaining({ index: '.my-index' }), - dest: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }), - }), - }) - ); - // REINDEX_SOURCE_TO_TEMP - expect(options.client.reindex.mock.calls[1][0]).toEqual( - expect.objectContaining({ - body: expect.objectContaining({ - source: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }), - dest: expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }), - }), - }) - ); - const { status } = await migratorStatus; - return expect(status).toEqual('completed'); - }); it('emits results on getMigratorResult$()', async () => { const options = mockV2MigrationOptions(); const migrator = new KibanaMigrator(options); @@ -378,6 +336,24 @@ const mockV2MigrationOptions = () => { } as estypes.GetTaskResponse) ); + options.client.search = jest + .fn() + .mockImplementation(() => + elasticsearchClientMock.createSuccessTransportRequestPromise({ hits: { hits: [] } }) + ); + + options.client.openPointInTime = jest + .fn() + .mockImplementationOnce(() => + elasticsearchClientMock.createSuccessTransportRequestPromise({ id: 'pit_id' }) + ); + + options.client.closePointInTime = jest + .fn() + .mockImplementationOnce(() => + elasticsearchClientMock.createSuccessTransportRequestPromise({ succeeded: true }) + ); + return options; }; diff --git a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts index 29852f8ac6445..58dcae7309eea 100644 --- a/src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts +++ 
b/src/core/server/saved_objects/migrations/kibana/kibana_migrator.ts @@ -36,7 +36,6 @@ import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry'; import { SavedObjectsType } from '../../types'; import { runResilientMigrator } from '../../migrationsv2'; import { migrateRawDocs } from '../core/migrate_raw_docs'; -import { MigrationLogger } from '../core/migration_logger'; export interface KibanaMigratorOptions { client: ElasticsearchClient; @@ -185,12 +184,7 @@ export class KibanaMigrator { logger: this.log, preMigrationScript: indexMap[index].script, transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) => - migrateRawDocs( - this.serializer, - this.documentMigrator.migrateAndConvert, - rawDocs, - new MigrationLogger(this.log) - ), + migrateRawDocs(this.serializer, this.documentMigrator.migrateAndConvert, rawDocs), migrationVersionPerType: this.documentMigrator.migrationVersion, indexPrefix: index, migrationsConfig: this.soMigrationsConfig, diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts index bee17f42d7bdb..b144905cf01ad 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts @@ -78,6 +78,54 @@ describe('actions', () => { }); }); + describe('openPit', () => { + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = Actions.openPit(client, 'my_index'); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + }); + + describe('readWithPit', () => { + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = Actions.readWithPit(client, 'pitId', Option.none, 10_000); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + }); + + describe('closePit', () => { + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = Actions.closePit(client, 'pitId'); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + }); + + describe('transformDocs', () => { + it('calls catchRetryableEsClientErrors when the promise rejects', async () => { + const task = Actions.transformDocs(client, () => Promise.resolve([]), [], 'my_index', false); + try { + await task(); + } catch (e) { + /** ignore */ + } + expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); + }); + }); + describe('reindex', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { const task = Actions.reindex( @@ -205,7 +253,7 @@ describe('actions', () => { describe('bulkOverwriteTransformedDocuments', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', []); + const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', [], 'wait_for'); try { await task(); } catch (e) { diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.ts b/src/core/server/saved_objects/migrationsv2/actions/index.ts index 02d3f8e21a510..049cdc41b7527 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.ts @@ -16,7 +16,8 @@ import 
{ pipe } from 'fp-ts/lib/pipeable'; import { flow } from 'fp-ts/lib/function'; import { ElasticsearchClient } from '../../../elasticsearch'; import { IndexMapping } from '../../mappings'; -import { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization'; +import type { SavedObjectsRawDoc, SavedObjectsRawDocSource } from '../../serialization'; +import type { TransformRawDocs } from '../types'; import { catchRetryableEsClientErrors, RetryableEsClientError, @@ -419,6 +420,133 @@ export const pickupUpdatedMappings = ( .catch(catchRetryableEsClientErrors); }; +/** @internal */ +export interface OpenPitResponse { + pitId: string; +} + +// how long ES should keep PIT alive +const pitKeepAlive = '10m'; +/* + * Creates a lightweight view of data when the request has been initiated. + * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html + * */ +export const openPit = ( + client: ElasticsearchClient, + index: string +): TaskEither.TaskEither<RetryableEsClientError, OpenPitResponse> => () => { + return client + .openPointInTime({ + index, + keep_alive: pitKeepAlive, + }) + .then((response) => Either.right({ pitId: response.body.id })) + .catch(catchRetryableEsClientErrors); +}; + +/** @internal */ +export interface ReadWithPit { + outdatedDocuments: SavedObjectsRawDoc[]; + readonly lastHitSortValue: number[] | undefined; +} + +/* + * Requests documents from the index using PIT mechanism. + * Filter unusedTypesToExclude documents out to exclude them from being migrated. + * */ +export const readWithPit = ( + client: ElasticsearchClient, + pitId: string, + /* When reading we use a source query to exclude saved objects types which + * are no longer used. These saved objects will still be kept in the outdated + * index for backup purposes, but won't be available in the upgraded index. + */ + unusedTypesQuery: Option.Option<estypes.QueryContainer>, + batchSize: number, + searchAfter?: number[] +): TaskEither.TaskEither<RetryableEsClientError, ReadWithPit> => () => { + return client + .search({ + body: { + // Sort fields are required to use searchAfter + sort: { + // the most efficient option as order is not important for the migration + _shard_doc: { order: 'asc' }, + }, + pit: { id: pitId, keep_alive: pitKeepAlive }, + size: batchSize, + search_after: searchAfter, + // Improve performance by not calculating the total number of hits + // matching the query. + track_total_hits: false, + // Exclude saved object types + query: Option.isSome(unusedTypesQuery) ? unusedTypesQuery.value : undefined, + }, + }) + .then((response) => { + const hits = response.body.hits.hits; + + if (hits.length > 0) { + return Either.right({ + // @ts-expect-error @elastic/elasticsearch _source is optional + outdatedDocuments: hits as SavedObjectsRawDoc[], + lastHitSortValue: hits[hits.length - 1].sort as number[], + }); + } + + return Either.right({ + outdatedDocuments: [], + lastHitSortValue: undefined, + }); + }) + .catch(catchRetryableEsClientErrors); +}; + +/* + * Closes PIT. + * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html + * */ +export const closePit = ( + client: ElasticsearchClient, + pitId: string +): TaskEither.TaskEither<RetryableEsClientError, {}> => () => { + return client + .closePointInTime({ + body: { id: pitId }, + }) + .then((response) => { + if (!response.body.succeeded) { + throw new Error(`Failed to close PointInTime with id: ${pitId}`); + } + return Either.right({}); + }) + .catch(catchRetryableEsClientErrors); +}; + +/* + * Transform outdated docs and write them to the index. 
* */ +export const transformDocs = ( + client: ElasticsearchClient, + transformRawDocs: TransformRawDocs, + outdatedDocuments: SavedObjectsRawDoc[], + index: string, + refresh: estypes.Refresh +): TaskEither.TaskEither< + RetryableEsClientError | IndexNotFound | TargetIndexHadWriteBlock, + 'bulk_index_succeeded' +> => + pipe( + TaskEither.tryCatch( + () => transformRawDocs(outdatedDocuments), + (e) => { + throw e; + } + ), + TaskEither.chain((docs) => bulkOverwriteTransformedDocuments(client, index, docs, refresh)) + ); + +/** @internal */ export interface ReindexResponse { taskId: string; } @@ -489,10 +617,12 @@ interface WaitForReindexTaskFailure { readonly cause: { type: string; reason: string }; } +/** @internal */ export interface TargetIndexHadWriteBlock { type: 'target_index_had_write_block'; } +/** @internal */ export interface IncompatibleMappingException { type: 'incompatible_mapping_exception'; } @@ -605,14 +735,17 @@ export const waitForPickupUpdatedMappingsTask = flow( ) ); +/** @internal */ export interface AliasNotFound { type: 'alias_not_found_exception'; } +/** @internal */ export interface RemoveIndexNotAConcreteIndex { type: 'remove_index_not_a_concrete_index'; } +/** @internal */ export type AliasAction = | { remove_index: { index: string } } | { remove: { index: string; alias: string; must_exist: boolean } } @@ -679,11 +812,19 @@ export const updateAliases = ( .catch(catchRetryableEsClientErrors); }; +/** @internal */ export interface AcknowledgeResponse { acknowledged: boolean; shardsAcknowledged: boolean; } +function aliasArrayToRecord(aliases: string[]): Record<string, estypes.Alias> { + const result: Record<string, estypes.Alias> = {}; + for (const alias of aliases) { + result[alias] = {}; + } + return result; +} /** * Creates an index with the given mappings * @@ -698,16 +839,13 @@ export const createIndex = ( client: ElasticsearchClient, indexName: string, mappings: IndexMapping, - aliases?: string[] + aliases: string[] = [] ): TaskEither.TaskEither<RetryableEsClientError, 'create_index_succeeded'> => { const createIndexTask: TaskEither.TaskEither< RetryableEsClientError, AcknowledgeResponse > = () => { - const aliasesObject = (aliases ?? []).reduce((acc, alias) => { - acc[alias] = {}; - return acc; - }, {} as Record<string, estypes.Alias>); + const aliasesObject = aliasArrayToRecord(aliases); return client.indices .create( @@ -792,6 +930,7 @@ ); }; +/** @internal */ export interface UpdateAndPickupMappingsResponse { taskId: string; } @@ -842,6 +981,8 @@ export const updateAndPickupMappings = ( }) ); }; + +/** @internal */ export interface SearchResponse { outdatedDocuments: SavedObjectsRawDoc[]; } @@ -906,7 +1047,8 @@ export const searchForOutdatedDocuments = ( export const bulkOverwriteTransformedDocuments = ( client: ElasticsearchClient, index: string, - transformedDocs: SavedObjectsRawDoc[] + transformedDocs: SavedObjectsRawDoc[], + refresh: estypes.Refresh ): TaskEither.TaskEither<RetryableEsClientError, 'bulk_index_succeeded'> => () => { return client .bulk({ // Because we only add aliases in the MARK_VERSION_INDEX_READY step we // can't bulkIndex to an alias with require_alias=true. This means if a // migration is interrupted before MARK_VERSION_INDEX_READY, a second Kibana // instance will not be able to // system indices puts in place a hard control. require_alias: false, wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, - // Wait for a refresh to happen before returning. This ensures that when - // this Kibana instance searches for outdated documents, it won't find - // documents that were already transformed by itself or another Kibna - // instance. However, this causes each OUTDATED_DOCUMENTS_SEARCH -> - // OUTDATED_DOCUMENTS_TRANSFORM cycle to take 1s so when batches are - // small performance will become a lot worse. 
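The comment being removed here (it continues just below) explains why `refresh: 'wait_for'` was costly; the alternative it mentions, and which this PR adopts, is a point-in-time cursor paged with `search_after`. A condensed sketch of that read loop, assuming a 7.x client (the helper name and callback shape are illustrative):

```ts
import { Client } from '@elastic/elasticsearch';

// Open a PIT, page through it with search_after on _shard_doc (stable but
// order-agnostic, matching readWithPit above), then always close the PIT.
async function forEachBatch(
  client: Client,
  index: string,
  batchSize: number,
  onBatch: (hits: unknown[]) => Promise<void>
) {
  const { body: pit } = await client.openPointInTime({ index, keep_alive: '10m' });
  let searchAfter: unknown[] | undefined;
  try {
    while (true) {
      const { body } = await client.search({
        body: {
          size: batchSize,
          sort: { _shard_doc: { order: 'asc' } },
          pit: { id: pit.id, keep_alive: '10m' },
          search_after: searchAfter,
          track_total_hits: false, // skip hit counting for performance
        },
      });
      const hits = body.hits.hits;
      if (hits.length === 0) return;
      searchAfter = hits[hits.length - 1].sort;
      await onBatch(hits);
    }
  } finally {
    await client.closePointInTime({ body: { id: pit.id } });
  }
}
```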
- // The alternative is to use a search_after with either a tie_breaker - // field or using a Point In Time as a cursor to go through all documents. - refresh: 'wait_for', + refresh, filter_path: ['items.*.error'], body: transformedDocs.flatMap((doc) => { return [ diff --git a/src/core/server/saved_objects/migrationsv2/index.ts b/src/core/server/saved_objects/migrationsv2/index.ts index 6e65a2e700fd3..25816c7fd14c6 100644 --- a/src/core/server/saved_objects/migrationsv2/index.ts +++ b/src/core/server/saved_objects/migrationsv2/index.ts @@ -9,9 +9,10 @@ import { ElasticsearchClient } from '../../elasticsearch'; import { IndexMapping } from '../mappings'; import { Logger } from '../../logging'; -import { SavedObjectsMigrationVersion } from '../types'; +import type { SavedObjectsMigrationVersion } from '../types'; +import type { TransformRawDocs } from './types'; import { MigrationResult } from '../migrations/core'; -import { next, TransformRawDocs } from './next'; +import { next } from './next'; import { createInitialState, model } from './model'; import { migrationStateActionMachine } from './migrations_state_action_machine'; import { SavedObjectsMigrationConfigType } from '../saved_objects_config'; @@ -55,5 +56,6 @@ export async function runResilientMigrator({ logger, next: next(client, transformRawDocs), model, + client, }); } diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/.gitignore b/src/core/server/saved_objects/migrationsv2/integration_tests/.gitignore index 57208badcc680..397b4a7624e35 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/.gitignore +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/.gitignore @@ -1 +1 @@ -migration_test_kibana.log +*.log diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts index 3905044f04e2f..b31f20950ae77 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts @@ -14,9 +14,14 @@ import { SavedObjectsRawDoc } from '../../serialization'; import { bulkOverwriteTransformedDocuments, cloneIndex, + closePit, createIndex, fetchIndices, + openPit, + OpenPitResponse, reindex, + readWithPit, + ReadWithPit, searchForOutdatedDocuments, SearchResponse, setWriteBlock, @@ -30,6 +35,7 @@ import { UpdateAndPickupMappingsResponse, verifyReindex, removeWriteBlock, + transformDocs, waitForIndexStatusYellow, } from '../actions'; import * as Either from 'fp-ts/lib/Either'; @@ -70,14 +76,20 @@ describe('migration actions', () => { { _source: { title: 'saved object 4', type: 'another_unused_type' } }, { _source: { title: 'f-agent-event 5', type: 'f_agent_event' } }, ] as unknown) as SavedObjectsRawDoc[]; - await bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', sourceDocs)(); + await bulkOverwriteTransformedDocuments( + client, + 'existing_index_with_docs', + sourceDocs, + 'wait_for' + )(); await createIndex(client, 'existing_index_2', { properties: {} })(); await createIndex(client, 'existing_index_with_write_block', { properties: {} })(); await bulkOverwriteTransformedDocuments( client, 'existing_index_with_write_block', - sourceDocs + sourceDocs, + 'wait_for' )(); await setWriteBlock(client, 'existing_index_with_write_block')(); await updateAliases(client, [ @@ -155,7 +167,12 @@ describe('migration actions', () => { { _source: { title: 'doc 4' } }, ] as 
unknown) as SavedObjectsRawDoc[]; await expect( - bulkOverwriteTransformedDocuments(client, 'new_index_without_write_block', sourceDocs)() + bulkOverwriteTransformedDocuments( + client, + 'new_index_without_write_block', + sourceDocs, + 'wait_for' + )() ).rejects.toMatchObject(expect.anything()); }); it('resolves left index_not_found_exception when the index does not exist', async () => { @@ -265,14 +282,14 @@ describe('migration actions', () => { const task = cloneIndex(client, 'existing_index_with_write_block', 'clone_target_1'); expect.assertions(1); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": Object { - "acknowledged": true, - "shardsAcknowledged": true, - }, - } - `); + Object { + "_tag": "Right", + "right": Object { + "acknowledged": true, + "shardsAcknowledged": true, + }, + } + `); }); it('resolves right after waiting for index status to be yellow if clone target already existed', async () => { expect.assertions(2); @@ -331,14 +348,14 @@ describe('migration actions', () => { expect.assertions(1); const task = cloneIndex(client, 'no_such_index', 'clone_target_3'); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "index": "no_such_index", - "type": "index_not_found_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "index": "no_such_index", + "type": "index_not_found_exception", + }, + } + `); }); it('resolves left with a retryable_es_client_error if clone target already exists but takes longer than the specified timeout before turning yellow', async () => { // Create a red index @@ -406,13 +423,13 @@ describe('migration actions', () => { targetIndex: 'reindex_target', outdatedDocumentsQuery: undefined, })()) as Either.Right).right.outdatedDocuments; - expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(` + expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ "doc 1", "doc 2", "doc 3", - "saved object 4", "f-agent-event 5", + "saved object 4", ] `); }); @@ -433,18 +450,18 @@ describe('migration actions', () => { )()) as Either.Right; const task = waitForReindexTask(client, res.right.taskId, '10s'); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "reindex_succeeded", - } - `); + Object { + "_tag": "Right", + "right": "reindex_succeeded", + } + `); const results = ((await searchForOutdatedDocuments(client, { batchSize: 1000, targetIndex: 'reindex_target_excluded_docs', outdatedDocumentsQuery: undefined, })()) as Either.Right).right.outdatedDocuments; - expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(` + expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ "doc 1", "doc 2", @@ -474,13 +491,13 @@ describe('migration actions', () => { targetIndex: 'reindex_target_2', outdatedDocumentsQuery: undefined, })()) as Either.Right).right.outdatedDocuments; - expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(` + expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ "doc 1_updated", "doc 2_updated", "doc 3_updated", - "saved object 4_updated", "f-agent-event 5_updated", + "saved object 4_updated", ] `); }); @@ -526,13 +543,13 @@ describe('migration actions', () => { targetIndex: 'reindex_target_3', outdatedDocumentsQuery: undefined, })()) as Either.Right).right.outdatedDocuments; - expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(` + 
expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ "doc 1_updated", "doc 2_updated", "doc 3_updated", - "saved object 4_updated", "f-agent-event 5_updated", + "saved object 4_updated", ] `); }); @@ -551,7 +568,7 @@ describe('migration actions', () => { _id, _source, })); - await bulkOverwriteTransformedDocuments(client, 'reindex_target_4', sourceDocs)(); + await bulkOverwriteTransformedDocuments(client, 'reindex_target_4', sourceDocs, 'wait_for')(); // Now do a real reindex const res = (await reindex( @@ -576,13 +593,13 @@ describe('migration actions', () => { targetIndex: 'reindex_target_4', outdatedDocumentsQuery: undefined, })()) as Either.Right).right.outdatedDocuments; - expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(` + expect(results.map((doc) => doc._source.title).sort()).toMatchInlineSnapshot(` Array [ "doc 1", "doc 2", "doc 3_updated", - "saved object 4_updated", "f-agent-event 5_updated", + "saved object 4_updated", ] `); }); @@ -790,9 +807,169 @@ describe('migration actions', () => { ); task = verifyReindex(client, 'existing_index_2', 'no_such_index'); - await expect(task()).rejects.toMatchInlineSnapshot( - `[ResponseError: index_not_found_exception]` + await expect(task()).rejects.toThrow('index_not_found_exception'); + }); + }); + + describe('openPit', () => { + it('opens PointInTime for an index', async () => { + const openPitTask = openPit(client, 'existing_index_with_docs'); + const pitResponse = (await openPitTask()) as Either.Right; + + expect(pitResponse.right.pitId).toEqual(expect.any(String)); + + const searchResponse = await client.search({ + body: { + pit: { id: pitResponse.right.pitId }, + }, + }); + + await expect(searchResponse.body.hits.hits.length).toBeGreaterThan(0); + }); + it('rejects if index does not exist', async () => { + const openPitTask = openPit(client, 'no_such_index'); + await expect(openPitTask()).rejects.toThrow('index_not_found_exception'); + }); + }); + + describe('readWithPit', () => { + it('requests documents from an index using given PIT', async () => { + const openPitTask = openPit(client, 'existing_index_with_docs'); + const pitResponse = (await openPitTask()) as Either.Right; + + const readWithPitTask = readWithPit( + client, + pitResponse.right.pitId, + Option.none, + 1000, + undefined + ); + const docsResponse = (await readWithPitTask()) as Either.Right; + + await expect(docsResponse.right.outdatedDocuments.length).toBe(5); + }); + + it('requests the batchSize of documents from an index', async () => { + const openPitTask = openPit(client, 'existing_index_with_docs'); + const pitResponse = (await openPitTask()) as Either.Right; + + const readWithPitTask = readWithPit( + client, + pitResponse.right.pitId, + Option.none, + 3, + undefined ); + const docsResponse = (await readWithPitTask()) as Either.Right; + + await expect(docsResponse.right.outdatedDocuments.length).toBe(3); + }); + + it('it excludes documents not matching the provided "unusedTypesQuery"', async () => { + const openPitTask = openPit(client, 'existing_index_with_docs'); + const pitResponse = (await openPitTask()) as Either.Right; + + const readWithPitTask = readWithPit( + client, + pitResponse.right.pitId, + Option.some({ + bool: { + must_not: [ + { + term: { + type: 'f_agent_event', + }, + }, + { + term: { + type: 'another_unused_type', + }, + }, + ], + }, + }), + 1000, + undefined + ); + + const docsResponse = (await readWithPitTask()) as Either.Right; + + expect(docsResponse.right.outdatedDocuments.map((doc) => 
doc._source.title).sort()) + .toMatchInlineSnapshot(` + Array [ + "doc 1", + "doc 2", + "doc 3", + ] + `); + }); + + it('rejects if PIT does not exist', async () => { + const readWithPitTask = readWithPit(client, 'no_such_pit', Option.none, 1000, undefined); + await expect(readWithPitTask()).rejects.toThrow('illegal_argument_exception'); + }); + }); + + describe('closePit', () => { + it('closes PointInTime', async () => { + const openPitTask = openPit(client, 'existing_index_with_docs'); + const pitResponse = (await openPitTask()) as Either.Right<OpenPitResponse>; + + const pitId = pitResponse.right.pitId; + await closePit(client, pitId)(); + + const searchTask = client.search({ + body: { + pit: { id: pitId }, + }, + }); + + await expect(searchTask).rejects.toThrow('search_phase_execution_exception'); + }); + + it('rejects if PIT does not exist', async () => { + const closePitTask = closePit(client, 'no_such_pit'); + await expect(closePitTask()).rejects.toThrow('illegal_argument_exception'); + }); + }); + + describe('transformDocs', () => { + it('applies "transformRawDocs" and writes result into an index', async () => { + const index = 'transform_docs_index'; + const originalDocs = [ + { _id: 'foo:1', _source: { type: 'dashboard', value: 1 } }, + { _id: 'foo:2', _source: { type: 'dashboard', value: 2 } }, + ]; + + const createIndexTask = createIndex(client, index, { + dynamic: true, + properties: {}, + }); + await createIndexTask(); + + async function transformRawDocs(docs: SavedObjectsRawDoc[]): Promise<SavedObjectsRawDoc[]> { + for (const doc of docs) { + doc._source.value += 1; + } + return docs; + } + + const transformTask = transformDocs(client, transformRawDocs, originalDocs, index, 'wait_for'); + + const result = (await transformTask()) as Either.Right<'bulk_index_succeeded'>; + + expect(result.right).toBe('bulk_index_succeeded'); + + const { body } = await client.search<{ value: number }>({ + index, + }); + const hits = body.hits.hits; + + const foo1 = hits.find((h) => h._id === 'foo:1'); + expect(foo1?._source?.value).toBe(2); + + const foo2 = hits.find((h) => h._id === 'foo:2'); + expect(foo2?._source?.value).toBe(3); }); }); @@ -919,7 +1096,8 @@ describe('migration actions', () => { await bulkOverwriteTransformedDocuments( client, 'existing_index_without_mappings', - sourceDocs + sourceDocs, + 'wait_for' )(); // Assert that we can't search over the unmapped fields of the document @@ -1147,7 +1325,13 @@ describe('migration actions', () => { { _source: { title: 'doc 6' } }, { _source: { title: 'doc 7' } }, ] as unknown) as SavedObjectsRawDoc[]; - const task = bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', newDocs); + const task = bulkOverwriteTransformedDocuments( + client, + 'existing_index_with_docs', + newDocs, + 'wait_for' + ); + await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -1162,10 +1346,12 @@ outdatedDocumentsQuery: undefined, })()) as Either.Right<SearchResponse>).right.outdatedDocuments; - const task = bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', [ - ...existingDocs, - ({ _source: { title: 'doc 8' } } as unknown) as SavedObjectsRawDoc, - ]); + const task = bulkOverwriteTransformedDocuments( + client, + 'existing_index_with_docs', + [...existingDocs, ({ _source: { title: 'doc 8' } } as unknown) as SavedObjectsRawDoc], + 'wait_for' + ); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -1180,7 +1366,12 @@ { _source: { title: 'doc 7' } }, ] 
as unknown) as SavedObjectsRawDoc[]; await expect( - bulkOverwriteTransformedDocuments(client, 'existing_index_with_write_block', newDocs)() + bulkOverwriteTransformedDocuments( + client, + 'existing_index_with_write_block', + newDocs, + 'wait_for' + )() ).rejects.toMatchObject(expect.anything()); }); }); diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_so_with_multiple_namespaces.zip b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_so_with_multiple_namespaces.zip new file mode 100644 index 0000000000000..a92211c16c559 Binary files /dev/null and b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_so_with_multiple_namespaces.zip differ diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_with_corrupted_so.zip b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_with_corrupted_so.zip new file mode 100644 index 0000000000000..c6c89ac2879b2 Binary files /dev/null and b/src/core/server/saved_objects/migrationsv2/integration_tests/archives/7.13.0_with_corrupted_so.zip differ diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts new file mode 100644 index 0000000000000..48bb282da18f6 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/cleanup.test.ts @@ -0,0 +1,131 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; +import JSON5 from 'json5'; +import * as kbnTestServer from '../../../../test_helpers/kbn_server'; +import type { Root } from '../../../root'; + +const logFilePath = Path.join(__dirname, 'cleanup_test.log'); + +const asyncUnlink = Util.promisify(Fs.unlink); +const asyncReadFile = Util.promisify(Fs.readFile); +async function removeLogFile() { + // ignore errors if it doesn't exist + await asyncUnlink(logFilePath).catch(() => void 0); +} + +function createRoot() { + return kbnTestServer.createRootWithCorePlugins( + { + migrations: { + skip: false, + enableV2: true, + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFilePath, + layout: { + type: 'json', + }, + }, + }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + ], + }, + }, + { + oss: true, + } + ); +} + +describe('migration v2', () => { + let esServer: kbnTestServer.TestElasticsearchUtils; + let root: Root; + + beforeAll(async () => { + await removeLogFile(); + }); + + afterAll(async () => { + if (root) { + await root.shutdown(); + } + if (esServer) { + await esServer.stop(); + } + + await new Promise((resolve) => setTimeout(resolve, 10000)); + }); + + it('clean ups if migration fails', async () => { + const { startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'trial', + // original SO: + // { + // _index: '.kibana_7.13.0_001', + // _type: '_doc', + // _id: 'index-pattern:test_index*', + // _version: 1, + // result: 'created', + // _shards: { total: 2, successful: 1, failed: 0 }, + // _seq_no: 0, + // _primary_term: 1 + // } + dataArchive: Path.join(__dirname, 'archives', '7.13.0_with_corrupted_so.zip'), + }, + }, + }); + + root = createRoot(); + + esServer = await startES(); + await root.setup(); + + await expect(root.start()).rejects.toThrow( + /Unable to migrate the corrupt saved object document with _id: 'index-pattern:test_index\*'/ + ); + + const logFileContent = await asyncReadFile(logFilePath, 'utf-8'); + const records = logFileContent + .split('\n') + .filter(Boolean) + .map((str) => JSON5.parse(str)); + + const logRecordWithPit = records.find( + (rec) => rec.message === '[.kibana] REINDEX_SOURCE_TO_TEMP_OPEN_PIT RESPONSE' + ); + + expect(logRecordWithPit).toBeTruthy(); + + const pitId = logRecordWithPit.right.pitId; + expect(pitId).toBeTruthy(); + + const client = esServer.es.getClient(); + await expect( + client.search({ + body: { + pit: { id: pitId }, + }, + }) + // throws an exception that cannot search with closed PIT + ).rejects.toThrow(/search_phase_execution_exception/); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts index 7e4930b48576b..c85a22348b5f5 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/migration.test.ts @@ -51,6 +51,8 @@ describe('migration v2', () => { migrations: { skip: false, enableV2: true, + // There are 53 docs in fixtures. Batch size configured to enforce 3 migration steps. 
+ batchSize: 20, }, logging: { appenders: { diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/rewriting_id.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/rewriting_id.test.ts new file mode 100644 index 0000000000000..79dac2723dfcb --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/rewriting_id.test.ts @@ -0,0 +1,241 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import Path from 'path'; +import Fs from 'fs'; +import Util from 'util'; +import { kibanaPackageJson as pkg } from '@kbn/utils'; +import * as kbnTestServer from '../../../../test_helpers/kbn_server'; +import type { ElasticsearchClient } from '../../../elasticsearch'; +import { Root } from '../../../root'; +import { deterministicallyRegenerateObjectId } from '../../migrations/core/document_migrator'; + +const logFilePath = Path.join(__dirname, 'migration_test_kibana.log'); + +const asyncUnlink = Util.promisify(Fs.unlink); +async function removeLogFile() { + // ignore errors if it doesn't exist + await asyncUnlink(logFilePath).catch(() => void 0); +} + +function sortByTypeAndId(a: { type: string; id: string }, b: { type: string; id: string }) { + return a.type.localeCompare(b.type) || a.id.localeCompare(b.id); +} + +async function fetchDocs(esClient: ElasticsearchClient, index: string) { + const { body } = await esClient.search({ + index, + body: { + query: { + bool: { + should: [ + { + term: { type: 'foo' }, + }, + { + term: { type: 'bar' }, + }, + { + term: { type: 'legacy-url-alias' }, + }, + ], + }, + }, + }, + }); + + return body.hits.hits + .map((h) => ({ + ...h._source, + id: h._id, + })) + .sort(sortByTypeAndId); +} + +function createRoot() { + return kbnTestServer.createRootWithCorePlugins( + { + migrations: { + skip: false, + enableV2: true, + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFilePath, + layout: { + type: 'json', + }, + }, + }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + ], + }, + }, + { + oss: true, + } + ); +} + +// skip as we allow id rewriting logic starting from v8.0 +describe.skip('migration v2', () => { + let esServer: kbnTestServer.TestElasticsearchUtils; + let root: Root; + + beforeAll(async () => { + await removeLogFile(); + }); + + afterAll(async () => { + if (root) { + await root.shutdown(); + } + if (esServer) { + await esServer.stop(); + } + + await new Promise((resolve) => setTimeout(resolve, 10000)); + }); + + it('rewrites id deterministically for SO with namespaceType: "multiple" and "multiple-isolated"', async () => { + const migratedIndex = `.kibana_${pkg.version}_001`; + const { startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t: number) => jest.setTimeout(t), + settings: { + es: { + license: 'trial', + // original SO: + // [ + // { id: 'foo:1', type: 'foo', foo: { name: 'Foo 1 default' } }, + // { id: 'spacex:foo:1', type: 'foo', foo: { name: 'Foo 1 spacex' }, namespace: 'spacex' }, + // { + // id: 'bar:1', + // type: 'bar', + // bar: { nomnom: 1 }, + // references: [{ type: 'foo', id: '1', name: 'Foo 1 default' }], + // }, + // { + // id: 'spacex:bar:1', + // type: 'bar', + // bar: { nomnom: 2 }, + // references: [{ type: 'foo', id: '1', 
name: 'Foo 1 spacex' }], + // namespace: 'spacex', + // }, + // ]; + dataArchive: Path.join(__dirname, 'archives', '7.13.0_so_with_multiple_namespaces.zip'), + }, + }, + }); + + root = createRoot(); + + esServer = await startES(); + const coreSetup = await root.setup(); + + coreSetup.savedObjects.registerType({ + name: 'foo', + hidden: false, + mappings: { properties: { name: { type: 'text' } } }, + namespaceType: 'multiple', + convertToMultiNamespaceTypeVersion: '8.0.0', + }); + + coreSetup.savedObjects.registerType({ + name: 'bar', + hidden: false, + mappings: { properties: { nomnom: { type: 'integer' } } }, + namespaceType: 'multiple-isolated', + convertToMultiNamespaceTypeVersion: '8.0.0', + }); + + const coreStart = await root.start(); + const esClient = coreStart.elasticsearch.client.asInternalUser; + + const migratedDocs = await fetchDocs(esClient, migratedIndex); + + // each newly converted multi-namespace object in a non-default space has its ID deterministically regenerated, and a legacy-url-alias + // object is created which links the old ID to the new ID + const newFooId = deterministicallyRegenerateObjectId('spacex', 'foo', '1'); + const newBarId = deterministicallyRegenerateObjectId('spacex', 'bar', '1'); + + expect(migratedDocs).toEqual( + [ + { + id: 'foo:1', + type: 'foo', + foo: { name: 'Foo 1 default' }, + references: [], + namespaces: ['default'], + migrationVersion: { foo: '8.0.0' }, + coreMigrationVersion: pkg.version, + }, + { + id: `foo:${newFooId}`, + type: 'foo', + foo: { name: 'Foo 1 spacex' }, + references: [], + namespaces: ['spacex'], + originId: '1', + migrationVersion: { foo: '8.0.0' }, + coreMigrationVersion: pkg.version, + }, + { + // new object for spacex:foo:1 + id: 'legacy-url-alias:spacex:foo:1', + type: 'legacy-url-alias', + 'legacy-url-alias': { + targetId: newFooId, + targetNamespace: 'spacex', + targetType: 'foo', + }, + migrationVersion: {}, + references: [], + coreMigrationVersion: pkg.version, + }, + { + id: 'bar:1', + type: 'bar', + bar: { nomnom: 1 }, + references: [{ type: 'foo', id: '1', name: 'Foo 1 default' }], + namespaces: ['default'], + migrationVersion: { bar: '8.0.0' }, + coreMigrationVersion: pkg.version, + }, + { + id: `bar:${newBarId}`, + type: 'bar', + bar: { nomnom: 2 }, + references: [{ type: 'foo', id: newFooId, name: 'Foo 1 spacex' }], + namespaces: ['spacex'], + originId: '1', + migrationVersion: { bar: '8.0.0' }, + coreMigrationVersion: pkg.version, + }, + { + // new object for spacex:bar:1 + id: 'legacy-url-alias:spacex:bar:1', + type: 'legacy-url-alias', + 'legacy-url-alias': { + targetId: newBarId, + targetNamespace: 'spacex', + targetType: 'bar', + }, + migrationVersion: {}, + references: [], + coreMigrationVersion: pkg.version, + }, + ].sort(sortByTypeAndId) + ); + }); +}); diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts index a6617fc2fb7f4..161d4a7219c8d 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.test.ts @@ -5,9 +5,9 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. 
*/ - +import { cleanupMock } from './migrations_state_machine_cleanup.mocks'; import { migrationStateActionMachine } from './migrations_state_action_machine'; -import { loggingSystemMock } from '../../mocks'; +import { loggingSystemMock, elasticsearchServiceMock } from '../../mocks'; import * as Either from 'fp-ts/lib/Either'; import * as Option from 'fp-ts/lib/Option'; import { AllControlStates, State } from './types'; @@ -15,6 +15,7 @@ import { createInitialState } from './model'; import { ResponseError } from '@elastic/elasticsearch/lib/errors'; import { elasticsearchClientMock } from '../../elasticsearch/client/mocks'; +const esClient = elasticsearchServiceMock.createElasticsearchClient(); describe('migrationsStateActionMachine', () => { beforeAll(() => { jest @@ -74,6 +75,7 @@ describe('migrationsStateActionMachine', () => { logger: mockLogger.get(), model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'LEGACY_DELETE', 'DONE']), next, + client: esClient, }); const logs = loggingSystemMock.collect(mockLogger); const doneLog = logs.info.splice(8, 1)[0][0]; @@ -151,6 +153,7 @@ describe('migrationsStateActionMachine', () => { logger: mockLogger.get(), model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'LEGACY_DELETE', 'DONE']), next, + client: esClient, }) ).resolves.toEqual(expect.anything()); }); @@ -161,6 +164,7 @@ describe('migrationsStateActionMachine', () => { logger: mockLogger.get(), model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'LEGACY_DELETE', 'DONE']), next, + client: esClient, }) ).resolves.toEqual(expect.objectContaining({ status: 'migrated' })); }); @@ -171,6 +175,7 @@ describe('migrationsStateActionMachine', () => { logger: mockLogger.get(), model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'LEGACY_DELETE', 'DONE']), next, + client: esClient, }) ).resolves.toEqual(expect.objectContaining({ status: 'patched' })); }); @@ -181,6 +186,7 @@ describe('migrationsStateActionMachine', () => { logger: mockLogger.get(), model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'FATAL']), next, + client: esClient, }) ).rejects.toMatchInlineSnapshot( `[Error: Unable to complete saved object migrations for the [.my-so-index] index: the fatal reason]` @@ -196,6 +202,7 @@ describe('migrationsStateActionMachine', () => { logger: mockLogger.get(), model: transitionModel(['LEGACY_DELETE', 'FATAL']), next, + client: esClient, }).catch((err) => err); // Ignore the first 4 log entries that come from our model const executionLogLogs = loggingSystemMock.collect(mockLogger).info.slice(4); @@ -418,6 +425,7 @@ describe('migrationsStateActionMachine', () => { }) ); }, + client: esClient, }) ).rejects.toMatchInlineSnapshot( `[Error: Unable to complete saved object migrations for the [.my-so-index] index. Please check the health of your Elasticsearch cluster and try again. Error: [snapshot_in_progress_exception]: Cannot delete indices that are being snapshotted]` @@ -450,6 +458,7 @@ describe('migrationsStateActionMachine', () => { next: () => { throw new Error('this action throws'); }, + client: esClient, }) ).rejects.toMatchInlineSnapshot( `[Error: Unable to complete saved object migrations for the [.my-so-index] index. 
Error: this action throws]` @@ -483,6 +492,7 @@ describe('migrationsStateActionMachine', () => { if (state.controlState === 'LEGACY_DELETE') throw new Error('this action throws'); return () => Promise.resolve('hello'); }, + client: esClient, }); } catch (e) { /** ignore */ @@ -680,4 +690,37 @@ describe('migrationsStateActionMachine', () => { ] `); }); + describe('cleanup', () => { + beforeEach(() => { + cleanupMock.mockClear(); + }); + it('calls cleanup function when an action throws', async () => { + await expect( + migrationStateActionMachine({ + initialState: { ...initialState, reason: 'the fatal reason' } as State, + logger: mockLogger.get(), + model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'FATAL']), + next: () => { + throw new Error('this action throws'); + }, + client: esClient, + }) + ).rejects.toThrow(); + + expect(cleanupMock).toHaveBeenCalledTimes(1); + }); + it('calls cleanup function when reaching the FATAL state', async () => { + await expect( + migrationStateActionMachine({ + initialState: { ...initialState, reason: 'the fatal reason' } as State, + logger: mockLogger.get(), + model: transitionModel(['LEGACY_REINDEX', 'LEGACY_DELETE', 'FATAL']), + next, + client: esClient, + }) + ).rejects.toThrow(); + + expect(cleanupMock).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts index 20177dda63b3b..dede52f9758e9 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_action_machine.ts @@ -9,8 +9,10 @@ import { errors as EsErrors } from '@elastic/elasticsearch'; import * as Option from 'fp-ts/lib/Option'; import { Logger, LogMeta } from '../../logging'; +import type { ElasticsearchClient } from '../../elasticsearch'; import { CorruptSavedObjectError } from '../migrations/core/migrate_raw_docs'; import { Model, Next, stateActionMachine } from './state_action_machine'; +import { cleanup } from './migrations_state_machine_cleanup'; import { State } from './types'; interface StateLogMeta extends LogMeta { @@ -19,7 +21,8 @@ interface StateLogMeta extends LogMeta { }; } -type ExecutionLog = Array< +/** @internal */ +export type ExecutionLog = Array< | { type: 'transition'; prevControlState: State['controlState']; @@ -31,6 +34,11 @@ type ExecutionLog = Array< controlState: State['controlState']; res: unknown; } + | { + type: 'cleanup'; + state: State; + message: string; + } >; const logStateTransition = ( @@ -99,11 +107,13 @@ export async function migrationStateActionMachine({ logger, next, model, + client, }: { initialState: State; logger: Logger; next: Next; model: Model; + client: ElasticsearchClient; }) { const executionLog: ExecutionLog = []; const startTime = Date.now(); @@ -112,11 +122,13 @@ export async function migrationStateActionMachine({ // indicate which messages come from which index upgrade. 
const logMessagePrefix = `[${initialState.indexPrefix}] `; let prevTimestamp = startTime; + let lastState: State | undefined; try { const finalState = await stateActionMachine( initialState, (state) => next(state), (state, res) => { + lastState = state; executionLog.push({ type: 'response', res, @@ -169,6 +181,7 @@ export async function migrationStateActionMachine({ }; } } else if (finalState.controlState === 'FATAL') { + await cleanup(client, executionLog, finalState); dumpExecutionLog(logger, logMessagePrefix, executionLog); return Promise.reject( new Error( @@ -180,6 +193,7 @@ export async function migrationStateActionMachine({ throw new Error('Invalid terminating control state'); } } catch (e) { + await cleanup(client, executionLog, lastState); if (e instanceof EsErrors.ResponseError) { logger.error( logMessagePrefix + `[${e.body?.error?.type}]: ${e.body?.error?.reason ?? e.message}` @@ -202,9 +216,13 @@ export async function migrationStateActionMachine({ ); } - throw new Error( + const newError = new Error( `Unable to complete saved object migrations for the [${initialState.indexPrefix}] index. ${e}` ); + + // restore error stack to point to a source of the problem. + newError.stack = `[${e.stack}]`; + throw newError; } } } diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.mocks.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.mocks.ts new file mode 100644 index 0000000000000..29967a1f75820 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.mocks.ts @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export const cleanupMock = jest.fn(); +jest.doMock('./migrations_state_machine_cleanup', () => ({ + cleanup: cleanupMock, +})); diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.ts new file mode 100644 index 0000000000000..1881f9a712c29 --- /dev/null +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.ts @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +import type { ElasticsearchClient } from '../../elasticsearch'; +import * as Actions from './actions'; +import type { State } from './types'; +import type { ExecutionLog } from './migrations_state_action_machine'; + +export async function cleanup( + client: ElasticsearchClient, + executionLog: ExecutionLog, + state?: State +) { + if (!state) return; + if ('sourceIndexPitId' in state) { + try { + await Actions.closePit(client, state.sourceIndexPitId)(); + } catch (e) { + executionLog.push({ + type: 'cleanup', + state, + message: e.message, + }); + } + } +} diff --git a/src/core/server/saved_objects/migrationsv2/model.test.ts b/src/core/server/saved_objects/migrationsv2/model.test.ts index 0267ae33dd157..57a7a7f2ea24a 100644 --- a/src/core/server/saved_objects/migrationsv2/model.test.ts +++ b/src/core/server/saved_objects/migrationsv2/model.test.ts @@ -17,7 +17,10 @@ import type { LegacyReindexState, LegacyReindexWaitForTaskState, LegacyDeleteState, - ReindexSourceToTempState, + ReindexSourceToTempOpenPit, + ReindexSourceToTempRead, + ReindexSourceToTempClosePit, + ReindexSourceToTempIndex, UpdateTargetMappingsState, UpdateTargetMappingsWaitForTaskState, OutdatedDocumentsSearch, @@ -25,7 +28,6 @@ import type { MarkVersionIndexReady, BaseState, CreateReindexTempState, - ReindexSourceToTempWaitForTaskState, MarkVersionIndexReadyConflict, CreateNewTargetState, CloneTempToSource, @@ -299,14 +301,12 @@ describe('migrations v2 model', () => { settings: {}, }, }); - const newState = model(initState, res) as FatalState; + const newState = model(initState, res) as WaitForYellowSourceState; - expect(newState.controlState).toEqual('WAIT_FOR_YELLOW_SOURCE'); - expect(newState).toMatchObject({ - controlState: 'WAIT_FOR_YELLOW_SOURCE', - sourceIndex: '.kibana_7.invalid.0_001', - }); + expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE'); + expect(newState.sourceIndex.value).toBe('.kibana_7.invalid.0_001'); }); + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a v2 migrations index (>= 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ '.kibana_7.11.0_001': { @@ -330,15 +330,14 @@ describe('migrations v2 model', () => { }, }, res - ); + ) as WaitForYellowSourceState; - expect(newState).toMatchObject({ - controlState: 'WAIT_FOR_YELLOW_SOURCE', - sourceIndex: '.kibana_7.11.0_001', - }); + expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE'); + expect(newState.sourceIndex.value).toBe('.kibana_7.11.0_001'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); + test('INIT -> WAIT_FOR_YELLOW_SOURCE when migrating from a v1 migrations index (>= 6.5 < 7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ '.kibana_3': { @@ -349,12 +348,10 @@ describe('migrations v2 model', () => { settings: {}, }, }); - const newState = model(initState, res); + const newState = model(initState, res) as WaitForYellowSourceState; - expect(newState).toMatchObject({ - controlState: 'WAIT_FOR_YELLOW_SOURCE', - sourceIndex: '.kibana_3', - }); + expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE'); + expect(newState.sourceIndex.value).toBe('.kibana_3'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -420,12 +417,10 @@ describe('migrations v2 model', () => { versionIndex: 'my-saved-objects_7.11.0_001', }, res - ); + ) as WaitForYellowSourceState; - expect(newState).toMatchObject({ - controlState: 'WAIT_FOR_YELLOW_SOURCE', - sourceIndex: 'my-saved-objects_3', - }); + 
expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE'); + expect(newState.sourceIndex.value).toBe('my-saved-objects_3'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -449,12 +444,11 @@ describe('migrations v2 model', () => { versionIndex: 'my-saved-objects_7.12.0_001', }, res - ); + ) as WaitForYellowSourceState; + + expect(newState.controlState).toBe('WAIT_FOR_YELLOW_SOURCE'); + expect(newState.sourceIndex.value).toBe('my-saved-objects_7.11.0'); - expect(newState).toMatchObject({ - controlState: 'WAIT_FOR_YELLOW_SOURCE', - sourceIndex: 'my-saved-objects_7.11.0', - }); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -662,7 +656,7 @@ describe('migrations v2 model', () => { const waitForYellowSourceState: WaitForYellowSourceState = { ...baseState, controlState: 'WAIT_FOR_YELLOW_SOURCE', - sourceIndex: '.kibana_3', + sourceIndex: Option.some('.kibana_3') as Option.Some, sourceIndexMappings: mappingsWithUnknownType, }; @@ -734,7 +728,7 @@ describe('migrations v2 model', () => { }); }); describe('CREATE_REINDEX_TEMP', () => { - const createReindexTargetState: CreateReindexTempState = { + const state: CreateReindexTempState = { ...baseState, controlState: 'CREATE_REINDEX_TEMP', versionIndexReadyActions: Option.none, @@ -742,80 +736,134 @@ describe('migrations v2 model', () => { targetIndex: '.kibana_7.11.0_001', tempIndexMappings: { properties: {} }, }; - it('CREATE_REINDEX_TEMP -> REINDEX_SOURCE_TO_TEMP if action succeeds', () => { + it('CREATE_REINDEX_TEMP -> REINDEX_SOURCE_TO_TEMP_OPEN_PIT if action succeeds', () => { const res: ResponseType<'CREATE_REINDEX_TEMP'> = Either.right('create_index_succeeded'); - const newState = model(createReindexTargetState, res); - expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP'); + const newState = model(state, res); + expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_OPEN_PIT'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); }); - describe('REINDEX_SOURCE_TO_TEMP', () => { - const reindexSourceToTargetState: ReindexSourceToTempState = { + + describe('REINDEX_SOURCE_TO_TEMP_OPEN_PIT', () => { + const state: ReindexSourceToTempOpenPit = { ...baseState, - controlState: 'REINDEX_SOURCE_TO_TEMP', + controlState: 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT', versionIndexReadyActions: Option.none, sourceIndex: Option.some('.kibana') as Option.Some, targetIndex: '.kibana_7.11.0_001', + tempIndexMappings: { properties: {} }, }; - test('REINDEX_SOURCE_TO_TEMP -> REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP'> = Either.right({ - taskId: 'reindex-task-id', + it('REINDEX_SOURCE_TO_TEMP_OPEN_PIT -> REINDEX_SOURCE_TO_TEMP_READ if action succeeds', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_OPEN_PIT'> = Either.right({ + pitId: 'pit_id', }); - const newState = model(reindexSourceToTargetState, res); - expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'); - expect(newState.retryCount).toEqual(0); - expect(newState.retryDelay).toEqual(0); + const newState = model(state, res) as ReindexSourceToTempRead; + expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_READ'); + expect(newState.sourceIndexPitId).toBe('pit_id'); + expect(newState.lastHitSortValue).toBe(undefined); }); }); - describe('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK', () => { - const state: ReindexSourceToTempWaitForTaskState = { + + describe('REINDEX_SOURCE_TO_TEMP_READ', () => { + const 
state: ReindexSourceToTempRead = { ...baseState, - controlState: 'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK', + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', versionIndexReadyActions: Option.none, sourceIndex: Option.some('.kibana') as Option.Some<string>, + sourceIndexPitId: 'pit_id', targetIndex: '.kibana_7.11.0_001', - reindexSourceToTargetTaskId: 'reindex-task-id', + tempIndexMappings: { properties: {} }, + lastHitSortValue: undefined, }; - test('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK -> SET_TEMP_WRITE_BLOCK when response is right', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'> = Either.right( - 'reindex_succeeded' + + it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_INDEX if the index has outdated documents to reindex', () => { + const outdatedDocuments = [{ _id: '1', _source: { type: 'vis' } }]; + const lastHitSortValue = [123456]; + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({ + outdatedDocuments, + lastHitSortValue, + }); + const newState = model(state, res) as ReindexSourceToTempIndex; + expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_INDEX'); + expect(newState.outdatedDocuments).toBe(outdatedDocuments); + expect(newState.lastHitSortValue).toBe(lastHitSortValue); + }); + + it('REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT if no outdated documents to reindex', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_READ'> = Either.right({ + outdatedDocuments: [], + lastHitSortValue: undefined, + }); + const newState = model(state, res) as ReindexSourceToTempClosePit; + expect(newState.controlState).toBe('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT'); + expect(newState.sourceIndexPitId).toBe('pit_id'); + }); + }); + + describe('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', () => { + const state: ReindexSourceToTempClosePit = { + ...baseState, + controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', + versionIndexReadyActions: Option.none, + sourceIndex: Option.some('.kibana') as Option.Some<string>, + sourceIndexPitId: 'pit_id', + targetIndex: '.kibana_7.11.0_001', + tempIndexMappings: { properties: {} }, + }; + + it('REINDEX_SOURCE_TO_TEMP_CLOSE_PIT -> SET_TEMP_WRITE_BLOCK if action succeeded', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT'> = Either.right({}); + const newState = model(state, res) as SetTempWriteBlock; + expect(newState.controlState).toBe('SET_TEMP_WRITE_BLOCK'); + expect(newState.sourceIndex).toEqual(state.sourceIndex); + }); + }); + + describe('REINDEX_SOURCE_TO_TEMP_INDEX', () => { + const state: ReindexSourceToTempIndex = { + ...baseState, + controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX', + outdatedDocuments: [], + versionIndexReadyActions: Option.none, + sourceIndex: Option.some('.kibana') as Option.Some<string>, + sourceIndexPitId: 'pit_id', + targetIndex: '.kibana_7.11.0_001', + lastHitSortValue: undefined, + }; + + it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ if action succeeded', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.right( + 'bulk_index_succeeded' ); const newState = model(state, res); - expect(newState.controlState).toEqual('SET_TEMP_WRITE_BLOCK'); + expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK -> SET_TEMP_WRITE_BLOCK when response is left target_index_had_write_block', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'> = Either.left({ +
it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ when response is left target_index_had_write_block', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.left({ type: 'target_index_had_write_block', }); - const newState = model(state, res); - expect(newState.controlState).toEqual('SET_TEMP_WRITE_BLOCK'); + const newState = model(state, res) as ReindexSourceToTempRead; + expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK -> SET_TEMP_WRITE_BLOCK when response is left index_not_found_exception', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'> = Either.left({ + + it('REINDEX_SOURCE_TO_TEMP_INDEX -> REINDEX_SOURCE_TO_TEMP_READ when response is left index_not_found_exception for temp index', () => { + const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_INDEX'> = Either.left({ type: 'index_not_found_exception', - index: '.kibana_7.11.0_reindex_temp', + index: state.tempIndex, }); - const newState = model(state, res); - expect(newState.controlState).toEqual('SET_TEMP_WRITE_BLOCK'); + const newState = model(state, res) as ReindexSourceToTempRead; + expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_READ'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); - test('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK -> REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK when response is left wait_for_task_completion_timeout', () => { - const res: ResponseType<'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'> = Either.left({ - message: '[timeout_exception] Timeout waiting for ...', - type: 'wait_for_task_completion_timeout', - }); - const newState = model(state, res); - expect(newState.controlState).toEqual('REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'); - expect(newState.retryCount).toEqual(1); - expect(newState.retryDelay).toEqual(2000); - }); }); + describe('SET_TEMP_WRITE_BLOCK', () => { const state: SetTempWriteBlock = { ...baseState, diff --git a/src/core/server/saved_objects/migrationsv2/model.ts b/src/core/server/saved_objects/migrationsv2/model.ts index acf0f620136a2..2097b1de88aab 100644 --- a/src/core/server/saved_objects/migrationsv2/model.ts +++ b/src/core/server/saved_objects/migrationsv2/model.ts @@ -227,7 +227,7 @@ export const model = (currentState: State, resW: ResponseType): return { ...stateP, controlState: 'WAIT_FOR_YELLOW_SOURCE', - sourceIndex: source, + sourceIndex: Option.some(source) as Option.Some, sourceIndexMappings: indices[source].mappings, }; } else if (indices[stateP.legacyIndex] != null) { @@ -303,7 +303,7 @@ export const model = (currentState: State, resW: ResponseType): } } else if (stateP.controlState === 'LEGACY_SET_WRITE_BLOCK') { const res = resW as ExcludeRetryableEsError>; - // If the write block is sucessfully in place + // If the write block is successfully in place if (Either.isRight(res)) { return { ...stateP, controlState: 'LEGACY_CREATE_REINDEX_TARGET' }; } else if (Either.isLeft(res)) { @@ -431,14 +431,14 @@ export const model = (currentState: State, resW: ResponseType): return { ...stateP, controlState: 'SET_SOURCE_WRITE_BLOCK', - sourceIndex: Option.some(source) as Option.Some, + sourceIndex: source, targetIndex: target, targetIndexMappings: disableUnknownTypeMappingFields( stateP.targetIndexMappings, stateP.sourceIndexMappings ), versionIndexReadyActions: Option.some([ - { remove: { index: source, alias: stateP.currentAlias, must_exist: true } }, + 
{ remove: { index: source.value, alias: stateP.currentAlias, must_exist: true } }, { add: { index: target, alias: stateP.currentAlias } }, { add: { index: target, alias: stateP.versionAlias } }, { remove_index: { index: stateP.tempIndex } }, @@ -466,32 +466,61 @@ export const model = (currentState: State, resW: ResponseType): } else if (stateP.controlState === 'CREATE_REINDEX_TEMP') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { - return { ...stateP, controlState: 'REINDEX_SOURCE_TO_TEMP' }; + return { ...stateP, controlState: 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT' }; } else { // If the createIndex action receives an 'resource_already_exists_exception' // it will wait until the index status turns green so we don't have any // left responses to handle here. throwBadResponse(stateP, res); } - } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP') { + } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { return { ...stateP, - controlState: 'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK', - reindexSourceToTargetTaskId: res.right.taskId, + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', + sourceIndexPitId: res.right.pitId, + lastHitSortValue: undefined, }; } else { - // Since this is a background task, the request should always succeed, - // errors only show up in the returned task. throwBadResponse(stateP, res); } - } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK') { + } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_READ') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { + if (res.right.outdatedDocuments.length > 0) { + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX', + outdatedDocuments: res.right.outdatedDocuments, + lastHitSortValue: res.right.lastHitSortValue, + }; + } return { ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT', + }; + } else { + throwBadResponse(stateP, res); + } + } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT') { + const res = resW as ExcludeRetryableEsError>; + if (Either.isRight(res)) { + const { sourceIndexPitId, ...state } = stateP; + return { + ...state, controlState: 'SET_TEMP_WRITE_BLOCK', + sourceIndex: stateP.sourceIndex as Option.Some, + }; + } else { + throwBadResponse(stateP, res); + } + } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_INDEX') { + const res = resW as ExcludeRetryableEsError>; + if (Either.isRight(res)) { + return { + ...stateP, + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', }; } else { const left = res.left; @@ -510,28 +539,11 @@ export const model = (currentState: State, resW: ResponseType): // we know another instance already completed these. return { ...stateP, - controlState: 'SET_TEMP_WRITE_BLOCK', + controlState: 'REINDEX_SOURCE_TO_TEMP_READ', }; - } else if (isLeftTypeof(left, 'wait_for_task_completion_timeout')) { - // After waiting for the specificed timeout, the task has not yet - // completed. Retry this step to see if the task has completed after an - // exponential delay. We will basically keep polling forever until the - // Elasticeasrch task succeeds or fails. 
- return delayRetryState(stateP, left.message, Number.MAX_SAFE_INTEGER); - } else if ( - isLeftTypeof(left, 'index_not_found_exception') || - isLeftTypeof(left, 'incompatible_mapping_exception') - ) { - // Don't handle the following errors as the migration algorithm should - // never cause them to occur: - // - incompatible_mapping_exception the temp index has `dynamic: false` - // mappings - // - index_not_found_exception for the source index, we will never - // delete the source index - throwBadResponse(stateP, left as never); - } else { - throwBadResponse(stateP, left); } + // should never happen + throwBadResponse(stateP, res as never); } } else if (stateP.controlState === 'SET_TEMP_WRITE_BLOCK') { const res = resW as ExcludeRetryableEsError>; @@ -609,7 +621,7 @@ export const model = (currentState: State, resW: ResponseType): controlState: 'OUTDATED_DOCUMENTS_SEARCH', }; } else { - throwBadResponse(stateP, res); + throwBadResponse(stateP, res as never); } } else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS') { const res = resW as ExcludeRetryableEsError>; @@ -647,10 +659,10 @@ export const model = (currentState: State, resW: ResponseType): } else { const left = res.left; if (isLeftTypeof(left, 'wait_for_task_completion_timeout')) { - // After waiting for the specificed timeout, the task has not yet + // After waiting for the specified timeout, the task has not yet // completed. Retry this step to see if the task has completed after an // exponential delay. We will basically keep polling forever until the - // Elasticeasrch task succeeds or fails. + // Elasticsearch task succeeds or fails. return delayRetryState(stateP, res.left.message, Number.MAX_SAFE_INTEGER); } else { throwBadResponse(stateP, left); diff --git a/src/core/server/saved_objects/migrationsv2/next.ts b/src/core/server/saved_objects/migrationsv2/next.ts index bb506cbca66fb..6d61634a6948e 100644 --- a/src/core/server/saved_objects/migrationsv2/next.ts +++ b/src/core/server/saved_objects/migrationsv2/next.ts @@ -6,13 +6,13 @@ * Side Public License, v 1. 
*/ -import * as TaskEither from 'fp-ts/lib/TaskEither'; -import * as Option from 'fp-ts/lib/Option'; -import { UnwrapPromise } from '@kbn/utility-types'; -import { pipe } from 'fp-ts/lib/pipeable'; +import type { UnwrapPromise } from '@kbn/utility-types'; import type { AllActionStates, - ReindexSourceToTempState, + ReindexSourceToTempOpenPit, + ReindexSourceToTempRead, + ReindexSourceToTempClosePit, + ReindexSourceToTempIndex, MarkVersionIndexReady, InitState, LegacyCreateReindexTargetState, @@ -27,18 +27,16 @@ import type { UpdateTargetMappingsState, UpdateTargetMappingsWaitForTaskState, CreateReindexTempState, - ReindexSourceToTempWaitForTaskState, MarkVersionIndexReadyConflict, CreateNewTargetState, CloneTempToSource, SetTempWriteBlock, WaitForYellowSourceState, + TransformRawDocs, } from './types'; import * as Actions from './actions'; import { ElasticsearchClient } from '../../elasticsearch'; -import { SavedObjectsRawDoc } from '..'; -export type TransformRawDocs = (rawDocs: SavedObjectsRawDoc[]) => Promise<SavedObjectsRawDoc[]>; type ActionMap = ReturnType<typeof nextActionMap>; /** @@ -56,26 +54,43 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: TransformRawDocs INIT: (state: InitState) => Actions.fetchIndices(client, [state.currentAlias, state.versionAlias]), WAIT_FOR_YELLOW_SOURCE: (state: WaitForYellowSourceState) => - Actions.waitForIndexStatusYellow(client, state.sourceIndex), + Actions.waitForIndexStatusYellow(client, state.sourceIndex.value), SET_SOURCE_WRITE_BLOCK: (state: SetSourceWriteBlockState) => Actions.setWriteBlock(client, state.sourceIndex.value), CREATE_NEW_TARGET: (state: CreateNewTargetState) => Actions.createIndex(client, state.targetIndex, state.targetIndexMappings), CREATE_REINDEX_TEMP: (state: CreateReindexTempState) => Actions.createIndex(client, state.tempIndex, state.tempIndexMappings), - REINDEX_SOURCE_TO_TEMP: (state: ReindexSourceToTempState) => - Actions.reindex( + REINDEX_SOURCE_TO_TEMP_OPEN_PIT: (state: ReindexSourceToTempOpenPit) => + Actions.openPit(client, state.sourceIndex.value), + REINDEX_SOURCE_TO_TEMP_READ: (state: ReindexSourceToTempRead) => + Actions.readWithPit( client, - state.sourceIndex.value, + state.sourceIndexPitId, + state.unusedTypesQuery, + state.batchSize, + state.lastHitSortValue + ), + REINDEX_SOURCE_TO_TEMP_CLOSE_PIT: (state: ReindexSourceToTempClosePit) => + Actions.closePit(client, state.sourceIndexPitId), + REINDEX_SOURCE_TO_TEMP_INDEX: (state: ReindexSourceToTempIndex) => + Actions.transformDocs( + client, + transformRawDocs, + state.outdatedDocuments, state.tempIndex, - Option.none, - false, - state.unusedTypesQuery + /** + * Since we don't run a search against the target index, we disable "refresh" to speed up + * the migration process. + * However, a later step must still "refresh" the target index + * before we reach the OUTDATED_DOCUMENTS_SEARCH step. + * Right now, we rely on UPDATE_TARGET_MAPPINGS + UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK + * to perform that refresh.
+ */ + false ), SET_TEMP_WRITE_BLOCK: (state: SetTempWriteBlock) => Actions.setWriteBlock(client, state.tempIndex), - REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK: (state: ReindexSourceToTempWaitForTaskState) => - Actions.waitForReindexTask(client, state.reindexSourceToTargetTaskId, '60s'), CLONE_TEMP_TO_TARGET: (state: CloneTempToSource) => Actions.cloneIndex(client, state.tempIndex, state.targetIndex), UPDATE_TARGET_MAPPINGS: (state: UpdateTargetMappingsState) => @@ -89,16 +104,20 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra outdatedDocumentsQuery: state.outdatedDocumentsQuery, }), OUTDATED_DOCUMENTS_TRANSFORM: (state: OutdatedDocumentsTransform) => - pipe( - TaskEither.tryCatch( - () => transformRawDocs(state.outdatedDocuments), - (e) => { - throw e; - } - ), - TaskEither.chain((docs) => - Actions.bulkOverwriteTransformedDocuments(client, state.targetIndex, docs) - ) + // Wait for a refresh to happen before returning. This ensures that when + // this Kibana instance searches for outdated documents, it won't find + // documents that were already transformed by itself or another Kibana + // instance. However, this causes each OUTDATED_DOCUMENTS_SEARCH -> + // OUTDATED_DOCUMENTS_TRANSFORM cycle to take 1s so when batches are + // small performance will become a lot worse. + // The alternative is to use a search_after with either a tie_breaker + // field or using a Point In Time as a cursor to go through all documents. + Actions.transformDocs( + client, + transformRawDocs, + state.outdatedDocuments, + state.targetIndex, + 'wait_for' ), MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) => Actions.updateAliases(client, state.versionIndexReadyActions.value), diff --git a/src/core/server/saved_objects/migrationsv2/types.ts b/src/core/server/saved_objects/migrationsv2/types.ts index 5e84bc23b1d16..b84d483cf6203 100644 --- a/src/core/server/saved_objects/migrationsv2/types.ts +++ b/src/core/server/saved_objects/migrationsv2/types.ts @@ -132,7 +132,7 @@ export type FatalState = BaseState & { export interface WaitForYellowSourceState extends BaseState { /** Wait for the source index to be yellow before requesting it. 
*/ readonly controlState: 'WAIT_FOR_YELLOW_SOURCE'; - readonly sourceIndex: string; + readonly sourceIndex: Option.Some; readonly sourceIndexMappings: IndexMapping; } @@ -158,21 +158,29 @@ export type CreateReindexTempState = PostInitState & { readonly sourceIndex: Option.Some; }; -export type ReindexSourceToTempState = PostInitState & { - /** Reindex documents from the source index into the target index */ - readonly controlState: 'REINDEX_SOURCE_TO_TEMP'; +export interface ReindexSourceToTempOpenPit extends PostInitState { + /** Open PIT to the source index */ + readonly controlState: 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT'; readonly sourceIndex: Option.Some; -}; +} -export type ReindexSourceToTempWaitForTaskState = PostInitState & { - /** - * Wait until reindexing documents from the source index into the target - * index has completed - */ - readonly controlState: 'REINDEX_SOURCE_TO_TEMP_WAIT_FOR_TASK'; - readonly sourceIndex: Option.Some; - readonly reindexSourceToTargetTaskId: string; -}; +export interface ReindexSourceToTempRead extends PostInitState { + readonly controlState: 'REINDEX_SOURCE_TO_TEMP_READ'; + readonly sourceIndexPitId: string; + readonly lastHitSortValue: number[] | undefined; +} + +export interface ReindexSourceToTempClosePit extends PostInitState { + readonly controlState: 'REINDEX_SOURCE_TO_TEMP_CLOSE_PIT'; + readonly sourceIndexPitId: string; +} + +export interface ReindexSourceToTempIndex extends PostInitState { + readonly controlState: 'REINDEX_SOURCE_TO_TEMP_INDEX'; + readonly outdatedDocuments: SavedObjectsRawDoc[]; + readonly sourceIndexPitId: string; + readonly lastHitSortValue: number[] | undefined; +} export type SetTempWriteBlock = PostInitState & { /** @@ -302,8 +310,10 @@ export type State = | SetSourceWriteBlockState | CreateNewTargetState | CreateReindexTempState - | ReindexSourceToTempState - | ReindexSourceToTempWaitForTaskState + | ReindexSourceToTempOpenPit + | ReindexSourceToTempRead + | ReindexSourceToTempClosePit + | ReindexSourceToTempIndex | SetTempWriteBlock | CloneTempToSource | UpdateTargetMappingsState @@ -324,3 +334,5 @@ export type AllControlStates = State['controlState']; * 'FATAL' and 'DONE'). */ export type AllActionStates = Exclude; + +export type TransformRawDocs = (rawDocs: SavedObjectsRawDoc[]) => Promise; diff --git a/src/core/server/saved_objects/service/lib/repository.ts b/src/core/server/saved_objects/service/lib/repository.ts index c0e2cdc333363..8faa476b77bfa 100644 --- a/src/core/server/saved_objects/service/lib/repository.ts +++ b/src/core/server/saved_objects/service/lib/repository.ts @@ -1917,10 +1917,7 @@ export class SavedObjectsRepository { ...(preference ? { preference } : {}), }; - const { - body, - statusCode, - } = await this.client.openPointInTime( + const { body, statusCode } = await this.client.openPointInTime( // @ts-expect-error @elastic/elasticsearch OpenPointInTimeRequest.index expected to accept string[] esOptions, { diff --git a/src/core/test_helpers/kbn_server.ts b/src/core/test_helpers/kbn_server.ts index 950ab5f4392e1..dbf19f84825be 100644 --- a/src/core/test_helpers/kbn_server.ts +++ b/src/core/test_helpers/kbn_server.ts @@ -6,7 +6,7 @@ * Side Public License, v 1. 
*/ -import { Client } from 'elasticsearch'; +import type { KibanaClient } from '@elastic/elasticsearch/api/kibana'; import { ToolingLog, REPO_ROOT } from '@kbn/dev-utils'; import { // @ts-expect-error https://github.com/elastic/kibana/issues/95679 @@ -140,7 +140,7 @@ export interface TestElasticsearchServer { start: (esArgs: string[], esEnvVars: Record) => Promise; stop: () => Promise; cleanup: () => Promise; - getClient: () => Client; + getClient: () => KibanaClient; getCallCluster: () => LegacyAPICaller; getUrl: () => string; } diff --git a/test/functional/apps/context/_discover_navigation.js b/test/functional/apps/context/_discover_navigation.js index dc5d56271c7fd..1c3862e07e9d7 100644 --- a/test/functional/apps/context/_discover_navigation.js +++ b/test/functional/apps/context/_discover_navigation.js @@ -35,7 +35,10 @@ export default function ({ getService, getPageObjects }) { describe('context link in discover', () => { before(async () => { await PageObjects.timePicker.setDefaultAbsoluteRangeViaUiSettings(); - await kibanaServer.uiSettings.update({ 'doc_table:legacy': true }); + await kibanaServer.uiSettings.update({ + 'doc_table:legacy': true, + defaultIndex: 'logstash-*', + }); await PageObjects.common.navigateToApp('discover'); for (const columnName of TEST_COLUMN_NAMES) { diff --git a/test/functional/fixtures/es_archiver/visualize/data.json b/test/functional/fixtures/es_archiver/visualize/data.json index 66941e201e9ba..f337bffe80f2c 100644 --- a/test/functional/fixtures/es_archiver/visualize/data.json +++ b/test/functional/fixtures/es_archiver/visualize/data.json @@ -207,7 +207,7 @@ "fields": "[{\"name\":\"_id\",\"type\":\"string\",\"esTypes\":[\"_id\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":false},{\"name\":\"_index\",\"type\":\"string\",\"esTypes\":[\"_index\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":false},{\"name\":\"_score\",\"type\":\"number\",\"count\":0,\"scripted\":false,\"searchable\":false,\"aggregatable\":false,\"readFromDocValues\":false},{\"name\":\"_source\",\"type\":\"_source\",\"esTypes\":[\"_source\"],\"count\":0,\"scripted\":false,\"searchable\":false,\"aggregatable\":false,\"readFromDocValues\":false},{\"name\":\"_type\",\"type\":\"string\",\"esTypes\":[\"_type\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":false},{\"name\":\"message\",\"type\":\"string\",\"esTypes\":[\"text\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"name\":\"message.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"message\"}}},{\"name\":\"user\",\"type\":\"string\",\"esTypes\":[\"text\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"name\":\"user.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"count\":0,\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"user\"}}}]", "title": "test_index*" }, - "type": "test_index*" + "type": "index-pattern" } } }
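Taken together, the model.ts, next.ts, and types.ts changes above replace the single long-running reindex task with an explicit point-in-time loop: REINDEX_SOURCE_TO_TEMP_OPEN_PIT -> REINDEX_SOURCE_TO_TEMP_READ -> REINDEX_SOURCE_TO_TEMP_INDEX -> ... -> REINDEX_SOURCE_TO_TEMP_CLOSE_PIT. The standalone sketch below shows the same technique against a bare @elastic/elasticsearch 7.x client, outside the Actions/TaskEither wrappers the patch uses; the keep-alive, the _shard_doc sort, and the indexBatch() stand-in for transformDocs() are illustrative assumptions, not the exact readWithPit implementation.

import { Client } from '@elastic/elasticsearch';

type Hit = { _id: string; _source: unknown; sort?: unknown[] };

export async function reindexWithPit(
  client: Client,
  sourceIndex: string,
  indexBatch: (hits: Hit[]) => Promise<void>, // hypothetical transform-and-bulk-write step
  batchSize = 1000
): Promise<void> {
  // REINDEX_SOURCE_TO_TEMP_OPEN_PIT: a point-in-time pins a consistent
  // snapshot of the source index for the whole loop.
  const { body: pit } = await client.openPointInTime({
    index: sourceIndex,
    keep_alive: '10m',
  });

  let lastHitSortValue: unknown[] | undefined;
  try {
    while (true) {
      // REINDEX_SOURCE_TO_TEMP_READ: page forward with search_after;
      // _shard_doc is the cheap tiebreaker available inside a PIT search.
      const { body } = await client.search({
        body: {
          size: batchSize,
          sort: [{ _shard_doc: 'asc' }],
          pit: { id: pit.id, keep_alive: '10m' },
          ...(lastHitSortValue ? { search_after: lastHitSortValue } : {}),
        },
      });
      const hits: Hit[] = body.hits.hits;
      if (hits.length === 0) break; // nothing left to copy -> CLOSE_PIT

      // REINDEX_SOURCE_TO_TEMP_INDEX: transform and bulk-index the batch,
      // then loop back to READ with the last hit's sort values.
      await indexBatch(hits);
      lastHitSortValue = hits[hits.length - 1].sort;
    }
  } finally {
    // REINDEX_SOURCE_TO_TEMP_CLOSE_PIT -- also what cleanup() guarantees
    // when the machine aborts while a PIT is still open in its state.
    await client.closePointInTime({ body: { id: pit.id } });
  }
}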
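The cleanup() helper added above invokes Actions.closePit(client, pitId)() with a trailing second pair of parentheses because migrationsv2 actions are lazy fp-ts TaskEithers: building one performs no I/O, and running it yields an Either instead of throwing. A minimal sketch of that pattern, assuming only fp-ts (closePitTask is a hypothetical stand-in for the real action):

import * as TaskEither from 'fp-ts/lib/TaskEither';
import * as Either from 'fp-ts/lib/Either';

// Building the task captures the arguments but performs no I/O yet.
const closePitTask = (pitId: string): TaskEither.TaskEither<Error, 'pit_closed'> =>
  TaskEither.tryCatch(
    async () => 'pit_closed' as const, // stub; ignores pitId instead of calling ES
    (e) => e as Error
  );

async function run() {
  // The second call actually executes the task and yields an Either.
  const res = await closePitTask('pit_id')();
  if (Either.isLeft(res)) {
    console.error('close PIT failed:', res.left.message);
  }
}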
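WAIT_FOR_YELLOW_SOURCE and the reindex states above now carry sourceIndex as a narrowed Option.Some<string> rather than a bare string, which is why next.ts reads state.sourceIndex.value and the tests assert on newState.sourceIndex.value. A small sketch of the narrowing, assuming fp-ts:

import * as Option from 'fp-ts/lib/Option';

// Option.some() returns the wide Option<string>; the cast narrows it to
// Option.Some<string> so later states can read .value without re-checking.
const sourceIndex = Option.some('.kibana_3') as Option.Some<string>;

console.log(sourceIndex.value); // '.kibana_3' -- statically guaranteed present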
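Finally, the newError.stack reassignment in migrations_state_action_machine.ts wraps the failure in a migration-specific message without losing the original stack trace. A minimal sketch of the same pattern (the bracketed formatting mirrors the patch; it is a project convention, not a Node API):

function wrapMigrationError(e: Error, indexPrefix: string): Error {
  const wrapped = new Error(
    `Unable to complete saved object migrations for the [${indexPrefix}] index. ${e}`
  );
  // Point the stack at the original failure instead of this wrapper.
  wrapped.stack = `[${e.stack}]`;
  return wrapped;
}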