Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
* Infer type of scoped functions in execute
* Expand IsISE to full name
* Rename tryAbort since it doesn't return a bool
* Fix a typo in log message
  • Loading branch information
Simon Zhao committed Apr 20, 2021
1 parent 977fa86 commit 843486e
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 49 deletions.
80 changes: 39 additions & 41 deletions src/QldbDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ export class QldbDriver {
* Note: There is no corresponding `open` method and the only option is to instantiate another driver.
*/
close(): void {
this._isClosed = true;
while (this._sessionPool.length > 0) {
const session: QldbSession = this._sessionPool.pop();
if (session != undefined) {
session.endSession();
}
}
this._isClosed = true;
}

/**
Expand Down Expand Up @@ -169,56 +169,54 @@ export class QldbDriver {
* @throws {@linkcode InvalidSessionException} When a session expires either due to a long running transaction or session being idle for long time.
* @throws {@linkcode BadRequestException} When Amazon QLDB is not able to execute a query or transaction.
*/
async executeLambda<Type>(
transactionLambda: (transactionExecutor: TransactionExecutor) => Promise<Type>,
async executeLambda<T>(
transactionLambda: (transactionExecutor: TransactionExecutor) => Promise<T>,
retryConfig?: RetryConfig
): Promise<Type> {
): Promise<T> {
if (this._isClosed) {
throw new DriverClosedError();
}

// Acquire semaphore and get a session from the pool
const getSession: (thisDriver: QldbDriver, startNewSession: boolean) => Promise<QldbSession> =
async function(thisDriver: QldbDriver, startNewSession: boolean): Promise<QldbSession> {
debug(
`Getting session. Current free session count: ${thisDriver._sessionPool.length}. ` +
`Currently available permit count: ${thisDriver._availablePermits}.`
);
if (thisDriver._semaphore.tryAcquire()) {
thisDriver._availablePermits--;
try {
let session: QldbSession
if (!startNewSession) {
session = thisDriver._sessionPool.pop();
}
if (startNewSession || session == undefined) {
debug("Creating a new session.");
const communicator: Communicator =
await Communicator.create(thisDriver._qldbClient, thisDriver._ledgerName);
session = new QldbSession(communicator);
}
return session;
} catch (e) {
// An error when failing to start a new session is always retriable
throw new ExecuteError(e, true, true);
const getSession = async function(thisDriver: QldbDriver, startNewSession: boolean): Promise<QldbSession> {
debug(
`Getting session. Current free session count: ${thisDriver._sessionPool.length}. ` +
`Currently available permit count: ${thisDriver._availablePermits}.`
);
if (thisDriver._semaphore.tryAcquire()) {
thisDriver._availablePermits--;
try {
let session: QldbSession
if (!startNewSession) {
session = thisDriver._sessionPool.pop();
}
} else {
throw new SessionPoolEmptyError()
if (startNewSession || session == undefined) {
debug("Creating a new session.");
const communicator: Communicator =
await Communicator.create(thisDriver._qldbClient, thisDriver._ledgerName);
session = new QldbSession(communicator);
}
return session;
} catch (e) {
// An error when failing to start a new session is always retriable
throw new ExecuteError(e, true, true);
}
} else {
throw new SessionPoolEmptyError()
}
}

// Release semaphore and if the session is alive return it to the pool and return true
const releaseSession: (thisDriver: QldbDriver, session: QldbSession) => boolean =
function(thisDriver: QldbDriver, session: QldbSession): boolean {
thisDriver._semaphore.release();
thisDriver._availablePermits++;
if (session != null && session.isAlive()) {
thisDriver._sessionPool.push(session);
return true;
} else {
return false;
}
const releaseSession = function(thisDriver: QldbDriver, session: QldbSession): boolean {
thisDriver._semaphore.release();
thisDriver._availablePermits++;
if (session != null && session.isAlive()) {
thisDriver._sessionPool.push(session);
return true;
} else {
return false;
}
}

retryConfig = (retryConfig == null) ? this._retryConfig : retryConfig;
let session: QldbSession;
Expand All @@ -232,8 +230,8 @@ export class QldbDriver {
if (e instanceof ExecuteError) {
if (e.isRetriable) {
// Always retry on the first attempt if failure was caused by a stale session in the pool
if (retryAttempt == 1 && e.isISE) {
debug("Initial session received from pool is invalid. Retrtying...");
if (retryAttempt == 1 && e.isInvalidSessionException) {
debug("Initial session received from pool is invalid. Retrying...");
continue;
}
if (retryAttempt > retryConfig.getRetryLimit()) {
Expand Down
4 changes: 2 additions & 2 deletions src/QldbSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class QldbSession {
this._isAlive = false;
} else if (!isOccConflictException(e)) {
// OCC does not need session state reset as the transaction is implicitly closed
await this._tryAbort();
await this._cleanSessionState();
}
throw new ExecuteError(e, isRetriable, isISE, transactionId);
}
Expand All @@ -82,7 +82,7 @@ export class QldbSession {
return new Transaction(this._communicator, startTransactionResult.TransactionId);
}

private async _tryAbort(): Promise<void> {
private async _cleanSessionState(): Promise<void> {
try {
await this._communicator.abortTransaction();
} catch (e) {
Expand Down
6 changes: 3 additions & 3 deletions src/errors/Errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ export class SessionPoolEmptyError extends Error {
export class ExecuteError extends Error {
cause: Error;
isRetriable: boolean;
isISE: boolean;
isInvalidSessionException: boolean;
transactionId: string;

constructor(cause: Error, isRetriable: boolean, isISE: boolean, transactionId: string = null) {
constructor(cause: Error, isRetriable: boolean, isInvalidSessionException: boolean, transactionId: string = null) {
const message: string = "Error containing the context of a failure during Execute.";
super(message);
Object.setPrototypeOf(this, ExecuteError.prototype)
this.cause = cause;
this.isRetriable = isRetriable;
this.isISE = isISE;
this.isInvalidSessionException = isInvalidSessionException;
this.transactionId = transactionId;
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/test/QldbSession.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,18 @@ describe("QldbSession", () => {
});
});

describe("#_tryAbort()", () => {
describe("#_cleanSessionState()", () => {
it("should call abortTransaction()", async () => {
const communicatorAbortSpy = sandbox.spy(mockCommunicator, "abortTransaction");
await qldbSession["_tryAbort"]();
await qldbSession["_cleanSessionState"]();
sinon.assert.calledOnce(communicatorAbortSpy);
});

it("should log warning message when error is thrown and set alive state to false", async () => {
const communicatorAbortStub = sandbox.stub(mockCommunicator, "abortTransaction");
communicatorAbortStub.throws(new Error("testError"));
const logSpy = sandbox.spy(LogUtil, "warn");
await qldbSession["_tryAbort"]();
await qldbSession["_cleanSessionState"]();
sinon.assert.calledOnce(communicatorAbortStub);
sinon.assert.calledOnce(logSpy);
chai.assert.isFalse(qldbSession.isAlive());
Expand Down

0 comments on commit 843486e

Please sign in to comment.