Skip to content

Commit

Permalink
changed reassign to work on batches of 10k
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaElastic committed Jun 22, 2022
1 parent 5e9d1b0 commit af3a2d4
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 16 deletions.
48 changes: 48 additions & 0 deletions x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,54 @@ export async function getAgentsByKuery(
};
}

export async function processAgentsInBatches(
esClient: ElasticsearchClient,
options: Omit<ListWithKuery, 'page'> & {
showInactive: boolean;
},
processAgents: (
agents: Agent[],
includeSuccess: boolean
) => Promise<{ items: BulkActionResult[] }>
): Promise<{ items: BulkActionResult[] }> {
const pitId = await openAgentsPointInTime(esClient);

const perPage = options.perPage ?? SO_SEARCH_LIMIT;

const res = await getAgentsByKueryPit(esClient, {
...options,
page: 1,
perPage,
pitId,
});

let currentAgents = res.agents;
// include successful agents if total agents does not exceed 10k
const skipSuccess = res.total > SO_SEARCH_LIMIT;

let results = await processAgents(currentAgents, skipSuccess);
let allAgentsProcessed = currentAgents.length;

while (allAgentsProcessed < res.total) {
const lastAgent = currentAgents[currentAgents.length - 1];
const nextPage = await getAgentsByKueryPit(esClient, {
...options,
page: 1,
perPage,
pitId,
searchAfter: lastAgent.sort!,
});
currentAgents = nextPage.agents;
const currentResults = await processAgents(currentAgents, skipSuccess);
results = { items: results.items.concat(currentResults.items) };
allAgentsProcessed += currentAgents.length;
}

await closeAgentsPointInTime(esClient, pitId);

return results;
}

export async function getAllAgentsByKuery(
esClient: ElasticsearchClient,
options: Omit<ListWithKuery, 'page'> & {
Expand Down
79 changes: 63 additions & 16 deletions x-pack/plugins/fleet/server/services/agents/reassign.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import { AgentReassignmentError, HostedAgentPolicyRestrictionRelatedError } from

import {
getAgentDocuments,
getAgents,
getAgentPolicyForAgent,
updateAgent,
bulkUpdateAgents,
processAgentsInBatches,
} from './crud';
import type { GetAgentsOptions } from '.';
import { createAgentAction } from './actions';
Expand Down Expand Up @@ -107,10 +107,46 @@ export async function reassignAgents(
}
}
} else if ('kuery' in options) {
givenAgents = await getAgents(esClient, options);
return await processAgentsInBatches(
esClient,
{
kuery: options.kuery,
showInactive: options.showInactive ?? false,
perPage: options.perPage,
},
async (agents: Agent[], skipSuccess: boolean) =>
await reassignBatch(
soClient,
esClient,
newAgentPolicyId,
agents,
outgoingErrors,
undefined,
skipSuccess
)
);
}
const givenOrder =
'agentIds' in options ? options.agentIds : givenAgents.map((agent) => agent.id);

return await reassignBatch(
soClient,
esClient,
newAgentPolicyId,
givenAgents,
outgoingErrors,
'agentIds' in options ? options.agentIds : undefined
);
}

async function reassignBatch(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
newAgentPolicyId: string,
givenAgents: Agent[],
outgoingErrors: Record<Agent['id'], Error>,
agentIds?: string[],
skipSuccess?: boolean
): Promise<{ items: BulkActionResult[] }> {
const errors: Record<Agent['id'], Error> = { ...outgoingErrors };

const hostedPolicies = await getHostedPolicies(soClient, givenAgents);

Expand All @@ -137,7 +173,7 @@ export async function reassignAgents(
agents.push(result.value);
} else {
const id = givenAgents[index].id;
outgoingErrors[id] = result.reason;
errors[id] = result.reason;
}
return agents;
}, []);
Expand All @@ -153,17 +189,28 @@ export async function reassignAgents(
}))
);

const orderedOut = givenOrder.map((agentId) => {
const hasError = agentId in outgoingErrors;
const result: BulkActionResult = {
let results;

if (!skipSuccess) {
const givenOrder = agentIds ? agentIds : givenAgents.map((agent) => agent.id);
results = givenOrder.map((agentId) => {
const hasError = agentId in errors;
const result: BulkActionResult = {
id: agentId,
success: !hasError,
};
if (hasError) {
result.error = errors[agentId];
}
return result;
});
} else {
results = Object.entries(errors).map(([agentId, error]) => ({
id: agentId,
success: !hasError,
};
if (hasError) {
result.error = outgoingErrors[agentId];
}
return result;
});
success: false,
error,
}));
}

const now = new Date().toISOString();
await createAgentAction(esClient, {
Expand All @@ -172,5 +219,5 @@ export async function reassignAgents(
type: 'POLICY_REASSIGN',
});

return { items: orderedOut };
return { items: results };
}

0 comments on commit af3a2d4

Please sign in to comment.