diff --git a/.gitignore b/.gitignore index 5d32b2378..5c70cc712 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ system-test/*key.json .DS_Store package-lock.json __pycache__ +.idea diff --git a/samples/rpc-priority.js b/samples/rpc-priority.js new file mode 100644 index 000000000..c3981059b --- /dev/null +++ b/samples/rpc-priority.js @@ -0,0 +1,73 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +async function main(instanceId, databaseId, projectId) { + // TODO: Add start region tag here + // Imports the Google Cloud client library. + const {Spanner, protos} = require('@google-cloud/spanner'); + const Priority = protos.google.spanner.v1.RequestOptions.Priority; + + /** + * TODO(developer): Uncomment the following lines before running the sample. + */ + // const projectId = 'my-project-id'; + // const instanceId = 'my-instance'; + // const databaseId = 'my-database'; + + // Creates a client + const spanner = new Spanner({ + projectId: projectId, + }); + + async function queryWithRpcPriority(instanceId, databaseId) { + // Gets a reference to a Cloud Spanner instance and database. + const instance = spanner.instance(instanceId); + const database = instance.database(databaseId); + + const sql = `SELECT AlbumId, AlbumTitle, MarketingBudget + FROM Albums + ORDER BY AlbumTitle`; + + try { + // Execute a query with low priority. Note that the default for all + // requests is PRIORITY_HIGH, and that this option can only be used to + // reduce the priority of a request. + const [rows] = await database.run({ + sql, + requestOptions: { + priority: Priority.PRIORITY_LOW, + }, + }); + + rows.forEach(row => { + const json = row.toJSON(); + console.log( + `AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}, MarketingBudget: ${json.MarketingBudget}` + ); + }); + } catch (err) { + console.error('ERROR:', err); + } finally { + // Close the database when finished. + await database.close(); + } + } + // TODO: Add end region tag here + await queryWithRpcPriority(instanceId, databaseId); +} +main(...process.argv.slice(2)).then(() => + console.log('Finished executing sample') +); diff --git a/samples/system-test/spanner.test.js b/samples/system-test/spanner.test.js index 8460ff0b4..be6c784c6 100644 --- a/samples/system-test/spanner.test.js +++ b/samples/system-test/spanner.test.js @@ -28,6 +28,7 @@ const crudCmd = 'node crud.js'; const schemaCmd = 'node schema.js'; const indexingCmd = 'node indexing.js'; const queryOptionsCmd = 'node queryoptions.js'; +const rpcPriorityCommand = 'node rpc-priority.js'; const transactionCmd = 'node transaction.js'; const timestampCmd = 'node timestamp.js'; const structCmd = 'node struct.js'; @@ -489,6 +490,18 @@ describe('Spanner', () => { ); }); + // query with RPC priority + // TODO: Enable when RPC Priority has been released. + it.skip('should use request options', async () => { + const output = execSync( + `${rpcPriorityCommand} queryWithRpcPriority ${INSTANCE_ID} ${DATABASE_ID} ${PROJECT_ID}` + ); + assert.match( + output, + /AlbumId: 2, AlbumTitle: Forever Hold your Peace, MarketingBudget:/ + ); + }); + // read_only_transaction it('should read an example table using transactions', async () => { const output = execSync( diff --git a/src/index.ts b/src/index.ts index f4bfb7d7e..9e1f92fe6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -96,6 +96,7 @@ export interface CreateInstanceRequest { labels?: {[k: string]: string} | null; gaxOptions?: CallOptions; } + /** * Translates enum values to string keys. * diff --git a/src/transaction.ts b/src/transaction.ts index 2c4d7d5c6..e5a3b544c 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -37,6 +37,7 @@ import {NormalCallback, CLOUD_RESOURCE_HEADER} from './common'; import {google} from '../protos/protos'; import IAny = google.protobuf.IAny; import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; +import IRequestOptions = google.spanner.v1.IRequestOptions; import {Database} from '.'; export type Rows = Array; @@ -60,6 +61,7 @@ export interface RequestOptions { } export interface CommitOptions { + requestOptions?: Pick; returnCommitStats?: boolean; gaxOptions?: CallOptions; } @@ -76,6 +78,7 @@ export interface ExecuteSqlRequest extends Statement, RequestOptions { partitionToken?: Uint8Array | string; seqno?: number; queryOptions?: IQueryOptions; + requestOptions?: Omit; } export interface KeyRange { @@ -95,6 +98,7 @@ export interface ReadRequest extends RequestOptions { limit?: number | Long | null; resumeToken?: Uint8Array | null; partitionToken?: Uint8Array | null; + requestOptions?: Omit; } export interface BatchUpdateError extends grpc.ServiceError { @@ -127,6 +131,10 @@ export interface BatchUpdateCallback { response?: spannerClient.spanner.v1.ExecuteBatchDmlResponse ): void; } +export interface BatchUpdateOptions { + requestOptions?: Omit; + gaxOptions?: CallOptions; +} export type ReadCallback = NormalCallback; @@ -1269,7 +1277,7 @@ export class Transaction extends Dml { batchUpdate( queries: Array, - gaxOptions?: CallOptions + options?: BatchUpdateOptions | CallOptions ): Promise; batchUpdate( queries: Array, @@ -1277,7 +1285,7 @@ export class Transaction extends Dml { ): void; batchUpdate( queries: Array, - gaxOptions: CallOptions, + options: BatchUpdateOptions | CallOptions, callback: BatchUpdateCallback ): void; /** @@ -1311,6 +1319,7 @@ export class Transaction extends Dml { * @param {object} [query.types] A map of parameter types. * @param {object} [gaxOptions] Request configuration options, outlined here: * https://googleapis.github.io/gax-nodejs/classes/CallSettings.html. + * @param {BatchUpdateOptions} [options] Options for configuring the request. * @param {RunUpdateCallback} [callback] Callback function. * @returns {Promise} * @@ -1337,13 +1346,17 @@ export class Transaction extends Dml { */ batchUpdate( queries: Array, - gaxOptionsOrCallback?: CallOptions | BatchUpdateCallback, + optionsOrCallback?: BatchUpdateOptions | CallOptions | BatchUpdateCallback, cb?: BatchUpdateCallback ): Promise | void { - const gaxOpts = - typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; const callback = - typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const gaxOpts = + 'gaxOptions' in options + ? (options as BatchUpdateOptions).gaxOptions + : options; if (!Array.isArray(queries) || !queries.length) { const rowCounts: number[] = []; @@ -1371,6 +1384,7 @@ export class Transaction extends Dml { session: this.session.formattedName_!, transaction: {id: this.id!}, seqno: this._seqno++, + requestOptions: (options as BatchUpdateOptions).requestOptions, statements, } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; @@ -1442,6 +1456,8 @@ export class Transaction extends Dml { commit(options: CommitOptions | CallOptions, callback: CommitCallback): void; /** * @typedef {object} CommitOptions + * @property {IRequestOptions} requestOptions The request options to include + * with the commit request. * @property {boolean} returnCommitStats Include statistics related to the * transaction in the {@link CommitResponse}. * @property {CallOptions} [gaxOptions] The request configuration options @@ -1511,7 +1527,8 @@ export class Transaction extends Dml { const mutations = this._queuedMutations; const session = this.session.formattedName_!; - const reqOpts: CommitRequest = {mutations, session}; + const requestOptions = (options as CommitOptions).requestOptions; + const reqOpts: CommitRequest = {mutations, session, requestOptions}; if (this.id) { reqOpts.transactionId = this.id as Uint8Array; diff --git a/synth.metadata b/synth.metadata index d86f4d578..5d35323c9 100644 --- a/synth.metadata +++ b/synth.metadata @@ -52,4 +52,4 @@ } } ] -} \ No newline at end of file +} diff --git a/test/database.ts b/test/database.ts index 580db7148..ff5308c14 100644 --- a/test/database.ts +++ b/test/database.ts @@ -33,6 +33,7 @@ import {MockError} from './mockserver/mockspanner'; import {IOperation} from '../src/instance'; import {CLOUD_RESOURCE_HEADER} from '../src/common'; import {google} from '../protos/protos'; +import RequestOptions = google.spanner.v1.RequestOptions; import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType; let promisified = false; @@ -2397,6 +2398,28 @@ describe('Database', () => { assert.strictEqual(releaseStub.callCount, 1); }); + + it('should accept requestOptions', () => { + const fakeCallback = sandbox.spy(); + + database.runPartitionedUpdate( + { + sql: QUERY.sql, + params: QUERY.params, + requestOptions: {priority: RequestOptions.Priority.PRIORITY_LOW}, + }, + fakeCallback + ); + + const [query] = runUpdateStub.lastCall.args; + + assert.deepStrictEqual(query, { + sql: QUERY.sql, + params: QUERY.params, + requestOptions: {priority: RequestOptions.Priority.PRIORITY_LOW}, + }); + assert.ok(fakeCallback.calledOnce); + }); }); describe('runTransaction', () => { diff --git a/test/mockserver/mockspanner.ts b/test/mockserver/mockspanner.ts index 0d8606170..603acbcc8 100644 --- a/test/mockserver/mockspanner.ts +++ b/test/mockserver/mockspanner.ts @@ -254,6 +254,9 @@ export class MockSpanner { this.executeBatchDml = this.executeBatchDml.bind(this); this.executeStreamingSql = this.executeStreamingSql.bind(this); + + this.read = this.read.bind(this); + this.streamingRead = this.streamingRead.bind(this); } /** @@ -674,6 +677,7 @@ export class MockSpanner { >, callback: protobuf.Spanner.ExecuteBatchDmlCallback ) { + this.requests.push(call.request!); this.simulateExecutionTime(this.executeBatchDml.name) .then(() => { if (call.request!.transaction && call.request!.transaction.id) { @@ -824,6 +828,7 @@ export class MockSpanner { call: grpc.ServerUnaryCall, callback: protobuf.Spanner.CommitCallback ) { + this.requests.push(call.request!); this.simulateExecutionTime(this.commit.name) .then(() => { if (call.request!.transactionId) { diff --git a/test/spanner.ts b/test/spanner.ts index 4f89910c4..23a89120a 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import {Done, describe, before, after, beforeEach, it} from 'mocha'; +import {after, before, beforeEach, describe, Done, it} from 'mocha'; import * as assert from 'assert'; import {grpc, Status} from 'google-gax'; import {Database, Instance, SessionPool, Snapshot, Spanner} from '../src'; @@ -41,13 +41,15 @@ import { SessionPoolOptions, } from '../src/session-pool'; import {Json} from '../src/codec'; +import * as stream from 'stream'; +import * as util from 'util'; import CreateInstanceMetadata = google.spanner.admin.instance.v1.CreateInstanceMetadata; import QueryOptions = google.spanner.v1.ExecuteSqlRequest.QueryOptions; import v1 = google.spanner.v1; import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import ResultSetStats = google.spanner.v1.ResultSetStats; -import * as stream from 'stream'; -import * as util from 'util'; +import RequestOptions = google.spanner.v1.RequestOptions; +import Priority = google.spanner.v1.RequestOptions.Priority; function numberToEnglishWord(num: number): string { switch (num) { @@ -178,6 +180,95 @@ describe('Spanner with mock server', () => { } }); + it('should execute query with requestOptions', async () => { + const priority = RequestOptions.Priority.PRIORITY_HIGH; + const database = newTestDatabase(); + try { + const [rows] = await database.run({ + sql: selectSql, + requestOptions: {priority: priority}, + }); + assert.strictEqual(rows.length, 3); + } finally { + await database.close(); + } + const request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.ok( + request.requestOptions, + 'no requestOptions found on ExecuteSqlRequest' + ); + assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_HIGH'); + }); + + it('should execute read with requestOptions', async () => { + const database = newTestDatabase(); + const [snapshot] = await database.getSnapshot(); + try { + await snapshot.read('foo', { + keySet: {all: true}, + requestOptions: {priority: Priority.PRIORITY_MEDIUM}, + }); + } catch (e) { + // Ignore the fact that streaming read is unimplemented on the mock + // server. We just want to verify that the correct request is sent. + assert.strictEqual(e.code, Status.UNIMPLEMENTED); + } finally { + snapshot.end(); + await database.close(); + } + const request = spannerMock.getRequests().find(val => { + return (val as v1.ReadRequest).table === 'foo'; + }) as v1.ReadRequest; + assert.ok(request, 'no ReadRequest found'); + assert.ok( + request.requestOptions, + 'no requestOptions found on ReadRequest' + ); + assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_MEDIUM'); + }); + + it('should execute batchUpdate with requestOptions', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(tx => { + return tx!.batchUpdate([insertSql, insertSql], { + requestOptions: {priority: RequestOptions.Priority.PRIORITY_MEDIUM}, + }); + }); + await database.close(); + const request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteBatchDmlRequest).statements; + }) as v1.ExecuteBatchDmlRequest; + assert.ok(request, 'no ExecuteBatchDmlRequest found'); + assert.ok( + request.requestOptions, + 'no requestOptions found on ExecuteBatchDmlRequest' + ); + assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_MEDIUM'); + }); + + it('should execute update with requestOptions', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(tx => { + return tx!.runUpdate({ + sql: insertSql, + requestOptions: {priority: RequestOptions.Priority.PRIORITY_LOW}, + }); + }); + await database.close(); + const request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.ok( + request.requestOptions, + 'no requestOptions found on ExecuteSqlRequest' + ); + assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_LOW'); + }); + it('should return an array of json objects', async () => { const database = newTestDatabase(); try { @@ -2467,6 +2558,30 @@ describe('Spanner with mock server', () => { }); }); + describe('table', () => { + it('should use requestOptions for mutations', async () => { + const database = newTestDatabase(); + await database + .table('foo') + .upsert( + {id: 1, name: 'bar'}, + {requestOptions: {priority: RequestOptions.Priority.PRIORITY_MEDIUM}} + ); + + const request = spannerMock.getRequests().find(val => { + return (val as v1.CommitRequest).mutations; + }) as v1.CommitRequest; + assert.ok(request, 'no CommitRequest found'); + assert.ok( + request.requestOptions, + 'no requestOptions found on CommitRequest' + ); + assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_MEDIUM'); + + await database.close(); + }); + }); + describe('instanceAdmin', () => { it('should list instance configurations', async () => { const [configs] = await spanner.getInstanceConfigs(); diff --git a/test/table.ts b/test/table.ts index 049f47002..b1027a24c 100644 --- a/test/table.ts +++ b/test/table.ts @@ -25,6 +25,8 @@ import {Transform} from 'stream'; import * as through from 'through2'; import {TimestampBounds} from '../src/transaction'; +import {google} from '../protos/protos'; +import RequestOptions = google.spanner.v1.RequestOptions; let promisified = false; const fakePfy = extend({}, pfy, { @@ -270,6 +272,17 @@ describe('Table', () => { table.deleteRows(KEYS, deleteRowsOptions, assert.ifError); }); + it('should accept requestOptions', done => { + const deleteRowsOptions = { + requestOptions: {priority: RequestOptions.Priority.PRIORITY_HIGH}, + }; + transaction.commit = options => { + assert.strictEqual(options, deleteRowsOptions); + done(); + }; + table.deleteRows(KEYS, deleteRowsOptions, assert.ifError); + }); + it('should delete the rows via transaction', done => { const stub = (sandbox.stub( transaction, @@ -368,6 +381,22 @@ describe('Table', () => { table.insert(ROW, insertRowsOptions, assert.ifError); }); + + it('should accept requestOptions', done => { + const insertRowsOptions = { + requestOptions: {priority: RequestOptions.Priority.PRIORITY_HIGH}, + }; + (sandbox.stub(transaction, 'insert') as sinon.SinonStub).withArgs( + table.name, + ROW + ); + transaction.commit = options => { + assert.strictEqual(options, insertRowsOptions); + done(); + }; + + table.insert(ROW, insertRowsOptions, assert.ifError); + }); }); describe('read', () => { @@ -490,6 +519,22 @@ describe('Table', () => { table.replace(ROW, replaceRowsOptions, assert.ifError); }); + + it('should accept requestOptions', done => { + const replaceRowsOptions = { + requestOptions: {priority: RequestOptions.Priority.PRIORITY_HIGH}, + }; + (sandbox.stub(transaction, 'replace') as sinon.SinonStub).withArgs( + table.name, + ROW + ); + transaction.commit = options => { + assert.strictEqual(options, replaceRowsOptions); + done(); + }; + + table.replace(ROW, replaceRowsOptions, assert.ifError); + }); }); describe('update', () => { @@ -548,6 +593,22 @@ describe('Table', () => { table.update(ROW, updateRowsOptions, assert.ifError); }); + + it('should accept requestOptions', done => { + const updateRowsOptions = { + requestOptions: {priority: RequestOptions.Priority.PRIORITY_LOW}, + }; + (sandbox.stub(transaction, 'update') as sinon.SinonStub).withArgs( + table.name, + ROW + ); + transaction.commit = options => { + assert.strictEqual(options, updateRowsOptions); + done(); + }; + + table.update(ROW, updateRowsOptions, assert.ifError); + }); }); describe('upsert', () => { @@ -606,5 +667,21 @@ describe('Table', () => { table.upsert(ROW, upsertRowsOptions, assert.ifError); }); + + it('should accept requestOptions', done => { + const upsertRowsOptions = { + requestOptions: {priority: RequestOptions.Priority.PRIORITY_MEDIUM}, + }; + (sandbox.stub(transaction, 'upsert') as sinon.SinonStub).withArgs( + table.name, + ROW + ); + transaction.commit = options => { + assert.strictEqual(options, upsertRowsOptions); + done(); + }; + + table.upsert(ROW, upsertRowsOptions, assert.ifError); + }); }); }); diff --git a/test/transaction.ts b/test/transaction.ts index c9d9e1a3a..55d91de4b 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -25,6 +25,7 @@ import * as sinon from 'sinon'; import {codec} from '../src/codec'; import {google} from '../protos/protos'; import {CLOUD_RESOURCE_HEADER} from '../src/common'; +import RequestOptions = google.spanner.v1.RequestOptions; describe('Transaction', () => { const sandbox = sinon.createSandbox(); @@ -1085,6 +1086,29 @@ describe('Transaction', () => { transaction.batchUpdate(STRING_STATEMENTS, gaxOptions, assert.ifError); }); + it('should accept gaxOptions in BatchUpdateOptions', done => { + const options = {gaxOptions: {}}; + transaction.request = config => { + assert.strictEqual(config.gaxOpts, options.gaxOptions); + done(); + }; + transaction.batchUpdate(STRING_STATEMENTS, options, assert.ifError); + }); + + it('should accept requestOptions', done => { + const options = { + requestOptions: {priority: RequestOptions.Priority.PRIORITY_MEDIUM}, + }; + transaction.request = config => { + assert.strictEqual( + config.reqOpts.requestOptions, + options.requestOptions + ); + done(); + }; + transaction.batchUpdate(STRING_STATEMENTS, options, assert.ifError); + }); + it('should return an error if statements are missing', done => { transaction.batchUpdate(null, err => { assert.strictEqual( @@ -1323,6 +1347,29 @@ describe('Transaction', () => { transaction.commit(options, assert.ifError); }); + it('should accept gaxOptions in CommitOptions', done => { + const options = {gaxOptions: {}}; + transaction.request = config => { + assert.strictEqual(config.gaxOpts, options.gaxOptions); + done(); + }; + transaction.commit(options, assert.ifError); + }); + + it('should accept requestOptions', done => { + const options = { + requestOptions: {priority: RequestOptions.Priority.PRIORITY_MEDIUM}, + }; + transaction.request = config => { + assert.strictEqual( + config.reqOpts.requestOptions, + options.requestOptions + ); + done(); + }; + transaction.commit(options, assert.ifError); + }); + it('should use the transaction `id` when set', () => { const id = 'transaction-id-123'; const stub = sandbox.stub(transaction, 'request');