-
Notifications
You must be signed in to change notification settings - Fork 8.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Increase stability when initializing the Elasticsearch index for the event log #57465
Merged
mikecote
merged 13 commits into
elastic:master
from
mikecote:event_log/initialize-es-stability
Feb 14, 2020
Merged
Changes from 11 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
f406233
Fix ILM policy creation
mikecote e87cadb
Merge branch 'master' of github.com:elastic/kibana into alerting/even…
mikecote 23707bb
Handle errors thrown in scenario multiple Kibana instances are starte…
mikecote e3f9768
Merge branch 'master' of github.com:elastic/kibana into alerting/even…
mikecote 306567c
Merge branch 'master' of github.com:elastic/kibana into alerting/even…
mikecote 28b8d12
Fix tests and cleanup
mikecote 2f24268
Start adding tests
mikecote 2f08752
Merge branch 'master' of github.com:elastic/kibana into alerting/even…
mikecote a818cfc
Refactor tests, add index template failure test
mikecote ee4393c
Create cluster client adapter to facilitate testing and isolation
mikecote 88b525c
Fix places calling callEs still
mikecote f818a08
Merge branch 'master' into event_log/initialize-es-stability
elasticmachine f622e81
Merge branch 'master' into event_log/initialize-es-stability
elasticmachine File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
24 changes: 24 additions & 0 deletions
24
x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { IClusterClientAdapter } from './cluster_client_adapter'; | ||
|
||
const createClusterClientMock = () => { | ||
const mock: jest.Mocked<IClusterClientAdapter> = { | ||
indexDocument: jest.fn(), | ||
doesIlmPolicyExist: jest.fn(), | ||
createIlmPolicy: jest.fn(), | ||
doesIndexTemplateExist: jest.fn(), | ||
createIndexTemplate: jest.fn(), | ||
doesAliasExist: jest.fn(), | ||
createIndex: jest.fn(), | ||
}; | ||
return mock; | ||
}; | ||
|
||
export const clusterClientAdapterMock = { | ||
create: createClusterClientMock, | ||
}; |
196 changes: 196 additions & 0 deletions
196
x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { ClusterClient, Logger } from '../../../../../src/core/server'; | ||
import { elasticsearchServiceMock, loggingServiceMock } from '../../../../../src/core/server/mocks'; | ||
import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter'; | ||
|
||
type EsClusterClient = Pick<jest.Mocked<ClusterClient>, 'callAsInternalUser' | 'asScoped'>; | ||
|
||
let logger: Logger; | ||
let clusterClient: EsClusterClient; | ||
let clusterClientAdapter: IClusterClientAdapter; | ||
|
||
beforeEach(() => { | ||
logger = loggingServiceMock.createLogger(); | ||
clusterClient = elasticsearchServiceMock.createClusterClient(); | ||
clusterClientAdapter = new ClusterClientAdapter({ | ||
logger, | ||
clusterClient, | ||
}); | ||
}); | ||
|
||
describe('indexDocument', () => { | ||
test('should call cluster client with given doc', async () => { | ||
await clusterClientAdapter.indexDocument({ args: true }); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('index', { | ||
args: true, | ||
}); | ||
}); | ||
|
||
test('should throw error when cluster client throws an error', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.indexDocument({ args: true }) | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`); | ||
}); | ||
}); | ||
|
||
describe('doesIlmPolicyExist', () => { | ||
const notFoundError = new Error('Not found') as any; | ||
notFoundError.statusCode = 404; | ||
|
||
test('should call cluster with proper arguments', async () => { | ||
await clusterClientAdapter.doesIlmPolicyExist('foo'); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', { | ||
method: 'GET', | ||
path: '_ilm/policy/foo', | ||
}); | ||
}); | ||
|
||
test('should return false when 404 error is returned by Elasticsearch', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(notFoundError); | ||
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(false); | ||
}); | ||
|
||
test('should throw error when error is not 404', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.doesIlmPolicyExist('foo') | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"error checking existance of ilm policy: Fail"`); | ||
}); | ||
|
||
test('should return true when no error is thrown', async () => { | ||
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(true); | ||
}); | ||
}); | ||
|
||
describe('createIlmPolicy', () => { | ||
test('should call cluster client with given policy', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValue({ success: true }); | ||
await clusterClientAdapter.createIlmPolicy('foo', { args: true }); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', { | ||
method: 'PUT', | ||
path: '_ilm/policy/foo', | ||
body: { args: true }, | ||
}); | ||
}); | ||
|
||
test('should throw error when call cluster client throws', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.createIlmPolicy('foo', { args: true }) | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating ilm policy: Fail"`); | ||
}); | ||
}); | ||
|
||
describe('doesIndexTemplateExist', () => { | ||
test('should call cluster with proper arguments', async () => { | ||
await clusterClientAdapter.doesIndexTemplateExist('foo'); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsTemplate', { | ||
name: 'foo', | ||
}); | ||
}); | ||
|
||
test('should return true when call cluster returns true', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValue(true); | ||
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(true); | ||
}); | ||
|
||
test('should return false when call cluster returns false', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValue(false); | ||
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(false); | ||
}); | ||
|
||
test('should throw error when call cluster throws an error', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.doesIndexTemplateExist('foo') | ||
).rejects.toThrowErrorMatchingInlineSnapshot( | ||
`"error checking existance of index template: Fail"` | ||
); | ||
}); | ||
}); | ||
|
||
describe('createIndexTemplate', () => { | ||
test('should call cluster with given template', async () => { | ||
await clusterClientAdapter.createIndexTemplate('foo', { args: true }); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.putTemplate', { | ||
name: 'foo', | ||
create: true, | ||
body: { args: true }, | ||
}); | ||
}); | ||
|
||
test(`should throw error if index template still doesn't exist after error is thrown`, async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail')); | ||
clusterClient.callAsInternalUser.mockResolvedValueOnce(false); | ||
await expect( | ||
clusterClientAdapter.createIndexTemplate('foo', { args: true }) | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating index template: Fail"`); | ||
}); | ||
|
||
test('should not throw error if index template exists after error is thrown', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail')); | ||
clusterClient.callAsInternalUser.mockResolvedValueOnce(true); | ||
await clusterClientAdapter.createIndexTemplate('foo', { args: true }); | ||
}); | ||
}); | ||
|
||
describe('doesAliasExist', () => { | ||
test('should call cluster with proper arguments', async () => { | ||
await clusterClientAdapter.doesAliasExist('foo'); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsAlias', { | ||
name: 'foo', | ||
}); | ||
}); | ||
|
||
test('should return true when call cluster returns true', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValueOnce(true); | ||
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(true); | ||
}); | ||
|
||
test('should return false when call cluster returns false', async () => { | ||
clusterClient.callAsInternalUser.mockResolvedValueOnce(false); | ||
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(false); | ||
}); | ||
|
||
test('should throw error when call cluster throws an error', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.doesAliasExist('foo') | ||
).rejects.toThrowErrorMatchingInlineSnapshot( | ||
`"error checking existance of initial index: Fail"` | ||
); | ||
}); | ||
}); | ||
|
||
describe('createIndex', () => { | ||
test('should call cluster with proper arguments', async () => { | ||
await clusterClientAdapter.createIndex('foo'); | ||
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.create', { | ||
index: 'foo', | ||
}); | ||
}); | ||
|
||
test('should throw error when not getting an error of type resource_already_exists_exception', async () => { | ||
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); | ||
await expect( | ||
clusterClientAdapter.createIndex('foo') | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating initial index: Fail"`); | ||
}); | ||
|
||
test(`shouldn't throw when an error of type resource_already_exists_exception is thrown`, async () => { | ||
const err = new Error('Already exists') as any; | ||
err.body = { | ||
error: { | ||
type: 'resource_already_exists_exception', | ||
}, | ||
}; | ||
clusterClient.callAsInternalUser.mockRejectedValue(err); | ||
await clusterClientAdapter.createIndex('foo'); | ||
}); | ||
}); |
126 changes: 126 additions & 0 deletions
126
x-pack/plugins/event_log/server/es/cluster_client_adapter.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { Logger, ClusterClient } from '../../../../../src/core/server'; | ||
|
||
export type EsClusterClient = Pick<ClusterClient, 'callAsInternalUser' | 'asScoped'>; | ||
export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>; | ||
|
||
export interface ConstructorOpts { | ||
logger: Logger; | ||
clusterClient: EsClusterClient; | ||
} | ||
|
||
export class ClusterClientAdapter { | ||
private readonly logger: Logger; | ||
private readonly clusterClient: EsClusterClient; | ||
|
||
constructor(opts: ConstructorOpts) { | ||
this.logger = opts.logger; | ||
this.clusterClient = opts.clusterClient; | ||
} | ||
|
||
public async indexDocument(doc: any): Promise<void> { | ||
await this.callEs('index', doc); | ||
} | ||
|
||
public async doesIlmPolicyExist(policyName: string): Promise<boolean> { | ||
const request = { | ||
method: 'GET', | ||
path: `_ilm/policy/${policyName}`, | ||
}; | ||
try { | ||
await this.callEs('transport.request', request); | ||
} catch (err) { | ||
if (err.statusCode === 404) return false; | ||
throw new Error(`error checking existance of ilm policy: ${err.message}`); | ||
} | ||
return true; | ||
} | ||
|
||
public async createIlmPolicy(policyName: string, policy: any): Promise<void> { | ||
const request = { | ||
method: 'PUT', | ||
path: `_ilm/policy/${policyName}`, | ||
body: policy, | ||
}; | ||
try { | ||
await this.callEs('transport.request', request); | ||
} catch (err) { | ||
throw new Error(`error creating ilm policy: ${err.message}`); | ||
} | ||
} | ||
|
||
public async doesIndexTemplateExist(name: string): Promise<boolean> { | ||
let result; | ||
try { | ||
result = await this.callEs('indices.existsTemplate', { name }); | ||
} catch (err) { | ||
throw new Error(`error checking existance of index template: ${err.message}`); | ||
} | ||
return result as boolean; | ||
} | ||
|
||
public async createIndexTemplate(name: string, template: any): Promise<void> { | ||
const addTemplateParams = { | ||
name, | ||
create: true, | ||
body: template, | ||
}; | ||
try { | ||
await this.callEs('indices.putTemplate', addTemplateParams); | ||
} catch (err) { | ||
// The error message doesn't have a type attribute we can look to guarantee it's due | ||
// to the template already existing (only long message) so we'll check ourselves to see | ||
// if the template now exists. This scenario would happen if you startup multiple Kibana | ||
// instances at the same time. | ||
const existsNow = await this.doesIndexTemplateExist(name); | ||
if (!existsNow) { | ||
throw new Error(`error creating index template: ${err.message}`); | ||
} | ||
} | ||
} | ||
|
||
public async doesAliasExist(name: string): Promise<boolean> { | ||
let result; | ||
try { | ||
result = await this.callEs('indices.existsAlias', { name }); | ||
} catch (err) { | ||
throw new Error(`error checking existance of initial index: ${err.message}`); | ||
} | ||
return result as boolean; | ||
} | ||
|
||
public async createIndex(name: string): Promise<void> { | ||
try { | ||
await this.callEs('indices.create', { index: name }); | ||
} catch (err) { | ||
if (err.body?.error?.type !== 'resource_already_exists_exception') { | ||
throw new Error(`error creating initial index: ${err.message}`); | ||
} | ||
} | ||
} | ||
|
||
private async callEs(operation: string, body?: any): Promise<any> { | ||
try { | ||
this.debug(`callEs(${operation}) calls:`, body); | ||
const result = await this.clusterClient.callAsInternalUser(operation, body); | ||
this.debug(`callEs(${operation}) result:`, result); | ||
return result; | ||
} catch (err) { | ||
this.debug(`callEs(${operation}) error:`, { | ||
message: err.message, | ||
statusCode: err.statusCode, | ||
}); | ||
throw err; | ||
} | ||
} | ||
|
||
private debug(message: string, object?: any) { | ||
const objectString = object == null ? '' : JSON.stringify(object); | ||
this.logger.debug(`esContext: ${message} ${objectString}`); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting that we don't need a result here!