Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where query get interrupted during prepare. #7592

Merged
merged 18 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions core/backend/src/test/ecdb/ConcurrentQueryLoad.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import { StopWatch } from "@itwin/core-bentley";
import { ECDb } from "../../ECDb";
import { DbQueryConfig, ECSqlReader, IModelError, QueryQuota, QueryStats } from "@itwin/core-common";
import { IModelTestUtils } from "../IModelTestUtils";
import { IModelDb, SnapshotDb } from "../../IModelDb";
import { _nativeDb } from "../../core-backend";
import { expect } from "chai";
import { ConcurrentQuery } from "../../ConcurrentQuery";

interface ITaskResult {
stats: QueryStats;
error?: any;
}

interface ISenario {
name: string;
config?: DbQueryConfig;
totalBatches: number;
taskPerBatch: number;
createReader: (db: ECDb | IModelDb) => ECSqlReader;
}


class LoadSimulator {
constructor(public db: ECDb | IModelDb, public senario: ISenario) { }
private async runQueryTask(reader: ECSqlReader): Promise<ITaskResult> {
try {
while (await reader.step()) { }
return { stats: reader.stats };
} catch (err) {
return { stats: reader.stats, error: err };
}
}

public async run() {
ConcurrentQuery.shutdown(this.db[_nativeDb]);
if (this.senario.config) {
const config = ConcurrentQuery.resetConfig(this.db[_nativeDb], this.senario.config);
console.log(config);
}
const overalTime = new StopWatch();
overalTime.start();
const results: ITaskResult[] = [];
for (let i = 0; i < this.senario.totalBatches; ++i) {
const promises: Promise<ITaskResult>[] = [];
const readerTasks = Array(this.senario.taskPerBatch).fill(undefined).map(() => this.senario.createReader(this.db));
readerTasks.forEach((reader) => {
promises.push(this.runQueryTask(reader));
});
results.push(... await Promise.all(promises));

}
overalTime.stop();
const errors = results.filter((x) => x.error !== undefined);
const errorsMap = new Map<string, number>();
errors.forEach((x) => {
if (x.error instanceof Error) {
if (!errorsMap.has(x.error.message)) {
errorsMap.set(x.error.message, 1);
} else {
errorsMap.set(x.error.message, errorsMap.get(x.error.message)! + 1);
}
} else {
if (!errorsMap.has("error")) {
errorsMap.set("error", 1);
} else {
errorsMap.set("error", errorsMap.get("error")! + 1);
}
}
});
const errorCount = errors.length;
let backendCpuTime: bigint = BigInt(0);
let backendTotalTime: bigint = BigInt(0);
let backendMemUsed: bigint = BigInt(0);
let backendRowsReturned: bigint = BigInt(0);
let totalTime: bigint = BigInt(0);
let retryCount: bigint = BigInt(0);
let prepareTime: bigint = BigInt(0);

// Calculate average
results.forEach((r: ITaskResult) => {
backendCpuTime += BigInt(r.stats.backendCpuTime);
backendTotalTime += BigInt(r.stats.backendTotalTime);
backendMemUsed += BigInt(r.stats.backendMemUsed);
backendRowsReturned += BigInt(r.stats.backendRowsReturned);
totalTime += BigInt(r.stats.totalTime);
retryCount += BigInt(r.stats.retryCount);
prepareTime += BigInt(r.stats.prepareTime);
});

backendCpuTime /= BigInt(results.length);
backendTotalTime /= BigInt(results.length);
backendMemUsed /= BigInt(results.length);
backendRowsReturned /= BigInt(results.length);
totalTime /= BigInt(results.length);
retryCount /= BigInt(results.length);
// prepareTime /= BigInt(results.length);

return {
result: {
backendCpuTime,
backendTotalTime,
backendMemUsed,
backendRowsReturned,
totalTime,
retryCount,
prepareTime,
},
overalTimeInSec: overalTime.currentSeconds,
errorCount,
totalQueries: results.length,
errorMap: errorsMap
};

}
}

describe("ConcurrentQueryLoad", () => {
it.only("should run", async () => {


// const defaultConfig: {
// workerThreads: 4,
// requestQueueSize: 2000,
// ignorePriority: false,
// ignoreDelay: true,
// doNotUsePrimaryConnToPrepare: false,
// autoShutdowWhenIdealForSeconds: 120,
// statementCacheSizePerWorker: 40,
// monitorPollInterval: 1000,
// memoryMapFileSize: 0,
// globalQuota: { time: 60, memory: 8388608 }
// };

const senario: ISenario = {
name: "ConcurrentQueryLoad",
config: {

},
totalBatches: 10,
taskPerBatch: 10,
createReader: (db: ECDb | IModelDb) => {
const quries = [
// {
// sql: `
// WITH sequence(n) AS (
// SELECT 1
// UNION ALL
// SELECT n + 1 FROM sequence WHERE n < 100
// )
// SELECT COUNT(*)
// FROM bis.SpatialIndex i, sequence s
// WHERE i.ECInstanceId MATCH iModel_spatial_overlap_aabb(
// iModel_bbox(random(), random(), random(), random(),random(), random()))`
// },
// {
// sql: `
// WITH sequence(n) AS (
// SELECT 1
// UNION ALL
// SELECT n + 1 FROM sequence WHERE n < 10000
// )
// SELECT COUNT(*) FROM sequence`
// },
{
sql: "SELECT $ FROM bis.Element LIMIT 10000"
}
];
const idx = Math.floor(Math.random() * quries.length);
return db.createQueryReader(quries[idx].sql);
}
};
const largeFile = `D:/temp/PaulM.bim`
const verySmallFile = IModelTestUtils.resolveAssetFile("test.bim");

const db = SnapshotDb.openFile(largeFile);
const simulator = new LoadSimulator(db, senario);
const result = await simulator.run();
console.log(result);
db.close();
expect(result.errorCount).to.be.equal(0);
});

});
14 changes: 14 additions & 0 deletions core/common/src/ConcurrentQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export interface DbRuntimeStats {
timeLimit: number;
memLimit: number;
memUsed: number;
prepareTime: number;
}

/**
Expand Down Expand Up @@ -741,8 +742,21 @@ export interface DbRequestExecutor<TRequest extends DbRequest, TResponse extends
/** @internal */
export interface DbQueryConfig {
globalQuota?: QueryQuota;
/** For testing */
ignoreDelay?: boolean;
/** Priority of request is ignored */
ignorePriority?: boolean;
/** Max queue size after which queries are rejected with error QueueFull */
requestQueueSize?: number;
/** Number of worker thread, default to 4 */
workerThreads?: number;
doNotUsePrimaryConnToPrepare?: boolean;
/** After no activity for given time concurrenty query will automatically shutdown */
autoShutdowWhenIdealForSeconds?: number;
khanaffan marked this conversation as resolved.
Show resolved Hide resolved
/** Maximum number of statement cache per worker. Default to 40 */
statementCacheSizePerWorker?: number;
/* Monitor poll interval in milliseconds. Its responsable for cancelling queries that pass quota. It can be set between 1000 and Max time quota for query */
monitorPollInterval?: number;
/** Set memory map io for each worker connection size in bytes. Default to zero mean do not use mmap io */
memoryMapFileSize?: number;
}
5 changes: 4 additions & 1 deletion core/common/src/ECSqlReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ export interface QueryStats {
totalTime: number;
/** The number of retries attempted to execute the query. */
retryCount: number;
/** Total time in millisecond to prepare ECSQL or grabing it from cache and binding parameters */
prepareTime: number;
}

/**
Expand Down Expand Up @@ -173,7 +175,7 @@ export class ECSqlReader implements AsyncIterableIterator<QueryRowProxy> {
private _props = new PropertyMetaDataMap([]);
private _param = new QueryBinder().serialize();
private _lockArgs: boolean = false;
private _stats = { backendCpuTime: 0, backendTotalTime: 0, backendMemUsed: 0, backendRowsReturned: 0, totalTime: 0, retryCount: 0 };
private _stats = { backendCpuTime: 0, backendTotalTime: 0, backendMemUsed: 0, backendRowsReturned: 0, totalTime: 0, retryCount: 0, prepareTime: 0 };
private _options: QueryOptions = new QueryOptionsBuilder().getOptions();

private _rowProxy = new Proxy<ECSqlReader>(this, {
Expand Down Expand Up @@ -373,6 +375,7 @@ export class ECSqlReader implements AsyncIterableIterator<QueryRowProxy> {
this._stats.backendCpuTime += rs.stats.cpuTime;
this._stats.backendTotalTime += rs.stats.totalTime;
this._stats.backendMemUsed += rs.stats.memUsed;
this._stats.prepareTime += rs.stats.prepareTime;
this._stats.backendRowsReturned += (rs.data === undefined) ? 0 : rs.data.length;
};
const execQuery = async (req: DbQueryRequest) => {
Expand Down
Loading