Skip to content

Commit

Permalink
[Security Solution][Siem migrations] Implement rate limit backoff (el…
Browse files Browse the repository at this point in the history
…astic#211469)

## Summary

Implements an exponential backoff retry strategy when the LLM API throws
rate limit (`429`) errors.

### Backoff implementation

- The `run` method from the `RuleMigrationsTaskClient` has been moved to
the new `RuleMigrationTaskRunner` class.
- The settings for the backoff are defined in this class with:
```ts
/** Exponential backoff configuration to handle rate limit errors */
const RETRY_CONFIG = {
  initialRetryDelaySeconds: 1,
  backoffMultiplier: 2,
  maxRetries: 8,
  // max waiting time 4m15s (1*2^8 = 256s)
} as const;
```
- Only one rule will be retried at a time, the rest of the concurrent
rule translations blocked by the rate limit will await for the API to
recover before attempting the translation again.

```ts
/** Executor sleep configuration
 * A sleep time applied at the beginning of each single rule translation in the execution pool,
 * The objective of this sleep is to spread the load of concurrent translations, and prevent hitting the rate limit repeatedly.
 * The sleep time applied is a random number between [0-value]. Every time we hit rate limit the value is increased by the multiplier, up to the limit.
 */
const EXECUTOR_SLEEP = {
  initialValueSeconds: 3,
  multiplier: 2,
  limitSeconds: 96, // 1m36s (5 increases)
} as const;
```

### Migration batching changes

```ts
/** Number of concurrent rule translations in the pool */
const TASK_CONCURRENCY = 10 as const;
/** Number of rules loaded in memory to be translated in the pool */
const TASK_BATCH_SIZE = 100 as const;
```

#### Before 

- Batches of 15 rules were retrieved and executed in a `Promise.all`,
requiring all of them to be completed before proceeding to the next
batch.
- A "batch sleep" of 10s was executed at the end of each iteration.

#### In this PR

- Batches of 100 rules are retrieved and kept in memory. The execution
is performed in a task pool with a concurrency of 10 rules. This ensures
there are always 10 rules executing at a time.
- The "batch sleep" has been removed in favour of an "execution sleep"
of rand[1-3]s at the start of each single rule migration. This
individual sleep serves two goals:
  - Spread the load when the migration is first launched.
- Prevent hitting the rate limit consistently: The sleep duration is
increased every time we hit a rate limit.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
semd and kibanamachine authored Feb 21, 2025
1 parent de7d33d commit 64426b2
Show file tree
Hide file tree
Showing 13 changed files with 921 additions and 329 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
* 2.0.
*/

import { mockRuleMigrationsDataClient } from '../data/__mocks__/mocks';
import { mockRuleMigrationsTaskClient } from '../task/__mocks__/mocks';
import { createRuleMigrationsDataClientMock } from '../data/__mocks__/mocks';
import { createRuleMigrationsTaskClientMock } from '../task/__mocks__/mocks';

export const createRuleMigrationDataClient = jest
.fn()
.mockImplementation(() => mockRuleMigrationsDataClient);
.mockImplementation(() => createRuleMigrationsDataClientMock());

export const createRuleMigrationTaskClient = jest
.fn()
.mockImplementation(() => mockRuleMigrationsTaskClient);
.mockImplementation(() => createRuleMigrationsTaskClientMock());

export const createRuleMigrationClient = () => ({
data: createRuleMigrationDataClient(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@
* 2.0.
*/

import type { RuleMigrationsDataIntegrationsClient } from '../rule_migrations_data_integrations_client';
import type { RuleMigrationsDataLookupsClient } from '../rule_migrations_data_lookups_client';
import type { RuleMigrationsDataPrebuiltRulesClient } from '../rule_migrations_data_prebuilt_rules_client';
import type { RuleMigrationsDataResourcesClient } from '../rule_migrations_data_resources_client';
import type { RuleMigrationsDataRulesClient } from '../rule_migrations_data_rules_client';

// Rule migrations data rules client
export const mockRuleMigrationsDataRulesClient = {
create: jest.fn().mockResolvedValue(undefined),
get: jest.fn().mockResolvedValue([]),
get: jest.fn().mockResolvedValue({ data: [], total: 0 }),
searchBatches: jest.fn().mockReturnValue({
next: jest.fn().mockResolvedValue([]),
all: jest.fn().mockResolvedValue([]),
}),
takePending: jest.fn().mockResolvedValue([]),
saveProcessing: jest.fn().mockResolvedValue(undefined),
saveCompleted: jest.fn().mockResolvedValue(undefined),
saveError: jest.fn().mockResolvedValue(undefined),
releaseProcessing: jest.fn().mockResolvedValue(undefined),
updateStatus: jest.fn().mockResolvedValue(undefined),
getStats: jest.fn().mockResolvedValue(undefined),
getAllStats: jest.fn().mockResolvedValue([]),
} as unknown as RuleMigrationsDataRulesClient;
} as unknown as jest.Mocked<RuleMigrationsDataRulesClient>;
export const MockRuleMigrationsDataRulesClient = jest
.fn()
.mockImplementation(() => mockRuleMigrationsDataRulesClient);
Expand All @@ -35,30 +39,42 @@ export const mockRuleMigrationsDataResourcesClient = {
next: jest.fn().mockResolvedValue([]),
all: jest.fn().mockResolvedValue([]),
}),
};
} as unknown as jest.Mocked<RuleMigrationsDataResourcesClient>;
export const MockRuleMigrationsDataResourcesClient = jest
.fn()
.mockImplementation(() => mockRuleMigrationsDataResourcesClient);

export const mockRuleMigrationsDataIntegrationsClient = {
populate: jest.fn().mockResolvedValue(undefined),
retrieveIntegrations: jest.fn().mockResolvedValue([]),
};
} as unknown as jest.Mocked<RuleMigrationsDataIntegrationsClient>;

export const mockRuleMigrationsDataPrebuiltRulesClient = {
populate: jest.fn().mockResolvedValue(undefined),
search: jest.fn().mockResolvedValue([]),
} as unknown as jest.Mocked<RuleMigrationsDataPrebuiltRulesClient>;
export const mockRuleMigrationsDataLookupsClient = {
create: jest.fn().mockResolvedValue(undefined),
indexData: jest.fn().mockResolvedValue(undefined),
} as unknown as jest.Mocked<RuleMigrationsDataLookupsClient>;

// Rule migrations data client
export const mockRuleMigrationsDataClient = {
export const createRuleMigrationsDataClientMock = () => ({
rules: mockRuleMigrationsDataRulesClient,
resources: mockRuleMigrationsDataResourcesClient,
integrations: mockRuleMigrationsDataIntegrationsClient,
};
prebuiltRules: mockRuleMigrationsDataPrebuiltRulesClient,
lookups: mockRuleMigrationsDataLookupsClient,
});

export const MockRuleMigrationsDataClient = jest
.fn()
.mockImplementation(() => mockRuleMigrationsDataClient);
.mockImplementation(() => createRuleMigrationsDataClientMock());

// Rule migrations data service
export const mockIndexName = 'mocked_siem_rule_migrations_index_name';
export const mockInstall = jest.fn().mockResolvedValue(undefined);
export const mockCreateClient = jest.fn().mockReturnValue(mockRuleMigrationsDataClient);
export const mockCreateClient = jest.fn(() => createRuleMigrationsDataClientMock());

export const MockRuleMigrationsDataService = jest.fn().mockImplementation(() => ({
createAdapter: jest.fn(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,45 +151,19 @@ export class RuleMigrationsDataRulesClient extends RuleMigrationsDataBaseClient
}
}

/**
* Retrieves `pending` rule migrations with the provided id and updates their status to `processing`.
* This operation is not atomic at migration level:
* - Multiple tasks can process different migrations simultaneously.
* - Multiple tasks should not process the same migration simultaneously.
*/
async takePending(migrationId: string, size: number): Promise<StoredRuleMigration[]> {
/** Updates one rule migration status to `processing` */
async saveProcessing(id: string): Promise<void> {
const index = await this.getIndexName();
const profileId = await this.getProfileUid();
const query = this.getFilterQuery(migrationId, { status: SiemMigrationStatus.PENDING });

const storedRuleMigrations = await this.esClient
.search<RuleMigration>({ index, query, sort: '_doc', size })
.then((response) =>
this.processResponseHits(response, { status: SiemMigrationStatus.PROCESSING })
)
.catch((error) => {
this.logger.error(`Error searching rule migrations: ${error.message}`);
throw error;
});

await this.esClient
.bulk({
refresh: 'wait_for',
operations: storedRuleMigrations.flatMap(({ id, status }) => [
{ update: { _id: id, _index: index } },
{
doc: { status, updated_by: profileId, updated_at: new Date().toISOString() },
},
]),
})
.catch((error) => {
this.logger.error(
`Error updating for rule migrations status to processing: ${error.message}`
);
throw error;
});

return storedRuleMigrations;
const doc = {
status: SiemMigrationStatus.PROCESSING,
updated_by: profileId,
updated_at: new Date().toISOString(),
};
await this.esClient.update({ index, id, doc, refresh: 'wait_for' }).catch((error) => {
this.logger.error(`Error updating rule migration status to processing: ${error.message}`);
throw error;
});
}

/** Updates one rule migration with the provided data and sets the status to `completed` */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

export const mockRuleMigrationsTaskClient = {
export const createRuleMigrationsTaskClientMock = () => ({
start: jest.fn().mockResolvedValue({ started: true }),
stop: jest.fn().mockResolvedValue({ stopped: true }),
getStats: jest.fn().mockResolvedValue({
Expand All @@ -19,15 +19,15 @@ export const mockRuleMigrationsTaskClient = {
},
}),
getAllStats: jest.fn().mockResolvedValue([]),
};
});

export const MockRuleMigrationsTaskClient = jest
.fn()
.mockImplementation(() => mockRuleMigrationsTaskClient);
.mockImplementation(() => createRuleMigrationsTaskClientMock());

// Rule migrations task service
export const mockStopAll = jest.fn();
export const mockCreateClient = jest.fn().mockReturnValue(mockRuleMigrationsTaskClient);
export const mockCreateClient = jest.fn(() => createRuleMigrationsTaskClientMock());

export const MockRuleMigrationsTaskService = jest.fn().mockImplementation(() => ({
createClient: mockCreateClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ export interface RuleMigrationsRetrieverClients {
savedObjects: SavedObjectsClientContract;
}

/**
* RuleMigrationsRetriever is a class that is responsible for retrieving all the necessary data during the rule migration process.
* It is composed of multiple retrievers that are responsible for retrieving specific types of data.
* Such as rule integrations, prebuilt rules, and rule resources.
*/
export class RuleMigrationsRetriever {
public readonly resources: RuleResourceRetriever;
public readonly integrations: IntegrationRetriever;
Expand Down
Loading

0 comments on commit 64426b2

Please sign in to comment.