-
Notifications
You must be signed in to change notification settings - Fork 8.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Increase stability when initializing the Elasticsearch index for the …
…event log (#57465) (#57731) * Fix ILM policy creation * Handle errors thrown in scenario multiple Kibana instances are started at the same time * Fix tests and cleanup * Start adding tests * Refactor tests, add index template failure test * Create cluster client adapter to facilitate testing and isolation * Fix places calling callEs still Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
- Loading branch information
1 parent
8990d35
commit 0eb38fc
Showing
13 changed files
with
495 additions
and
185 deletions.
There are no files selected for viewing
24 changes: 24 additions & 0 deletions
24
x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { IClusterClientAdapter } from './cluster_client_adapter'; | ||
|
||
const createClusterClientMock = () => { | ||
const mock: jest.Mocked<IClusterClientAdapter> = { | ||
indexDocument: jest.fn(), | ||
doesIlmPolicyExist: jest.fn(), | ||
createIlmPolicy: jest.fn(), | ||
doesIndexTemplateExist: jest.fn(), | ||
createIndexTemplate: jest.fn(), | ||
doesAliasExist: jest.fn(), | ||
createIndex: jest.fn(), | ||
}; | ||
return mock; | ||
}; | ||
|
||
export const clusterClientAdapterMock = { | ||
create: createClusterClientMock, | ||
}; |
196 changes: 196 additions & 0 deletions
196
x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { ClusterClient, Logger } from '../../../../../src/core/server'; | ||
import { elasticsearchServiceMock, loggingServiceMock } from '../../../../../src/core/server/mocks'; | ||
import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter'; | ||
|
||
type EsClusterClient = Pick<jest.Mocked<ClusterClient>, 'callAsInternalUser' | 'asScoped'>; | ||
|
||
let logger: Logger; | ||
let clusterClient: EsClusterClient; | ||
let clusterClientAdapter: IClusterClientAdapter; | ||
|
||
beforeEach(() => { | ||
logger = loggingServiceMock.createLogger(); | ||
clusterClient = elasticsearchServiceMock.createClusterClient(); | ||
clusterClientAdapter = new ClusterClientAdapter({ | ||
logger, | ||
clusterClient, | ||
}); | ||
}); | ||
|
||
describe('indexDocument', () => { | ||
test('should call cluster client with given doc', async () => { | ||
await clusterClientAdapter.indexDocument({ args: true }); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('index', { | ||
args: true, | ||
}); | ||
}); | ||
|
||
test('should throw error when cluster client throws an error', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.indexDocument({ args: true }) | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`); | ||
}); | ||
}); | ||
|
||
describe('doesIlmPolicyExist', () => { | ||
const notFoundError = new Error('Not found') as any; | ||
notFoundError.statusCode = 404; | ||
|
||
test('should call cluster with proper arguments', async () => { | ||
await clusterClientAdapter.doesIlmPolicyExist('foo'); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', { | ||
method: 'GET', | ||
path: '_ilm/policy/foo', | ||
}); | ||
}); | ||
|
||
test('should return false when 404 error is returned by Elasticsearch', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(notFoundError); | ||
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(false); | ||
}); | ||
|
||
test('should throw error when error is not 404', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.doesIlmPolicyExist('foo') | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"error checking existance of ilm policy: Fail"`); | ||
}); | ||
|
||
test('should return true when no error is thrown', async () => { | ||
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(true); | ||
}); | ||
}); | ||
|
||
describe('createIlmPolicy', () => { | ||
test('should call cluster client with given policy', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValue({ success: true }); | ||
await clusterClientAdapter.createIlmPolicy('foo', { args: true }); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', { | ||
method: 'PUT', | ||
path: '_ilm/policy/foo', | ||
body: { args: true }, | ||
}); | ||
}); | ||
|
||
test('should throw error when call cluster client throws', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.createIlmPolicy('foo', { args: true }) | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating ilm policy: Fail"`); | ||
}); | ||
}); | ||
|
||
describe('doesIndexTemplateExist', () => { | ||
test('should call cluster with proper arguments', async () => { | ||
await clusterClientAdapter.doesIndexTemplateExist('foo'); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsTemplate', { | ||
name: 'foo', | ||
}); | ||
}); | ||
|
||
test('should return true when call cluster returns true', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValue(true); | ||
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(true); | ||
}); | ||
|
||
test('should return false when call cluster returns false', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValue(false); | ||
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(false); | ||
}); | ||
|
||
test('should throw error when call cluster throws an error', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.doesIndexTemplateExist('foo') | ||
).rejects.toThrowErrorMatchingInlineSnapshot( | ||
`"error checking existance of index template: Fail"` | ||
); | ||
}); | ||
}); | ||
|
||
describe('createIndexTemplate', () => { | ||
test('should call cluster with given template', async () => { | ||
await clusterClientAdapter.createIndexTemplate('foo', { args: true }); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.putTemplate', { | ||
name: 'foo', | ||
create: true, | ||
body: { args: true }, | ||
}); | ||
}); | ||
|
||
test(`should throw error if index template still doesn't exist after error is thrown`, async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail')); | ||
clusterClient.callAsInternalUser.mockResolvedValueOnce(false); | ||
await expect( | ||
clusterClientAdapter.createIndexTemplate('foo', { args: true }) | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating index template: Fail"`); | ||
}); | ||
|
||
test('should not throw error if index template exists after error is thrown', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail')); | ||
clusterClient.callAsInternalUser.mockResolvedValueOnce(true); | ||
await clusterClientAdapter.createIndexTemplate('foo', { args: true }); | ||
}); | ||
}); | ||
|
||
describe('doesAliasExist', () => { | ||
test('should call cluster with proper arguments', async () => { | ||
await clusterClientAdapter.doesAliasExist('foo'); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsAlias', { | ||
name: 'foo', | ||
}); | ||
}); | ||
|
||
test('should return true when call cluster returns true', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValueOnce(true); | ||
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(true); | ||
}); | ||
|
||
test('should return false when call cluster returns false', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValueOnce(false); | ||
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(false); | ||
}); | ||
|
||
test('should throw error when call cluster throws an error', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.doesAliasExist('foo') | ||
).rejects.toThrowErrorMatchingInlineSnapshot( | ||
`"error checking existance of initial index: Fail"` | ||
); | ||
}); | ||
}); | ||
|
||
describe('createIndex', () => { | ||
test('should call cluster with proper arguments', async () => { | ||
await clusterClientAdapter.createIndex('foo'); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.create', { | ||
index: 'foo', | ||
}); | ||
}); | ||
|
||
test('should throw error when not getting an error of type resource_already_exists_exception', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.createIndex('foo') | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating initial index: Fail"`); | ||
}); | ||
|
||
test(`shouldn't throw when an error of type resource_already_exists_exception is thrown`, async () => { | ||
const err = new Error('Already exists') as any; | ||
err.body = { | ||
error: { | ||
type: 'resource_already_exists_exception', | ||
}, | ||
}; | ||
clusterClient.callAsInternalUser.mockRejectedValue(err); | ||
await clusterClientAdapter.createIndex('foo'); | ||
}); | ||
}); |
126 changes: 126 additions & 0 deletions
126
x-pack/plugins/event_log/server/es/cluster_client_adapter.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { Logger, ClusterClient } from '../../../../../src/core/server'; | ||
|
||
export type EsClusterClient = Pick<ClusterClient, 'callAsInternalUser' | 'asScoped'>; | ||
export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>; | ||
|
||
export interface ConstructorOpts { | ||
logger: Logger; | ||
clusterClient: EsClusterClient; | ||
} | ||
|
||
export class ClusterClientAdapter { | ||
private readonly logger: Logger; | ||
private readonly clusterClient: EsClusterClient; | ||
|
||
constructor(opts: ConstructorOpts) { | ||
this.logger = opts.logger; | ||
this.clusterClient = opts.clusterClient; | ||
} | ||
|
||
public async indexDocument(doc: any): Promise<void> { | ||
await this.callEs('index', doc); | ||
} | ||
|
||
public async doesIlmPolicyExist(policyName: string): Promise<boolean> { | ||
const request = { | ||
method: 'GET', | ||
path: `_ilm/policy/${policyName}`, | ||
}; | ||
try { | ||
await this.callEs('transport.request', request); | ||
} catch (err) { | ||
if (err.statusCode === 404) return false; | ||
throw new Error(`error checking existance of ilm policy: ${err.message}`); | ||
} | ||
return true; | ||
} | ||
|
||
public async createIlmPolicy(policyName: string, policy: any): Promise<void> { | ||
const request = { | ||
method: 'PUT', | ||
path: `_ilm/policy/${policyName}`, | ||
body: policy, | ||
}; | ||
try { | ||
await this.callEs('transport.request', request); | ||
} catch (err) { | ||
throw new Error(`error creating ilm policy: ${err.message}`); | ||
} | ||
} | ||
|
||
public async doesIndexTemplateExist(name: string): Promise<boolean> { | ||
let result; | ||
try { | ||
result = await this.callEs('indices.existsTemplate', { name }); | ||
} catch (err) { | ||
throw new Error(`error checking existance of index template: ${err.message}`); | ||
} | ||
return result as boolean; | ||
} | ||
|
||
public async createIndexTemplate(name: string, template: any): Promise<void> { | ||
const addTemplateParams = { | ||
name, | ||
create: true, | ||
body: template, | ||
}; | ||
try { | ||
await this.callEs('indices.putTemplate', addTemplateParams); | ||
} catch (err) { | ||
// The error message doesn't have a type attribute we can look to guarantee it's due | ||
// to the template already existing (only long message) so we'll check ourselves to see | ||
// if the template now exists. This scenario would happen if you startup multiple Kibana | ||
// instances at the same time. | ||
const existsNow = await this.doesIndexTemplateExist(name); | ||
if (!existsNow) { | ||
throw new Error(`error creating index template: ${err.message}`); | ||
} | ||
} | ||
} | ||
|
||
public async doesAliasExist(name: string): Promise<boolean> { | ||
let result; | ||
try { | ||
result = await this.callEs('indices.existsAlias', { name }); | ||
} catch (err) { | ||
throw new Error(`error checking existance of initial index: ${err.message}`); | ||
} | ||
return result as boolean; | ||
} | ||
|
||
public async createIndex(name: string): Promise<void> { | ||
try { | ||
await this.callEs('indices.create', { index: name }); | ||
} catch (err) { | ||
if (err.body?.error?.type !== 'resource_already_exists_exception') { | ||
throw new Error(`error creating initial index: ${err.message}`); | ||
} | ||
} | ||
} | ||
|
||
private async callEs(operation: string, body?: any): Promise<any> { | ||
try { | ||
this.debug(`callEs(${operation}) calls:`, body); | ||
const result = await this.clusterClient.callAsInternalUser(operation, body); | ||
this.debug(`callEs(${operation}) result:`, result); | ||
return result; | ||
} catch (err) { | ||
this.debug(`callEs(${operation}) error:`, { | ||
message: err.message, | ||
statusCode: err.statusCode, | ||
}); | ||
throw err; | ||
} | ||
} | ||
|
||
private debug(message: string, object?: any) { | ||
const objectString = object == null ? '' : JSON.stringify(object); | ||
this.logger.debug(`esContext: ${message} ${objectString}`); | ||
} | ||
} |
Oops, something went wrong.