Skip to content

Commit

Permalink
feat(spanner): add support for Multiplexed Session for Read Only Tran…
Browse files Browse the repository at this point in the history
…sactions
  • Loading branch information
alkatrivedi committed Jan 9, 2025
1 parent b467380 commit 85d3650
Show file tree
Hide file tree
Showing 11 changed files with 1,954 additions and 790 deletions.
1,558 changes: 1,089 additions & 469 deletions observability-test/database.ts

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion protos/protos.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protos/protos.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 25 additions & 9 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,7 @@ class Database extends common.GrpcServiceObject {
private _releaseOnEnd(session: Session, transaction: Snapshot, span: Span) {
transaction.once('end', () => {
try {
this.pool_.release(session);
this.sessionFactory_.release(session);
} catch (e) {
setSpanErrorAndException(span, e as Error);
this.emit('error', e);
Expand Down Expand Up @@ -2112,7 +2112,7 @@ class Database extends common.GrpcServiceObject {
}

return startTrace('Database.getSnapshot', this._traceConfig, span => {
this.pool_.getSession((err, session) => {
this.sessionFactory_.getSession((err, session) => {
if (err) {
setSpanError(span, err);
span.end();
Expand All @@ -2125,17 +2125,28 @@ class Database extends common.GrpcServiceObject {
snapshot.begin(err => {
if (err) {
setSpanError(span, err);
if (isSessionNotFoundError(err)) {
if (
isSessionNotFoundError(err) &&
!this.sessionFactory_.isMultiplexedEnabled()
) {
span.addEvent('No session available', {
'session.id': session?.id,
});
session!.lastError = err;
this.pool_.release(session!);
this.sessionFactory_.release(session!);
span.end();
this.getSnapshot(options, callback!);
} else if (this.sessionFactory_.isMultiplexedEnabled()) {
span.addEvent('No session available', {
'session.id': session?.id,
});
session!.lastError = err;
this.sessionFactory_.release(session!);
span.end();
callback!(err);
} else {
span.addEvent('Using Session', {'session.id': session?.id});
this.pool_.release(session!);
this.sessionFactory_.release(session!);
span.end();
callback!(err);
}
Expand Down Expand Up @@ -3071,7 +3082,7 @@ class Database extends common.GrpcServiceObject {
...this._traceConfig,
};
return startTrace('Database.runStream', traceConfig, span => {
this.pool_.getSession((err, session) => {
this.sessionFactory_.getSession((err, session) => {
if (err) {
setSpanError(span, err);
proxyStream.destroy(err);
Expand All @@ -3098,7 +3109,8 @@ class Database extends common.GrpcServiceObject {

if (
!dataReceived &&
isSessionNotFoundError(err as grpc.ServiceError)
isSessionNotFoundError(err as grpc.ServiceError) &&
!this.sessionFactory_.isMultiplexedEnabled()
) {
// If it is a 'Session not found' error and we have not yet received
// any data, we can safely retry the query on a new session.
Expand Down Expand Up @@ -3656,8 +3668,12 @@ class Database extends common.GrpcServiceObject {
: {};

return startTrace('Database.writeAtLeastOnce', this._traceConfig, span => {
this.pool_.getSession((err, session?, transaction?) => {
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
this.sessionFactory_.getSession((err, session?, transaction?) => {
if (
err &&
isSessionNotFoundError(err as grpc.ServiceError) &&
!this.sessionFactory_.isMultiplexedEnabled()
) {
span.addEvent('No session available', {
'session.id': session?.id,
});
Expand Down
4 changes: 0 additions & 4 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ export class MultiplexedSession
database: Database;
// frequency to create new mux session
refreshRate: number;
isMultiplexedEnabled: boolean;
_multiplexedSession: Session | null;
_refreshHandle!: NodeJS.Timer;
_observabilityOptions?: ObservabilityOptions;
Expand All @@ -82,9 +81,6 @@ export class MultiplexedSession
this.refreshRate = 7;
this._multiplexedSession = null;
this._observabilityOptions = database._observabilityOptions;
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true'
? (this.isMultiplexedEnabled = true)
: (this.isMultiplexedEnabled = false);
}

/**
Expand Down
23 changes: 16 additions & 7 deletions src/session-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export interface SessionFactoryInterface {
* @param {Session} session The session to be released.
*/
release(session: Session): void;

isMultiplexedEnabled(): boolean;
}

/**
Expand All @@ -89,6 +91,7 @@ export class SessionFactory
{
multiplexedSession_: MultiplexedSessionInterface;
pool_: SessionPoolInterface;
isMultiplexed: boolean;
constructor(
database: Database,
name: String,
Expand All @@ -105,8 +108,11 @@ export class SessionFactory
this.pool_.on('error', this.emit.bind(database, 'error'));
this.pool_.open();
this.multiplexedSession_ = new MultiplexedSession(database);
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true'
? (this.isMultiplexed = true)
: (this.isMultiplexed = false);
// Multiplexed sessions should only be created if its enabled.
if ((this.multiplexedSession_ as MultiplexedSession).isMultiplexedEnabled) {
if (this.isMultiplexed) {
this.multiplexedSession_.on('error', this.emit.bind(database, 'error'));
this.multiplexedSession_.createSession();
}
Expand All @@ -122,12 +128,13 @@ export class SessionFactory
*/

getSession(callback: GetSessionCallback): void {
const sessionHandler = (this.multiplexedSession_ as MultiplexedSession)
.isMultiplexedEnabled
const sessionHandler = this.isMultiplexed
? this.multiplexedSession_
: this.pool_;

sessionHandler!.getSession((err, session) => callback(err, session));
sessionHandler!.getSession((err, session, transaction) =>
callback(err, session, transaction)
);
}

/**
Expand All @@ -152,10 +159,12 @@ export class SessionFactory
* @throws {Error} If the session is invalid or cannot be released.
*/
release(session: Session): void {
if (
!(this.multiplexedSession_ as MultiplexedSession).isMultiplexedEnabled
) {
if (!this.isMultiplexed) {
this.pool_.release(session);
}
}

isMultiplexedEnabled(): boolean {
return this.isMultiplexed;
}
}
Loading

0 comments on commit 85d3650

Please sign in to comment.