Skip to content

Commit

Permalink
feat: support RPC priority (#1282)
Browse files Browse the repository at this point in the history
* regen: regenerate client with rpc priority

* feat: support rpc priority

* fix: change to non-breaking change + fix sample

* docs: cleanup sample and add comment on prio

* fix: update copyright year

* chore: cleanup after merge

* fix: sample test + possible undefined requestoptions

* fix: wait for sample to finish

* merge: remove manual changes from generated files

* fix: omit transaction tags + add tests

* fix: skip notation

* fix: CommitOptions should not include transaction tag
  • Loading branch information
olavloite authored Apr 8, 2021
1 parent 5659e40 commit 8c82694
Show file tree
Hide file tree
Showing 11 changed files with 383 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ system-test/*key.json
.DS_Store
package-lock.json
__pycache__
.idea
73 changes: 73 additions & 0 deletions samples/rpc-priority.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

async function main(instanceId, databaseId, projectId) {
// TODO: Add start region tag here
// Imports the Google Cloud client library.
const {Spanner, protos} = require('@google-cloud/spanner');
const Priority = protos.google.spanner.v1.RequestOptions.Priority;

/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
projectId: projectId,
});

async function queryWithRpcPriority(instanceId, databaseId) {
// Gets a reference to a Cloud Spanner instance and database.
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const sql = `SELECT AlbumId, AlbumTitle, MarketingBudget
FROM Albums
ORDER BY AlbumTitle`;

try {
// Execute a query with low priority. Note that the default for all
// requests is PRIORITY_HIGH, and that this option can only be used to
// reduce the priority of a request.
const [rows] = await database.run({
sql,
requestOptions: {
priority: Priority.PRIORITY_LOW,
},
});

rows.forEach(row => {
const json = row.toJSON();
console.log(
`AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}, MarketingBudget: ${json.MarketingBudget}`
);
});
} catch (err) {
console.error('ERROR:', err);
} finally {
// Close the database when finished.
await database.close();
}
}
// TODO: Add end region tag here
await queryWithRpcPriority(instanceId, databaseId);
}
main(...process.argv.slice(2)).then(() =>
console.log('Finished executing sample')
);
13 changes: 13 additions & 0 deletions samples/system-test/spanner.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const crudCmd = 'node crud.js';
const schemaCmd = 'node schema.js';
const indexingCmd = 'node indexing.js';
const queryOptionsCmd = 'node queryoptions.js';
const rpcPriorityCommand = 'node rpc-priority.js';
const transactionCmd = 'node transaction.js';
const timestampCmd = 'node timestamp.js';
const structCmd = 'node struct.js';
Expand Down Expand Up @@ -489,6 +490,18 @@ describe('Spanner', () => {
);
});

// query with RPC priority
// TODO: Enable when RPC Priority has been released.
it.skip('should use request options', async () => {
const output = execSync(
`${rpcPriorityCommand} queryWithRpcPriority ${INSTANCE_ID} ${DATABASE_ID} ${PROJECT_ID}`
);
assert.match(
output,
/AlbumId: 2, AlbumTitle: Forever Hold your Peace, MarketingBudget:/
);
});

// read_only_transaction
it('should read an example table using transactions', async () => {
const output = execSync(
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export interface CreateInstanceRequest {
labels?: {[k: string]: string} | null;
gaxOptions?: CallOptions;
}

/**
* Translates enum values to string keys.
*
Expand Down
31 changes: 24 additions & 7 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {NormalCallback, CLOUD_RESOURCE_HEADER} from './common';
import {google} from '../protos/protos';
import IAny = google.protobuf.IAny;
import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions;
import IRequestOptions = google.spanner.v1.IRequestOptions;
import {Database} from '.';

export type Rows = Array<Row | Json>;
Expand All @@ -60,6 +61,7 @@ export interface RequestOptions {
}

export interface CommitOptions {
requestOptions?: Pick<IRequestOptions, 'priority'>;
returnCommitStats?: boolean;
gaxOptions?: CallOptions;
}
Expand All @@ -76,6 +78,7 @@ export interface ExecuteSqlRequest extends Statement, RequestOptions {
partitionToken?: Uint8Array | string;
seqno?: number;
queryOptions?: IQueryOptions;
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
}

export interface KeyRange {
Expand All @@ -95,6 +98,7 @@ export interface ReadRequest extends RequestOptions {
limit?: number | Long | null;
resumeToken?: Uint8Array | null;
partitionToken?: Uint8Array | null;
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
}

export interface BatchUpdateError extends grpc.ServiceError {
Expand Down Expand Up @@ -127,6 +131,10 @@ export interface BatchUpdateCallback {
response?: spannerClient.spanner.v1.ExecuteBatchDmlResponse
): void;
}
export interface BatchUpdateOptions {
requestOptions?: Omit<IRequestOptions, 'transactionTag'>;
gaxOptions?: CallOptions;
}

export type ReadCallback = NormalCallback<Rows>;

Expand Down Expand Up @@ -1269,15 +1277,15 @@ export class Transaction extends Dml {

batchUpdate(
queries: Array<string | Statement>,
gaxOptions?: CallOptions
options?: BatchUpdateOptions | CallOptions
): Promise<BatchUpdateResponse>;
batchUpdate(
queries: Array<string | Statement>,
callback: BatchUpdateCallback
): void;
batchUpdate(
queries: Array<string | Statement>,
gaxOptions: CallOptions,
options: BatchUpdateOptions | CallOptions,
callback: BatchUpdateCallback
): void;
/**
Expand Down Expand Up @@ -1311,6 +1319,7 @@ export class Transaction extends Dml {
* @param {object} [query.types] A map of parameter types.
* @param {object} [gaxOptions] Request configuration options, outlined here:
* https://googleapis.github.io/gax-nodejs/classes/CallSettings.html.
* @param {BatchUpdateOptions} [options] Options for configuring the request.
* @param {RunUpdateCallback} [callback] Callback function.
* @returns {Promise<RunUpdateResponse>}
*
Expand All @@ -1337,13 +1346,17 @@ export class Transaction extends Dml {
*/
batchUpdate(
queries: Array<string | Statement>,
gaxOptionsOrCallback?: CallOptions | BatchUpdateCallback,
optionsOrCallback?: BatchUpdateOptions | CallOptions | BatchUpdateCallback,
cb?: BatchUpdateCallback
): Promise<BatchUpdateResponse> | void {
const gaxOpts =
typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {};
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
const callback =
typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!;
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
const gaxOpts =
'gaxOptions' in options
? (options as BatchUpdateOptions).gaxOptions
: options;

if (!Array.isArray(queries) || !queries.length) {
const rowCounts: number[] = [];
Expand Down Expand Up @@ -1371,6 +1384,7 @@ export class Transaction extends Dml {
session: this.session.formattedName_!,
transaction: {id: this.id!},
seqno: this._seqno++,
requestOptions: (options as BatchUpdateOptions).requestOptions,
statements,
} as spannerClient.spanner.v1.ExecuteBatchDmlRequest;

Expand Down Expand Up @@ -1442,6 +1456,8 @@ export class Transaction extends Dml {
commit(options: CommitOptions | CallOptions, callback: CommitCallback): void;
/**
* @typedef {object} CommitOptions
* @property {IRequestOptions} requestOptions The request options to include
* with the commit request.
* @property {boolean} returnCommitStats Include statistics related to the
* transaction in the {@link CommitResponse}.
* @property {CallOptions} [gaxOptions] The request configuration options
Expand Down Expand Up @@ -1511,7 +1527,8 @@ export class Transaction extends Dml {

const mutations = this._queuedMutations;
const session = this.session.formattedName_!;
const reqOpts: CommitRequest = {mutations, session};
const requestOptions = (options as CommitOptions).requestOptions;
const reqOpts: CommitRequest = {mutations, session, requestOptions};

if (this.id) {
reqOpts.transactionId = this.id as Uint8Array;
Expand Down
2 changes: 1 addition & 1 deletion synth.metadata
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@
}
}
]
}
}
23 changes: 23 additions & 0 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {MockError} from './mockserver/mockspanner';
import {IOperation} from '../src/instance';
import {CLOUD_RESOURCE_HEADER} from '../src/common';
import {google} from '../protos/protos';
import RequestOptions = google.spanner.v1.RequestOptions;
import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType;

let promisified = false;
Expand Down Expand Up @@ -2397,6 +2398,28 @@ describe('Database', () => {

assert.strictEqual(releaseStub.callCount, 1);
});

it('should accept requestOptions', () => {
const fakeCallback = sandbox.spy();

database.runPartitionedUpdate(
{
sql: QUERY.sql,
params: QUERY.params,
requestOptions: {priority: RequestOptions.Priority.PRIORITY_LOW},
},
fakeCallback
);

const [query] = runUpdateStub.lastCall.args;

assert.deepStrictEqual(query, {
sql: QUERY.sql,
params: QUERY.params,
requestOptions: {priority: RequestOptions.Priority.PRIORITY_LOW},
});
assert.ok(fakeCallback.calledOnce);
});
});

describe('runTransaction', () => {
Expand Down
5 changes: 5 additions & 0 deletions test/mockserver/mockspanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ export class MockSpanner {

this.executeBatchDml = this.executeBatchDml.bind(this);
this.executeStreamingSql = this.executeStreamingSql.bind(this);

this.read = this.read.bind(this);
this.streamingRead = this.streamingRead.bind(this);
}

/**
Expand Down Expand Up @@ -674,6 +677,7 @@ export class MockSpanner {
>,
callback: protobuf.Spanner.ExecuteBatchDmlCallback
) {
this.requests.push(call.request!);
this.simulateExecutionTime(this.executeBatchDml.name)
.then(() => {
if (call.request!.transaction && call.request!.transaction.id) {
Expand Down Expand Up @@ -824,6 +828,7 @@ export class MockSpanner {
call: grpc.ServerUnaryCall<protobuf.CommitRequest, protobuf.CommitResponse>,
callback: protobuf.Spanner.CommitCallback
) {
this.requests.push(call.request!);
this.simulateExecutionTime(this.commit.name)
.then(() => {
if (call.request!.transactionId) {
Expand Down
Loading

0 comments on commit 8c82694

Please sign in to comment.