Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

esArchiver datastream support #132853

Merged
merged 20 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/kbn-es-archiver/src/actions/save.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export async function saveAction({
// export and save the matching indices to mappings.json
createPromiseFromStreams([
createListStream(indices),
createGenerateIndexRecordsStream({ client, stats, keepIndexNames }),
createGenerateIndexRecordsStream({ client, stats, keepIndexNames, log }),
...createFormatArchiveStreams(),
createWriteStream(resolve(outputDir, 'mappings.json')),
] as [Readable, ...Writable[]]),
Expand Down
2 changes: 1 addition & 1 deletion packages/kbn-es-archiver/src/actions/unload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export async function unloadAction({
await createPromiseFromStreams([
createReadStream(resolve(inputDir, filename)) as Readable,
...createParseArchiveStreams({ gzip: isGzip(filename) }),
createFilterRecordsStream('index'),
createFilterRecordsStream((record) => ['index', 'data_stream'].includes(record.type)),
createDeleteIndexStream(client, stats, log),
] as [Readable, ...Writable[]]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,29 @@ interface SearchResponses {
}>;
}

function createMockClient(responses: SearchResponses) {
function createMockClient(responses: SearchResponses, hasDataStreams = false) {
// TODO: replace with proper mocked client
const client: any = {
helpers: {
scrollSearch: jest.fn(function* ({ index }) {
if (hasDataStreams) {
index = `.ds-${index}`;
}

while (responses[index] && responses[index].length) {
yield responses[index].shift()!;
}
}),
},
indices: {
get: jest.fn(async ({ index }) => {
return { [index]: { data_stream: hasDataStreams && index.substring(4) } };
}),
getDataStream: jest.fn(async ({ name }) => {
if (!hasDataStreams) return { data_streams: [] };
return { data_streams: [{ name }] };
}),
},
};
return client;
}
Expand Down Expand Up @@ -217,6 +230,35 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => {
`);
});

it('supports data streams', async () => {
const hits = [
{ _index: '.ds-foo-datastream', _id: '0', _source: {} },
{ _index: '.ds-foo-datastream', _id: '1', _source: {} },
];
const responses = {
'.ds-foo-datastream': [{ body: { hits: { hits, total: hits.length } } }],
};
const client = createMockClient(responses, true);

const stats = createStats('test', log);
const progress = new Progress();

const results = await createPromiseFromStreams([
createListStream(['foo-datastream']),
createGenerateDocRecordsStream({
client,
stats,
progress,
}),
createMapStream((record: any) => {
return `${record.value.data_stream}:${record.value.id}`;
}),
createConcatStream([]),
]);

expect(results).toEqual(['foo-datastream:0', 'foo-datastream:1']);
});

describe('keepIndexNames', () => {
it('changes .kibana* index names if keepIndexNames is not enabled', async () => {
const hits = [{ _index: '.kibana_7.16.0_001', _id: '0', _source: {} }];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ export function createGenerateDocRecordsStream({
}
);

const hasDatastreams =
(await client.indices.getDataStream({ name: index })).data_streams.length > 0;
const indexToDatastream = new Map();

let remainingHits: number | null = null;

for await (const resp of interator) {
Expand All @@ -57,14 +61,25 @@ export function createGenerateDocRecordsStream({

for (const hit of resp.body.hits.hits) {
remainingHits -= 1;
stats.archivedDoc(hit._index);

if (hasDatastreams && !indexToDatastream.has(hit._index)) {
const {
[hit._index]: { data_stream: dataStream },
} = await client.indices.get({ index: hit._index, filter_path: ['*.data_stream'] });
indexToDatastream.set(hit._index, dataStream);
}

const dataStream = indexToDatastream.get(hit._index);
stats.archivedDoc(dataStream || hit._index);

this.push({
type: 'doc',
value: {
// if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that
// when it is loaded it can skip migration, if possible
index:
hit._index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : hit._index,
data_stream: dataStream,
Comment on lines 78 to +82
Copy link
Contributor

@spalger spalger Jun 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Part of me would prefer that docs either had an index or a data_stream, but I'm not opposed to keeping the index if there's some use for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mainly kept it for traceability when debugging or inspecting archived data, besides that there's no real use for it :)

id: hit._id,
source: hit._source,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,55 @@ describe('bulk helper onDocument param', () => {
createIndexDocRecordsStream(client as any, stats, progress, true),
]);
});

it('returns create ops for data stream documents', async () => {
const records = [
{
type: 'doc',
value: {
index: '.ds-foo-ds',
data_stream: 'foo-ds',
id: '0',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: '.ds-foo-ds',
data_stream: 'foo-ds',
id: '1',
source: {
hello: 'world',
},
},
},
];
expect.assertions(records.length);

const client = new MockClient();
client.helpers.bulk.mockImplementation(async ({ datasource, onDocument }) => {
for (const d of datasource) {
const op = onDocument(d);
expect(op).toEqual({
create: {
_index: 'foo-ds',
_id: expect.stringMatching(/^\d$/),
},
});
}
});

const stats = createStats('test', log);
const progress = new Progress();

await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client as any, stats, progress),
]);
});
});

describe('bulk helper onDrop param', () => {
Expand Down
15 changes: 11 additions & 4 deletions packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ import { Stats } from '../stats';
import { Progress } from '../progress';
import { ES_CLIENT_HEADERS } from '../../client_headers';

enum BulkOperation {
Create = 'create',
Index = 'index',
}

export function createIndexDocRecordsStream(
client: Client,
stats: Stats,
progress: Progress,
useCreate: boolean = false
) {
async function indexDocs(docs: any[]) {
const operation = useCreate === true ? 'create' : 'index';
const operation = useCreate === true ? BulkOperation.Create : BulkOperation.Index;
const ops = new WeakMap<any, any>();
const errors: string[] = [];

Expand All @@ -29,9 +34,11 @@ export function createIndexDocRecordsStream(
retries: 5,
datasource: docs.map((doc) => {
const body = doc.source;
const op = doc.data_stream ? BulkOperation.Create : operation;
const index = doc.data_stream || doc.index;
ops.set(body, {
[operation]: {
_index: doc.index,
[op]: {
_index: index,
_id: doc.id,
},
});
Expand All @@ -56,7 +63,7 @@ export function createIndexDocRecordsStream(
}

for (const doc of docs) {
stats.indexedDoc(doc.index);
stats.indexedDoc(doc.data_stream || doc.index);
}
}

Expand Down
2 changes: 2 additions & 0 deletions packages/kbn-es-archiver/src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ export {
export { readDirectory } from './directory';

export { Progress } from './progress';

export { getIndexTemplate } from './index_template';
105 changes: 105 additions & 0 deletions packages/kbn-es-archiver/src/lib/index_template.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Client } from '@elastic/elasticsearch';

import sinon from 'sinon';
import { getIndexTemplate } from './index_template';

describe('esArchiver: index template', () => {
describe('getIndexTemplate', () => {
it('returns the index template', async () => {
const client = {
indices: {
getIndexTemplate: sinon.stub().resolves({
index_templates: [
{
index_template: {
index_patterns: ['pattern-*'],
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
priority: 500,
composed_of: [],
data_stream: { hidden: false },
},
},
],
}),
},
} as unknown as Client;

const template = await getIndexTemplate(client, 'template-foo');

expect(template).toEqual({
name: 'template-foo',
index_patterns: ['pattern-*'],
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
priority: 500,
data_stream: { hidden: false },
});
});

it('resolves component templates', async () => {
const client = {
indices: {
getIndexTemplate: sinon.stub().resolves({
index_templates: [
{
index_template: {
index_patterns: ['pattern-*'],
composed_of: ['the-settings', 'the-mappings'],
},
},
],
}),
},
cluster: {
getComponentTemplate: sinon
.stub()
.onFirstCall()
.resolves({
component_templates: [
{
component_template: {
template: {
aliases: { 'foo-alias': {} },
},
},
},
],
})
.onSecondCall()
.resolves({
component_templates: [
{
component_template: {
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
},
},
],
}),
},
} as unknown as Client;

const template = await getIndexTemplate(client, 'template-foo');

expect(template).toEqual({
name: 'template-foo',
index_patterns: ['pattern-*'],
template: {
aliases: { 'foo-alias': {} },
mappings: { properties: { foo: { type: 'keyword' } } },
},
});
});
});
});
37 changes: 37 additions & 0 deletions packages/kbn-es-archiver/src/lib/index_template.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { merge } from 'lodash';
import type { Client } from '@elastic/elasticsearch';

import { ES_CLIENT_HEADERS } from '../client_headers';

export const getIndexTemplate = async (client: Client, templateName: string) => {
const { index_templates: indexTemplates } = await client.indices.getIndexTemplate(
{ name: templateName },
{ headers: ES_CLIENT_HEADERS }
);
const {
index_template: { template, composed_of: composedOf = [], ...indexTemplate },
} = indexTemplates[0];

const components = await Promise.all(
composedOf.map(async (component) => {
const { component_templates: componentTemplates } = await client.cluster.getComponentTemplate(
{ name: component }
);
return componentTemplates[0].component_template.template;
})
);

return {
...indexTemplate,
name: templateName,
template: merge(template, ...components),
};
};
Loading