From 3c48d4c2c055615c6bdd940ff28e04bae4e80c72 Mon Sep 17 00:00:00 2001 From: affank Date: Tue, 4 Feb 2025 13:34:35 -0500 Subject: [PATCH] Fix issue where query get interrupted during prepare. (#7592) Co-authored-by: Affan Khan (cherry picked from commit f5485d71394f8ce6de37fe0ad953584ddd8223e7) # Conflicts: # common/config/rush/pnpm-lock.yaml --- common/api/core-common.api.md | 39 +++- common/api/summary/core-common.exports.csv | 1 + .../affank-ccq-fix_2025-01-22-22-18.json | 10 + .../affank-ccq-fix_2025-01-22-22-18.json | 10 + .../src/test/ecdb/ConcurrentQueryLoad.test.ts | 186 ++++++++++++++++++ core/backend/src/test/ecdb/ECSqlQuery.test.ts | 53 +++++ core/common/src/ConcurrentQuery.ts | 37 ++++ core/common/src/ECSqlReader.ts | 7 +- 8 files changed, 332 insertions(+), 11 deletions(-) create mode 100644 common/changes/@itwin/core-backend/affank-ccq-fix_2025-01-22-22-18.json create mode 100644 common/changes/@itwin/core-common/affank-ccq-fix_2025-01-22-22-18.json create mode 100644 core/backend/src/test/ecdb/ConcurrentQueryLoad.test.ts diff --git a/common/api/core-common.api.md b/common/api/core-common.api.md index 9806cd27bfb4..6e3e7bc7f7b9 100644 --- a/common/api/core-common.api.md +++ b/common/api/core-common.api.md @@ -616,6 +616,8 @@ export interface BaseReaderOptions { priority?: number; quota?: QueryQuota; restartToken?: string; + // @internal (undocumented) + testingArgs?: TestingArgs; usePrimaryConn?: boolean; } @@ -1998,15 +2000,19 @@ export interface DbBlobResponse extends DbResponse { // @internal (undocumented) export interface DbQueryConfig { + allowTestingArgs?: boolean; + autoShutdowWhenIdlelForSeconds?: number; // (undocumented) - globalQuota?: QueryQuota; + doNotUsePrimaryConnToPrepare?: boolean; // (undocumented) + globalQuota?: QueryQuota; ignoreDelay?: boolean; - // (undocumented) ignorePriority?: boolean; + memoryMapFileSize?: number; // (undocumented) + monitorPollInterval?: number; requestQueueSize?: number; - // (undocumented) + statementCacheSizePerWorker?: number; workerThreads?: number; } @@ -2045,6 +2051,8 @@ export interface DbQueryResponse extends DbResponse { export interface DbRequest extends BaseReaderOptions { // (undocumented) kind?: DbRequestKind; + // (undocumented) + testingArgs?: TestingArgs; } // @internal (undocumented) @@ -2096,17 +2104,19 @@ export enum DbResponseStatus { // (undocumented) Error_BlobIO_OutOfRange = 106,/* could not submit the query as queue was full.*/ // (undocumented) - Error_ECSql_BindingFailed = 104,/* generic error*/ + Error_ECSql_BindingFailed = 104,/* Shutdown is in progress. */ + // (undocumented) + Error_ECSql_PreparedFailed = 101,/* generic error*/ // (undocumented) - Error_ECSql_PreparedFailed = 101,/* ecsql prepared failed*/ + Error_ECSql_RowToJsonFailed = 103,/* ecsql prepared failed*/ // (undocumented) - Error_ECSql_RowToJsonFailed = 103,/* ecsql step failed*/ + Error_ECSql_StepFailed = 102,/* ecsql step failed*/ // (undocumented) - Error_ECSql_StepFailed = 102,/* ecsql failed to serialized row to json.*/ + Partial = 3,/* ecsql failed to serialized row to json.*/ // (undocumented) - Partial = 3,/* ecsql binding failed.*/ + QueueFull = 5,/* ecsql binding failed.*/ // (undocumented) - QueueFull = 5,/* class or property or instance specified was not found or property as not of type blob.*/ + ShuttingDown = 6,/* class or property or instance specified was not found or property as not of type blob.*/ // (undocumented) Timeout = 4 } @@ -2122,6 +2132,8 @@ export interface DbRuntimeStats { // (undocumented) memUsed: number; // (undocumented) + prepareTime: number; + // (undocumented) timeLimit: number; // (undocumented) totalTime: number; @@ -7477,6 +7489,8 @@ export class QueryOptionsBuilder { setRestartToken(val: string): this; setRowFormat(val: QueryRowFormat): this; setSuppressLogErrors(val: boolean): this; + // @internal + setTestingArgs(val: TestingArgs): this; setUsePrimaryConnection(val: boolean): this; } @@ -7548,6 +7562,7 @@ export interface QueryStats { backendMemUsed: number; backendRowsReturned: number; backendTotalTime: number; + prepareTime: number; retryCount: number; totalTime: number; } @@ -9711,6 +9726,12 @@ export class TerrainSettings { toJSON(): TerrainProps; } +// @internal (undocumented) +export interface TestingArgs { + // (undocumented) + interrupt?: boolean; +} + // @internal export class TestRpcManager { // (undocumented) diff --git a/common/api/summary/core-common.exports.csv b/common/api/summary/core-common.exports.csv index 68e894aca5b0..dfea80ef4447 100644 --- a/common/api/summary/core-common.exports.csv +++ b/common/api/summary/core-common.exports.csv @@ -831,6 +831,7 @@ public;interface;TerrainProps public;type;TerrainProviderName deprecated;type;TerrainProviderName public;class;TerrainSettings +internal;interface;TestingArgs internal;class;TestRpcManager beta;class;TextAnnotation public;interface;TextAnnotation2dProps diff --git a/common/changes/@itwin/core-backend/affank-ccq-fix_2025-01-22-22-18.json b/common/changes/@itwin/core-backend/affank-ccq-fix_2025-01-22-22-18.json new file mode 100644 index 000000000000..96c411225519 --- /dev/null +++ b/common/changes/@itwin/core-backend/affank-ccq-fix_2025-01-22-22-18.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@itwin/core-backend", + "comment": "Fix issue with concurrent query where it interrupt statement during prepare", + "type": "none" + } + ], + "packageName": "@itwin/core-backend" +} \ No newline at end of file diff --git a/common/changes/@itwin/core-common/affank-ccq-fix_2025-01-22-22-18.json b/common/changes/@itwin/core-common/affank-ccq-fix_2025-01-22-22-18.json new file mode 100644 index 000000000000..9239d77e78d8 --- /dev/null +++ b/common/changes/@itwin/core-common/affank-ccq-fix_2025-01-22-22-18.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@itwin/core-common", + "comment": "Update ECSql reader api to support no stat and error code.", + "type": "none" + } + ], + "packageName": "@itwin/core-common" +} \ No newline at end of file diff --git a/core/backend/src/test/ecdb/ConcurrentQueryLoad.test.ts b/core/backend/src/test/ecdb/ConcurrentQueryLoad.test.ts new file mode 100644 index 000000000000..b57d93272bf3 --- /dev/null +++ b/core/backend/src/test/ecdb/ConcurrentQueryLoad.test.ts @@ -0,0 +1,186 @@ +import { Logger, LogLevel, StopWatch } from "@itwin/core-bentley"; +import { DbQueryConfig, ECSqlReader, QueryStats } from "@itwin/core-common"; +import { expect } from "chai"; +import { ConcurrentQuery } from "../../ConcurrentQuery"; +import { ECDb } from "../../ECDb"; +import { IModelDb, SnapshotDb } from "../../IModelDb"; +import { _nativeDb } from "../../core-backend"; +import { IModelTestUtils } from "../IModelTestUtils"; + +interface ITaskResult { + stats: QueryStats; + error?: any; +} + +interface ISenario { + name: string; + config?: DbQueryConfig; + totalBatches: number; + taskPerBatch: number; + createReader: (db: ECDb | IModelDb) => ECSqlReader; +} + + +class LoadSimulator { + constructor(public db: ECDb | IModelDb, public senario: ISenario) { } + private async runQueryTask(reader: ECSqlReader): Promise { + try { + while (await reader.step()) { } + return { stats: reader.stats }; + } catch (err) { + return { stats: reader.stats, error: err }; + } + } + + public async run() { + ConcurrentQuery.shutdown(this.db[_nativeDb]); + if (this.senario.config) { + const config = ConcurrentQuery.resetConfig(this.db[_nativeDb], this.senario.config); + // eslint-disable-next-line no-console + console.log(config); + } + const overalTime = new StopWatch(); + overalTime.start(); + const results: ITaskResult[] = []; + for (let i = 0; i < this.senario.totalBatches; ++i) { + const promises: Promise[] = []; + const readerTasks = Array(this.senario.taskPerBatch).fill(undefined).map(() => this.senario.createReader(this.db)); + readerTasks.forEach((reader) => { + promises.push(this.runQueryTask(reader)); + }); + results.push(... await Promise.all(promises)); + + } + overalTime.stop(); + const errors = results.filter((x) => x.error !== undefined); + const errorsMap = new Map(); + errors.forEach((x) => { + if (x.error instanceof Error) { + if (!errorsMap.has(x.error.message)) { + errorsMap.set(x.error.message, 1); + } else { + errorsMap.set(x.error.message, errorsMap.get(x.error.message)! + 1); + } + } else { + if (!errorsMap.has("error")) { + errorsMap.set("error", 1); + } else { + errorsMap.set("error", errorsMap.get("error")! + 1); + } + } + }); + const errorCount = errors.length; + let backendCpuTime: bigint = BigInt(0); + let backendTotalTime: bigint = BigInt(0); + let backendMemUsed: bigint = BigInt(0); + let backendRowsReturned: bigint = BigInt(0); + let totalTime: bigint = BigInt(0); + let retryCount: bigint = BigInt(0); + let prepareTime: bigint = BigInt(0); + + // Calculate average + results.forEach((r: ITaskResult) => { + backendCpuTime += BigInt(r.stats.backendCpuTime); + backendTotalTime += BigInt(r.stats.backendTotalTime); + backendMemUsed += BigInt(r.stats.backendMemUsed); + backendRowsReturned += BigInt(r.stats.backendRowsReturned); + totalTime += BigInt(r.stats.totalTime); + retryCount += BigInt(r.stats.retryCount); + prepareTime += BigInt(r.stats.prepareTime); + }); + + backendCpuTime /= BigInt(results.length); + backendTotalTime /= BigInt(results.length); + backendMemUsed /= BigInt(results.length); + backendRowsReturned /= BigInt(results.length); + totalTime /= BigInt(results.length); + retryCount /= BigInt(results.length); + // prepareTime /= BigInt(results.length); + + return { + result: { + backendCpuTime, + backendTotalTime, + backendMemUsed, + backendRowsReturned, + totalTime, + retryCount, + prepareTime, + }, + overalTimeInSec: overalTime.currentSeconds, + errorCount, + totalQueries: results.length, + errorMap: errorsMap + }; + + } +} + +describe.skip("ConcurrentQueryLoad", () => { + it("should run", async () => { + Logger.initializeToConsole(); + Logger.setLevel("ECDb.ConcurrentQuery", LogLevel.Trace); + // { + // workerThreads: 4, + // requestQueueSize: 2000, + // ignorePriority: false, + // ignoreDelay: true, + // doNotUsePrimaryConnToPrepare: false, + // autoShutdowWhenIdlelForSeconds: 300, + // statementCacheSizePerWorker: 40, + // monitorPollInterval: 1000, + // memoryMapFileSize: 0, + // allowTestingArgs: false, + // globalQuota: { time: 60, memory: 8388608 } + // } + + const senario: ISenario = { + name: "ConcurrentQueryLoad", + config: { + + }, + totalBatches: 1, + taskPerBatch: 1, + createReader: (dbs: ECDb | IModelDb) => { + const quries = [ + { + sql: ` + WITH sequence(n) AS ( + SELECT 1 + UNION ALL + SELECT n + 1 FROM sequence WHERE n < 10000 + ) + SELECT COUNT(*) + FROM bis.SpatialIndex i, sequence s + WHERE i.ECInstanceId MATCH iModel_spatial_overlap_aabb( + iModel_bbox(random(), random(), random(), random(),random(), random()))` + }, + { + sql: ` + WITH sequence(n) AS ( + SELECT 1 + UNION ALL + SELECT n + 1 FROM sequence WHERE n < 1000000 + ) + SELECT COUNT(*) FROM sequence` + }, + { + sql: "SELECT $ FROM bis.Element LIMIT 10000" + } + ]; + const idx = Math.floor(Math.random() * quries.length); + return dbs.createQueryReader(quries[idx].sql); + } + }; + + const verySmallFile = IModelTestUtils.resolveAssetFile("test.bim"); + const db = SnapshotDb.openFile(verySmallFile); + const simulator = new LoadSimulator(db, senario); + const result = await simulator.run(); + // eslint-disable-next-line no-console + console.log(result); + db.close(); + expect(result.errorCount).to.be.equal(0); + }); + +}); \ No newline at end of file diff --git a/core/backend/src/test/ecdb/ECSqlQuery.test.ts b/core/backend/src/test/ecdb/ECSqlQuery.test.ts index 41bcbe9ec0b2..e2eeb30360cd 100644 --- a/core/backend/src/test/ecdb/ECSqlQuery.test.ts +++ b/core/backend/src/test/ecdb/ECSqlQuery.test.ts @@ -393,7 +393,60 @@ describe("ECSql Query", () => { assert.isTrue(hasRow, "imodel1.query() must return latest one row"); } }); + // new new addon build + it("ecsql interrupt check", async () => { + let cancelled = 0; + let successful = 0; + let rowCount = 0; + try { + ConcurrentQuery.shutdown(imodel1[_nativeDb]); + ConcurrentQuery.resetConfig(imodel1[_nativeDb], { allowTestingArgs: true }); + const scheduleQuery = async () => { + return new Promise(async (resolve, reject) => { + try { + const options = new QueryOptionsBuilder(); + options.setTestingArgs({ interrupt: true }); + options.setDelay(1000); + const reader = imodel1.createQueryReader(` + WITH sequence(n) AS ( + SELECT 1 + UNION ALL + SELECT n + 1 FROM sequence WHERE n < 10000 + ) + SELECT COUNT(*) + FROM bis.SpatialIndex i, sequence s`, undefined, options.getOptions()); + while (await reader.step()) { + rowCount++; + } + successful++; + resolve(); + } catch (err: any) { + // we expect query to be cancelled + if (err.errorNumber === DbResult.BE_SQLITE_INTERRUPT) { + cancelled++; + resolve(); + } else { + reject(new Error("rejected")); + } + } + }); + }; + const queries = []; + for (let i = 0; i < 100; i++) { + queries.push(scheduleQuery()); + } + + await Promise.all(queries); + // We expect at least one query to be cancelled + assert.equal(successful, 100, "success should be 100"); + assert.equal(rowCount, 100, "expect 100 rows"); + assert.isAtLeast(cancelled, 0, "should not have any cancelled query"); + } finally { + ConcurrentQuery.shutdown(imodel1[_nativeDb]); + ConcurrentQuery.resetConfig(imodel1[_nativeDb]); + } + }); // new new addon build it("ecsql with blob", async () => { let rows = await executeQuery(imodel1, "SELECT ECInstanceId,GeometryStream FROM bis.GeometricElement3d WHERE GeometryStream IS NOT NULL LIMIT 1"); diff --git a/core/common/src/ConcurrentQuery.ts b/core/common/src/ConcurrentQuery.ts index 1711fd52735e..7db8341cecea 100644 --- a/core/common/src/ConcurrentQuery.ts +++ b/core/common/src/ConcurrentQuery.ts @@ -74,6 +74,7 @@ export interface DbRuntimeStats { timeLimit: number; memLimit: number; memUsed: number; + prepareTime: number; } /** @@ -110,6 +111,10 @@ export interface BaseReaderOptions { * concurrent query is configure to honour it. */ delay?: number; + /** + * @internal + */ + testingArgs?: TestingArgs; } /** @@ -247,6 +252,16 @@ export class QueryOptionsBuilder { this._options.delay = val; return this; } + /** + * @internal + * Use for testing internal logic. This parameter is ignored by default unless concurrent query is configure to not ignore it. + * @param val Testing arguments. + * @returns @type QueryOptionsBuilder for fluent interface. + */ + public setTestingArgs(val: TestingArgs) { + this._options.testingArgs = val; + return this; + } } /** @beta */ export class BlobOptionsBuilder { @@ -659,6 +674,7 @@ export enum DbResponseStatus { Partial = 3, /* query was running but ran out of quota.*/ Timeout = 4, /* query time quota expired while it was in queue.*/ QueueFull = 5, /* could not submit the query as queue was full.*/ + ShuttingDown = 6, /* Shutdown is in progress. */ Error = 100, /* generic error*/ Error_ECSql_PreparedFailed = Error + 1, /* ecsql prepared failed*/ Error_ECSql_StepFailed = Error + 2, /* ecsql step failed*/ @@ -668,6 +684,11 @@ export enum DbResponseStatus { Error_BlobIO_OutOfRange = Error + 6, /* range specified is invalid based on size of blob.*/ } +/** @internal */ +export interface TestingArgs { + interrupt?: boolean +} + /** @internal */ export enum DbValueFormat { ECSqlNames = 0, @@ -677,6 +698,7 @@ export enum DbValueFormat { /** @internal */ export interface DbRequest extends BaseReaderOptions { kind?: DbRequestKind; + testingArgs?: TestingArgs } /** @internal */ @@ -737,8 +759,23 @@ export interface DbRequestExecutor { private _props = new PropertyMetaDataMap([]); private _param = new QueryBinder().serialize(); private _lockArgs: boolean = false; - private _stats = { backendCpuTime: 0, backendTotalTime: 0, backendMemUsed: 0, backendRowsReturned: 0, totalTime: 0, retryCount: 0 }; + private _stats = { backendCpuTime: 0, backendTotalTime: 0, backendMemUsed: 0, backendRowsReturned: 0, totalTime: 0, retryCount: 0, prepareTime: 0 }; private _options: QueryOptions = new QueryOptionsBuilder().getOptions(); private _rowProxy = new Proxy(this, { @@ -366,11 +368,12 @@ export class ECSqlReader implements AsyncIterableIterator { * @internal */ protected async runWithRetry(request: DbQueryRequest) { - const needRetry = (rs: DbQueryResponse) => (rs.status === DbResponseStatus.Partial || rs.status === DbResponseStatus.QueueFull || rs.status === DbResponseStatus.Timeout) && (rs.data === undefined || rs.data.length === 0); + const needRetry = (rs: DbQueryResponse) => (rs.status === DbResponseStatus.Partial || rs.status === DbResponseStatus.QueueFull || rs.status === DbResponseStatus.Timeout || rs.status === DbResponseStatus.ShuttingDown) && (rs.data === undefined || rs.data.length === 0); const updateStats = (rs: DbQueryResponse) => { this._stats.backendCpuTime += rs.stats.cpuTime; this._stats.backendTotalTime += rs.stats.totalTime; this._stats.backendMemUsed += rs.stats.memUsed; + this._stats.prepareTime += rs.stats.prepareTime; this._stats.backendRowsReturned += (rs.data === undefined) ? 0 : rs.data.length; }; const execQuery = async (req: DbQueryRequest) => {