Skip to content

Commit

Permalink
[FTR] migrate p-retry usage to Retry service (elastic#206088)
Browse files Browse the repository at this point in the history
## Summary

Use the
[tryWithRetries](https://github.com/elastic/kibana/blob/37d7a5efb760dc271a4fb26f1eeba0d66e037b07/packages/kbn-ftr-common-functional-services/services/retry/retry.ts#L105)
service method instead of `pRetry`, as detailed
[here](elastic#178535).

`tryWithRetries` offers granular control of `retryCount`, `retryDelay`,
and `timeout`.
> [!IMPORTANT]
In some cases, there are helper functions that do not have access to the
FTR's provider context.
So, rather than calling `retry.tryWithRetries`, those helpers call
`retryForSuccess` directly.
`retryForSuccess` is the function that `tryWithRetries` uses "_under the
hood_".
As long as we pass the `retryCount` argument, we still get the retry
logging, as per [this related
PR](elastic#205894).
 
Related: elastic#178535,
elastic#205894

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
wayneseymour and elasticmachine authored Jan 16, 2025
1 parent 5e8d2b5 commit 9a439b7
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 111 deletions.
1 change: 1 addition & 0 deletions packages/kbn-ftr-common-functional-services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ export { DeploymentService } from './services/deployment';
export { IndexPatternsService } from './services/index_patterns';
export { RandomnessService } from './services/randomness';
export { KibanaSupertestProvider, ElasticsearchSupertestProvider } from './services/supertest';
export { retryForSuccess } from './services/retry/retry_for_success';
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import type {
AggregationsAggregate,
SearchResponse,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import pRetry from 'p-retry';
import { retryForSuccess } from '@kbn/ftr-common-functional-services';
import { ToolingLog } from '@kbn/tooling-log';

// Bound constructor: `bind` pre-fills ToolingLog's first constructor argument with
// `{ level: 'debug', writeTo: process.stdout }`, so `new debugLog({ context })` passes
// that object first and `{ context }` second. The `this` value given to `bind` is
// ignored when the bound function is invoked with `new`.
const debugLog = ToolingLog.bind(ToolingLog, { level: 'debug', writeTo: process.stdout });

export async function waitForDocumentInIndex<T>({
esClient,
Expand All @@ -21,8 +24,10 @@ export async function waitForDocumentInIndex<T>({
indexName: string;
docCountTarget?: number;
}): Promise<SearchResponse<T, Record<string, AggregationsAggregate>>> {
return pRetry(
async () => {
return await retryForSuccess(new debugLog({ context: 'waitForDocumentInIndex' }), {
timeout: 20_000,
methodName: 'waitForDocumentInIndex',
block: async () => {
const response = await esClient.search<T>({ index: indexName, rest_total_hits_as_int: true });
if (
!response.hits.total ||
Expand All @@ -33,8 +38,8 @@ export async function waitForDocumentInIndex<T>({
}
return response;
},
{ retries: 10 }
);
retryCount: 10,
});
}

export async function waitForIndexToBeEmpty<T>({
Expand All @@ -44,15 +49,17 @@ export async function waitForIndexToBeEmpty<T>({
esClient: Client;
indexName: string;
}): Promise<SearchResponse<T, Record<string, AggregationsAggregate>>> {
return pRetry(
async () => {
return await retryForSuccess(new debugLog({ context: 'waitForIndexToBeEmpty' }), {
timeout: 20_000,
methodName: 'waitForIndexToBeEmpty',
block: async () => {
const response = await esClient.search<T>({ index: indexName, rest_total_hits_as_int: true });
// @ts-expect-error upgrade typescript v5.1.6
if (response.hits.total != null && response.hits.total > 0) {
throw new Error(`Found ${response.hits.total} docs.`);
}
return response;
},
{ retries: 10 }
);
retryCount: 10,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@
import { Client, errors } from '@elastic/elasticsearch';
import { ParsedTechnicalFields } from '@kbn/rule-registry-plugin/common';
import { ApmRuleParamsType } from '@kbn/apm-plugin/common/rules/apm_rule_types';
import pRetry from 'p-retry';
import type { Agent as SuperTestAgent } from 'supertest';
import { ApmRuleType } from '@kbn/rule-data-utils';

import { ObservabilityApmAlert } from '@kbn/alerts-as-data-utils';
import { retryForSuccess } from '@kbn/ftr-common-functional-services';
import { ToolingLog } from '@kbn/tooling-log';
import {
APM_ACTION_VARIABLE_INDEX,
APM_ALERTS_INDEX,
} from '../../../../api_integration/deployment_agnostic/apis/observability/apm/alerts/helpers/alerting_helper';

// Bound ToolingLog constructor with its first argument fixed to
// `{ level: 'debug', writeTo: process.stdout }`; instantiated as
// `new debugLog({ context: ... })` to supply the logger that `retryForSuccess` takes.
const debugLog = ToolingLog.bind(ToolingLog, { level: 'debug', writeTo: process.stdout });

export async function createApmRule<T extends ApmRuleType>({
supertest,
name,
Expand Down Expand Up @@ -59,8 +61,10 @@ export async function runRuleSoon({
ruleId: string;
supertest: SuperTestAgent;
}): Promise<Record<string, any>> {
return pRetry(
async () => {
return await retryForSuccess(new debugLog({ context: 'runRuleSoon' }), {
timeout: 20_000,
methodName: 'runRuleSoon',
block: async () => {
try {
const response = await supertest
.post(`/internal/alerting/rule/${ruleId}/_run_soon`)
Expand All @@ -74,8 +78,8 @@ export async function runRuleSoon({
throw new Error(`[Rule] Running a rule ${ruleId} failed: ${error}`);
}
},
{ retries: 10 }
);
retryCount: 10,
});
}

export async function deleteAlertsByRuleId({ es, ruleId }: { es: Client; ruleId: string }) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/
import type { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/tooling-log';
import pRetry from 'p-retry';
import { retryForSuccess } from '@kbn/ftr-common-functional-services';
import { APM_ALERTS_INDEX } from '../../../../api_integration/deployment_agnostic/apis/observability/apm/alerts/helpers/alerting_helper';

export async function getActiveApmAlerts({
Expand Down Expand Up @@ -58,8 +58,10 @@ export function waitForActiveApmAlert({
log: ToolingLog;
}): Promise<Record<string, any>> {
log.debug(`Wait for the rule ${ruleId} to be active`);
return pRetry(
async () => {
return retryForSuccess(log, {
timeout: 20_000,
methodName: 'waitForActiveApmAlert',
block: async () => {
const activeApmAlerts = await getActiveApmAlerts({ ruleId, esClient });

if (activeApmAlerts.length === 0) {
Expand All @@ -70,12 +72,7 @@ export function waitForActiveApmAlert({

return activeApmAlerts[0];
},
{
retries: RETRIES_COUNT,
factor: 1.5,
onFailedAttempt: (error) => {
log.info(`Attempt ${error.attemptNumber}/${RETRIES_COUNT}: Waiting for active alert`);
},
}
);
retryCount: RETRIES_COUNT,
retryDelay: 500,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
*/

import { ToolingLog } from '@kbn/tooling-log';
import pRetry from 'p-retry';
import type SuperTest from 'supertest';
import { retryForSuccess } from '@kbn/ftr-common-functional-services';

const RETRIES_COUNT = 10;
// Bound ToolingLog constructor with its first argument fixed to
// `{ level: 'debug', writeTo: process.stdout }`. Used to build a fallback logger
// (`new debugLog({ context: ... })`) when the caller does not pass one.
const debugLog = ToolingLog.bind(ToolingLog, { level: 'debug', writeTo: process.stdout });
const retryCount = 10;

export async function waitForActiveRule({
ruleId,
Expand All @@ -20,25 +21,18 @@ export async function waitForActiveRule({
supertest: SuperTest.Agent;
logger?: ToolingLog;
}): Promise<Record<string, any>> {
return pRetry(
async () => {
return await retryForSuccess(logger || new debugLog({ context: 'waitForActiveRule' }), {
timeout: 20_000,
methodName: 'waitForActiveRule',
block: async () => {
const response = await supertest.get(`/api/alerting/rule/${ruleId}`);
const status = response.body?.execution_status?.status;
const expectedStatus = 'active';

if (status !== expectedStatus) {
throw new Error(`Expected: ${expectedStatus}: got ${status}`);
}
if (status !== expectedStatus) throw new Error(`Expected: ${expectedStatus}: got ${status}`);

return status;
},
{
retries: RETRIES_COUNT,
onFailedAttempt: (error) => {
if (logger) {
logger.info(`Attempt ${error.attemptNumber}/${RETRIES_COUNT}: Waiting for active rule`);
}
},
}
);
retryCount,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import type {
AggregationsAggregate,
SearchResponse,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import pRetry from 'p-retry';
import { ToolingLog } from '@kbn/tooling-log';
import { retryForSuccess } from '@kbn/ftr-common-functional-services';
import {
APM_ALERTS_INDEX,
ApmAlertFields,
} from '../../../../api_integration/deployment_agnostic/apis/observability/apm/alerts/helpers/alerting_helper';

// Bound ToolingLog constructor: the first constructor argument is pre-set to
// `{ level: 'debug', writeTo: process.stdout }`, and whatever is passed to
// `new debugLog(...)` follows it as the next argument.
const debugLog = ToolingLog.bind(ToolingLog, { level: 'debug', writeTo: process.stdout });

async function getAlertByRuleId({ es, ruleId }: { es: Client; ruleId: string }) {
const response = (await es.search({
index: APM_ALERTS_INDEX,
Expand All @@ -40,16 +43,17 @@ export async function waitForAlertsForRule({
ruleId: string;
minimumAlertCount?: number;
}) {
return pRetry(
async () => {
return await retryForSuccess(new debugLog({ context: 'waitForAlertsForRule' }), {
timeout: 20_000,
methodName: 'waitForAlertsForRule',
block: async () => {
const alerts = await getAlertByRuleId({ es, ruleId });
const actualAlertCount = alerts.length;
if (actualAlertCount < minimumAlertCount) {
if (actualAlertCount < minimumAlertCount)
throw new Error(`Expected ${minimumAlertCount} but got ${actualAlertCount} alerts`);
}

return alerts;
},
{ retries: 5 }
);
retryCount: 5,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
*/

import { Client } from '@elastic/elasticsearch';
import pRetry from 'p-retry';
import { ToolingLog } from '@kbn/tooling-log';
import { retryForSuccess } from '@kbn/ftr-common-functional-services';
import { APM_ACTION_VARIABLE_INDEX } from '../../../../api_integration/deployment_agnostic/apis/observability/apm/alerts/helpers/alerting_helper';

// Bound ToolingLog constructor with `{ level: 'debug', writeTo: process.stdout }`
// fixed as its first argument; `new debugLog({ context: ... })` creates the logger
// handed to `retryForSuccess` in this file.
const debugLog = ToolingLog.bind(ToolingLog, { level: 'debug', writeTo: process.stdout });

async function getIndexConnectorResults(es: Client) {
const res = await es.search({ index: APM_ACTION_VARIABLE_INDEX });
return res.hits.hits.map((hit) => hit._source) as Array<Record<string, string>>;
Expand All @@ -21,11 +24,16 @@ export async function waitForIndexConnectorResults({
es: Client;
minCount?: number;
}) {
return pRetry(async () => {
const results = await getIndexConnectorResults(es);
if (results.length < minCount) {
throw new Error(`Expected ${minCount} but got ${results.length} results`);
}
return results;
return await retryForSuccess(new debugLog({ context: 'waitForIndexConnectorResults' }), {
timeout: 20_000,
methodName: 'waitForIndexConnectorResults',
block: async () => {
const results = await getIndexConnectorResults(es);
if (results.length < minCount)
throw new Error(`Expected ${minCount} but got ${results.length} results`);

return results;
},
retryCount: 10,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { createEsClientForFtrConfig } from '@kbn/test';
import { ApmDocumentType } from '@kbn/apm-plugin/common/document_type';
import { RollupInterval } from '@kbn/apm-plugin/common/rollup';
import { SecurityRoleDescriptor } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import pRetry from 'p-retry';
import { RetryService } from '@kbn/ftr-common-functional-services';
import { FtrProviderContext } from '../../common/ftr_provider_context';
import { getBettertest } from '../../common/bettertest';
import {
Expand Down Expand Up @@ -166,45 +166,58 @@ export default function ApiTest(ftrProviderContext: FtrProviderContext) {
});

it('the events can be seen on the Service Inventory Page', async () => {
// Retry logic added to handle delays in data ingestion and indexing (the test shows some flakiness without this)
await retry.try(async () => {
const apmServices = await getApmServices(apmApiClient, scenario.start, scenario.end);
expect(apmServices[0].serviceName).to.be('opbeans-java');
expect(apmServices[0].environments?.[0]).to.be('ingested-via-fleet');
expect(apmServices[0].latency).to.be(2550000);
expect(apmServices[0].throughput).to.be(2);
expect(apmServices[0].transactionErrorRate).to.be(0.5);
});
const apmServices = await getApmServices(
apmApiClient,
scenario.start,
scenario.end,
retry
);
expect(apmServices[0].serviceName).to.be('opbeans-java');
expect(apmServices[0].environments?.[0]).to.be('ingested-via-fleet');
expect(apmServices[0].latency).to.be(2550000);
expect(apmServices[0].throughput).to.be(2);
expect(apmServices[0].transactionErrorRate).to.be(0.5);
});
});
});
});
}

function getApmServices(apmApiClient: ApmApiClient, start: string, end: string) {
return pRetry(async () => {
const res = await apmApiClient.readUser({
endpoint: 'GET /internal/apm/services',
params: {
query: {
start,
end,
probability: 1,
environment: 'ENVIRONMENT_ALL',
kuery: '',
documentType: ApmDocumentType.TransactionMetric,
rollupInterval: RollupInterval.OneMinute,
useDurationSummary: true,
async function getApmServices(
apmApiClient: ApmApiClient,
start: string,
end: string,
retrySvc: RetryService
) {
return await retrySvc.tryWithRetries(
'getApmServices',
async () => {
const res = await apmApiClient.readUser({
endpoint: 'GET /internal/apm/services',
params: {
query: {
start,
end,
probability: 1,
environment: 'ENVIRONMENT_ALL',
kuery: '',
documentType: ApmDocumentType.TransactionMetric,
rollupInterval: RollupInterval.OneMinute,
useDurationSummary: true,
},
},
},
});
});

if (res.body.items.length === 0 || !res.body.items[0].latency) {
throw new Error(`Timed-out: No APM Services were found`);
}
if (res.body.items.length === 0 || !res.body.items[0].latency)
throw new Error(`Timed-out: No APM Services were found`);

return res.body.items;
});
return res.body.items;
},
{
retryCount: 10,
timeout: 20_000,
}
);
}

function getSynthtraceScenario() {
Expand Down
Loading

0 comments on commit 9a439b7

Please sign in to comment.