
[Migrations] Add support of deferred migrations #153117

Merged on May 22, 2023 (18 commits)
32 changes: 32 additions & 0 deletions dev_docs/tutorials/saved_objects.mdx
@@ -254,6 +254,38 @@ the error should be verbose and informative so that the corrupt document can be

**WARNING:** Do not attempt to change the `typeMigrationVersion`, `id`, or `type` fields within a migration function; this is not supported.

### Deferred Migrations
Usually, migrations run during the upgrade process, which may block the upgrade when a large number of outdated objects has to be migrated.
In this case, it is recommended to mark some of the migrations as deferred.

```ts
export const dashboardVisualization: SavedObjectsType = {
name: 'dashboard_visualization',
/** ... */
migrations: {
// Takes a pre 1.1.0 doc, and converts it to 1.1.0
'1.1.0': {
deferred: true,
transform: migrateDashboardVisualization110,
},
},
};
```

By default, migrations are not deferred; to defer one, the `deferred` flag must be explicitly set to `true`.
In that case, documents whose pending migrations are all deferred will not be migrated during the upgrade process.

Whenever such documents are accessed via the Saved Objects API or the repository, all pending migrations are applied to them on the fly:
- On read operations, the stored objects remain untouched and are only transformed before the result is returned.
  If a migration fails, an exception or a 500 server error is raised, so it is guaranteed that all returned objects are up to date.
- On write operations, the objects are migrated to the latest version before being written.

In other words, this flag postpones writing the migrated object back until it is explicitly modified.
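
For illustration, here is a minimal sketch of how the read path can apply pending migrations on the fly; it mirrors the `performGet` change in this PR. The `getUpToDate` wrapper and its `fetchRaw` parameter are hypothetical stand-ins for the repository internals, while `IKibanaMigrator.migrateDocument` and `SavedObjectsErrorHelpers.decorateGeneralError` are the actual APIs this PR wires in:

```ts
import {
  SavedObjectsErrorHelpers,
  type SavedObject,
  type SavedObjectUnsanitizedDoc,
} from '@kbn/core-saved-objects-server';
import type { IKibanaMigrator } from '@kbn/core-saved-objects-base-server-internal';

// Minimal sketch: `fetchRaw` stands in for reading the stored document.
async function getUpToDate<T>(
  migrator: IKibanaMigrator,
  fetchRaw: () => Promise<SavedObjectUnsanitizedDoc<T>>
): Promise<SavedObject<T>> {
  // The stored object remains untouched; only the in-memory copy is transformed.
  const document = await fetchRaw();
  try {
    return migrator.migrateDocument(document) as SavedObject<T>;
  } catch (error) {
    // A failed migration surfaces as an error, so callers never receive stale shapes.
    throw SavedObjectsErrorHelpers.decorateGeneralError(
      error,
      'Failed to migrate document to the latest version.'
    );
  }
}
```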

One important note: if a document has several pending migrations and not all of them can be deferred, the document will be migrated during the upgrade process, and all of its pending migrations will be applied.
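
As a hypothetical illustration of this rule (`migrateDashboardVisualization120` is an invented name, not part of this PR), suppose the type above later gains a non-deferred `1.2.0` migration. A pre-1.1.0 document then has a pending migration that cannot be deferred, so it is migrated during the upgrade, and the deferred `1.1.0` migration runs at that point as well:

```ts
export const dashboardVisualization: SavedObjectsType = {
  name: 'dashboard_visualization',
  /** ... */
  migrations: {
    '1.1.0': {
      deferred: true,
      transform: migrateDashboardVisualization110,
    },
    // A plain migration function is never deferred, so any document older
    // than 1.2.0 is fully migrated during the upgrade.
    '1.2.0': migrateDashboardVisualization120,
  },
};
```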

### Testing Migrations

Bugs in a migration function cause downtime for our users and therefore have a very high impact. Follow the <DocLink id="kibDevTutorialTestingPlugins" section="saved-objects-migrations" text="Saved Object migrations section in the plugin testing guide"/>.
@@ -46,7 +46,15 @@ type ExpectedBulkGetResult = Either<

export const performBulkGet = async <T>(
{ objects, options }: PerformBulkGetParams<T>,
-{ helpers, allowedTypes, client, serializer, registry, extensions = {} }: ApiExecutionContext
+{
+  helpers,
+  allowedTypes,
+  client,
+  migrator,
+  serializer,
+  registry,
+  extensions = {},
+}: ApiExecutionContext
): Promise<SavedObjectsBulkResponse<T>> => {
const {
common: commonHelper,
@@ -192,9 +200,12 @@ export const performBulkGet = async <T>(
}

// @ts-expect-error MultiGetHit._source is optional
-return getSavedObjectFromSource(registry, type, id, doc, {
+const document = getSavedObjectFromSource(registry, type, id, doc, {
  migrationVersionCompatibility,
});
+const migrated = migrator.migrateDocument(document);
+
+return migrated;
}),
};

@@ -32,6 +32,7 @@ export const performBulkResolve = async <T>(
helpers,
allowedTypes,
client,
migrator,
serializer,
extensions = {},
} = apiExecutionContext;
@@ -43,6 +44,7 @@
registry,
allowedTypes,
client,
migrator,
serializer,
getIndexForType: commonHelper.getIndexForType.bind(commonHelper),
incrementCounterInternal: (type, id, counterFields, opts = {}) =>
@@ -13,6 +13,7 @@ import {
SavedObjectsErrorHelpers,
type SavedObjectsRawDoc,
CheckAuthorizationResult,
type SavedObject,
SavedObjectsRawDocSource,
} from '@kbn/core-saved-objects-server';
import {
@@ -48,7 +49,6 @@ export const performFind = async <T = unknown, A = unknown>(
allowedTypes: rawAllowedTypes,
mappings,
client,
-serializer,
migrator,
extensions = {},
}: ApiExecutionContext
@@ -229,22 +229,32 @@
return SavedObjectsUtils.createEmptyFindResponse<T, A>(options);
}

-const result = {
-  ...(body.aggregations ? { aggregations: body.aggregations as unknown as A } : {}),
-  page,
-  per_page: perPage,
-  total: body.hits.total,
-  saved_objects: body.hits.hits.map(
-    (hit: estypes.SearchHit<SavedObjectsRawDocSource>): SavedObjectsFindResult => ({
-      ...serializerHelper.rawToSavedObject(hit as SavedObjectsRawDoc, {
-        migrationVersionCompatibility,
-      }),
-      score: hit._score!,
-      sort: hit.sort,
-    })
-  ),
-  pit_id: body.pit_id,
-} as SavedObjectsFindResponse<T, A>;
+let result: SavedObjectsFindResponse<T, A>;
+try {
+  result = {
+    ...(body.aggregations ? { aggregations: body.aggregations as unknown as A } : {}),
+    page,
+    per_page: perPage,
+    total: body.hits.total,
+    saved_objects: body.hits.hits.map(
+      (hit: estypes.SearchHit<SavedObjectsRawDocSource>): SavedObjectsFindResult => ({
+        ...(migrator.migrateDocument(
+          serializerHelper.rawToSavedObject(hit as SavedObjectsRawDoc, {
+            migrationVersionCompatibility,
+          })
+        ) as SavedObject),
+        score: hit._score!,
+        sort: hit.sort,
+      })
+    ),
+    pit_id: body.pit_id,
+  } as typeof result;
+} catch (error) {
+  throw SavedObjectsErrorHelpers.decorateGeneralError(
+    error,
+    'Failed to migrate document to the latest version.'
+  );
+}

if (disableExtensions) {
return result;
@@ -24,7 +24,15 @@ export interface PerformGetParams {

export const performGet = async <T>(
{ type, id, options }: PerformGetParams,
{ registry, helpers, allowedTypes, client, serializer, extensions = {} }: ApiExecutionContext
{
registry,
helpers,
allowedTypes,
client,
migrator,
serializer,
extensions = {},
}: ApiExecutionContext
): Promise<SavedObject<T>> => {
const { common: commonHelper, encryption: encryptionHelper } = helpers;
const { securityExtension } = extensions;
@@ -68,12 +76,22 @@ export const performGet = async <T>(
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
}

-const result = getSavedObjectFromSource<T>(registry, type, id, body, {
+const document = getSavedObjectFromSource<T>(registry, type, id, body, {
migrationVersionCompatibility,
});

+let migrated: SavedObject<T>;
+try {
+  migrated = migrator.migrateDocument(document) as SavedObject<T>;
+} catch (error) {
+  throw SavedObjectsErrorHelpers.decorateGeneralError(
+    error,
+    'Failed to migrate document to the latest version.'
+  );
+}

return encryptionHelper.optionallyDecryptAndRedactSingleResult(
-result,
+migrated,
authorizationResult?.typeMap
);
};
@@ -31,13 +31,15 @@ import {
type ISavedObjectTypeRegistry,
type SavedObject,
SavedObjectsErrorHelpers,
type SavedObjectUnsanitizedDoc,
} from '@kbn/core-saved-objects-server';
import {
enforceError,
setupAuthorizeAndRedactInternalBulkResolveFailure,
setupAuthorizeAndRedactInternalBulkResolveSuccess,
} from '../../../test_helpers/repository.test.common';
import { savedObjectsExtensionsMock } from '../../../mocks/saved_objects_extensions.mock';
import { kibanaMigratorMock } from '../../../mocks';

const VERSION_PROPS = { _seq_no: 1, _primary_term: 1 };
const OBJ_TYPE = 'obj-type';
@@ -57,6 +59,7 @@ beforeEach(() => {

describe('internalBulkResolve', () => {
let client: ReturnType<typeof elasticsearchClientMock.createElasticsearchClient>;
let migrator: ReturnType<typeof kibanaMigratorMock.create>;
let serializer: SavedObjectsSerializer;
let incrementCounterInternal: jest.Mock<any, any>;
let registry: jest.Mocked<ISavedObjectTypeRegistry>;
@@ -72,11 +75,13 @@
): InternalBulkResolveParams {
registry = typeRegistryMock.create();
client = elasticsearchClientMock.createElasticsearchClient();
migrator = kibanaMigratorMock.create();
serializer = new SavedObjectsSerializer(registry);
incrementCounterInternal = jest.fn().mockRejectedValue(new Error('increment error')); // mock error to implicitly test that it is caught and swallowed
return {
registry,
allowedTypes: [OBJ_TYPE, ENCRYPTED_TYPE],
migrator,
client,
serializer,
getIndexForType: (type: string) => `index-for-${type}`,
@@ -223,7 +228,7 @@ describe('internalBulkResolve', () => {
};
}

-for (const namespace of [undefined, 'default', 'space-x']) {
+describe.each([undefined, 'default', 'space-x'])(`with namespace '%s'`, (namespace) => {
const expectedNamespaceString = SavedObjectsUtils.namespaceIdToString(namespace);

it('throws if mget call results in non-ES-originated 404 error', async () => {
@@ -362,7 +367,29 @@
expectConflictResult({ id: '7', alias_target_id: '7-newId', alias_purpose: 'y' }),
]);
});
-}

it('migrates the resolved objects', async () => {
const objects = [
{ type: OBJ_TYPE, id: '1' },
{ type: OBJ_TYPE, id: '2' },
];
const params = setup(objects, { namespace });
mockBulkResults({ found: false }, { found: false });
mockMgetResults({ found: true }, { found: true });
migrator.migrateDocument.mockImplementation(
(doc) => `migrated-${doc}` as unknown as SavedObjectUnsanitizedDoc
);

await expect(internalBulkResolve(params)).resolves.toHaveProperty('resolved_objects', [
expect.objectContaining({ saved_object: 'migrated-mock-obj-for-1' }),
expect.objectContaining({ saved_object: 'migrated-mock-obj-for-2' }),
]);

expect(migrator.migrateDocument).toHaveBeenCalledTimes(2);
expect(migrator.migrateDocument).nthCalledWith(1, 'mock-obj-for-1');
expect(migrator.migrateDocument).nthCalledWith(2, 'mock-obj-for-2');
});
});

describe('with encryption extension', () => {
const namespace = 'foo';
@@ -27,6 +27,7 @@ import {
SavedObjectsErrorHelpers,
} from '@kbn/core-saved-objects-server';
import {
type IKibanaMigrator,
LEGACY_URL_ALIAS_TYPE,
type LegacyUrlAlias,
} from '@kbn/core-saved-objects-base-server-internal';
@@ -59,6 +60,7 @@
*/
export interface InternalBulkResolveParams {
registry: ISavedObjectTypeRegistry;
migrator: IKibanaMigrator;
allowedTypes: string[];
client: RepositoryEsClient;
serializer: ISavedObjectsSerializer;
@@ -98,6 +100,7 @@
): Promise<InternalSavedObjectsBulkResolveResponse<T>> {
const {
registry,
migrator,
allowedTypes,
client,
serializer,
@@ -184,10 +187,12 @@
const object = getSavedObjectFromSource<T>(registry, objectType, objectId, doc, {
migrationVersionCompatibility,
});
-if (!encryptionExtension?.isEncryptableType(object.type)) {
-  return object;
-}
+const migrated = migrator.migrateDocument(object) as SavedObject<T>;
+
+if (!encryptionExtension?.isEncryptableType(migrated.type)) {
+  return migrated;
+}
-return encryptionExtension.decryptOrStripResponseAttributes(object);
+return encryptionExtension.decryptOrStripResponseAttributes(migrated);
}

// map function for pMap below
Expand All @@ -211,28 +216,39 @@ export async function internalBulkResolve<T>(
const { type, id } = either.value;
let result: SavedObjectsResolveResponse<T> | null = null;

-if (foundExactMatch && foundAliasMatch) {
-  result = {
-    saved_object: await getSavedObject(type, id, exactMatchDoc!),
-    outcome: 'conflict',
-    alias_target_id: aliasInfo!.targetId,
-    alias_purpose: aliasInfo!.purpose,
-  };
-  resolveCounter.recordOutcome(REPOSITORY_RESOLVE_OUTCOME_STATS.CONFLICT);
-} else if (foundExactMatch) {
-  result = {
-    saved_object: await getSavedObject(type, id, exactMatchDoc!),
-    outcome: 'exactMatch',
-  };
-  resolveCounter.recordOutcome(REPOSITORY_RESOLVE_OUTCOME_STATS.EXACT_MATCH);
-} else if (foundAliasMatch) {
-  result = {
-    saved_object: await getSavedObject(type, aliasInfo!.targetId, aliasMatchDoc!),
-    outcome: 'aliasMatch',
-    alias_target_id: aliasInfo!.targetId,
-    alias_purpose: aliasInfo!.purpose,
-  };
-  resolveCounter.recordOutcome(REPOSITORY_RESOLVE_OUTCOME_STATS.ALIAS_MATCH);
-}
+try {
+  if (foundExactMatch && foundAliasMatch) {
+    result = {
+      saved_object: await getSavedObject(type, id, exactMatchDoc!),
+      outcome: 'conflict',
+      alias_target_id: aliasInfo!.targetId,
+      alias_purpose: aliasInfo!.purpose,
+    };
+    resolveCounter.recordOutcome(REPOSITORY_RESOLVE_OUTCOME_STATS.CONFLICT);
+  } else if (foundExactMatch) {
+    result = {
+      saved_object: await getSavedObject(type, id, exactMatchDoc!),
+      outcome: 'exactMatch',
+    };
+    resolveCounter.recordOutcome(REPOSITORY_RESOLVE_OUTCOME_STATS.EXACT_MATCH);
+  } else if (foundAliasMatch) {
+    result = {
+      saved_object: await getSavedObject(type, aliasInfo!.targetId, aliasMatchDoc!),
+      outcome: 'aliasMatch',
+      alias_target_id: aliasInfo!.targetId,
+      alias_purpose: aliasInfo!.purpose,
+    };
+    resolveCounter.recordOutcome(REPOSITORY_RESOLVE_OUTCOME_STATS.ALIAS_MATCH);
+  }
+} catch (error) {
+  return {
+    id,
+    type,
+    error: SavedObjectsErrorHelpers.decorateGeneralError(
+      error,
+      'Failed to migrate document to the latest version.'
+    ),
+  };
+}

if (result !== null) {
@@ -29,6 +29,7 @@ export const performResolve = async <T>(
helpers,
allowedTypes,
client,
migrator,
serializer,
extensions = {},
} = apiExecutionContext;
@@ -39,6 +40,7 @@
registry,
allowedTypes,
client,
migrator,
serializer,
getIndexForType: commonHelper.getIndexForType.bind(commonHelper),
incrementCounterInternal: (t, i, counterFields, opts = {}) =>