From 535a917d8cb74db63e455975c1892283903d6ba2 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Fri, 14 Jun 2019 11:36:44 -0700 Subject: [PATCH] fix(deps)!: use grpc-js instead of grpc extension (#658) --- package.json | 1 - src/index.ts | 2 +- src/message-queues.ts | 8 +++++--- src/message-stream.ts | 23 +++++++++++++++++------ src/pubsub.ts | 10 +++------- src/pull-retry.ts | 2 +- system-test/pubsub.ts | 19 +++++++++++++++++++ test/message-queues.ts | 6 +++--- test/message-stream.ts | 2 +- test/pubsub.ts | 8 ++------ test/pull-retry.ts | 2 +- test/subscription.ts | 4 ++-- test/topic.ts | 4 ++-- 13 files changed, 57 insertions(+), 34 deletions(-) diff --git a/package.json b/package.json index b50d0cfa2..1bb489149 100644 --- a/package.json +++ b/package.json @@ -62,7 +62,6 @@ "extend": "^3.0.2", "google-auth-library": "^3.0.0", "google-gax": "^1.0.0", - "grpc": "1.21.1", "is-stream-ended": "^0.1.4", "lodash.snakecase": "^4.1.1", "p-defer": "^3.0.0", diff --git a/src/index.ts b/src/index.ts index f7dc6610d..36b892c5e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -77,8 +77,8 @@ const v1 = require('./v1'); export {v1}; +export {ServiceError} from '@grpc/grpc-js'; export {CallOptions} from 'google-gax'; -export {ServiceError} from 'grpc'; export { Policy, GetPolicyCallback, diff --git a/src/message-queues.ts b/src/message-queues.ts index a20fa07b9..32f98f221 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -15,7 +15,7 @@ */ import {CallOptions} from 'google-gax'; -import {Metadata, ServiceError, status} from 'grpc'; +import {Metadata, ServiceError, status} from '@grpc/grpc-js'; import defer = require('p-defer'); import {Message, Subscriber} from './subscriber'; @@ -48,8 +48,9 @@ export interface BatchOptions { */ export class BatchError extends Error implements ServiceError { ackIds: string[]; - code?: status; - metadata?: Metadata; + code: status; + details: string; + metadata: Metadata; constructor(err: ServiceError, ackIds: string[], rpc: string) { super( `Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${err.message}` @@ -57,6 +58,7 @@ export class BatchError extends Error implements ServiceError { this.ackIds = ackIds; this.code = err.code; + this.details = err.details; this.metadata = err.metadata; } } diff --git a/src/message-stream.ts b/src/message-stream.ts index 8ca1cb62c..f78bd4522 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -22,7 +22,7 @@ import { ServiceError, status, StatusObject, -} from 'grpc'; +} from '@grpc/grpc-js'; import * as isStreamEnded from 'is-stream-ended'; import {PassThrough} from 'stream'; @@ -69,11 +69,13 @@ type PullStream = ClientDuplexStream & { * @param {object} status The gRPC status object. */ export class StatusError extends Error implements ServiceError { - code?: status; - metadata?: Metadata; + code: status; + details: string; + metadata: Metadata; constructor(status: StatusObject) { super(status.details); this.code = status.code; + this.details = status.details; this.metadata = status.metadata; } } @@ -87,11 +89,15 @@ export class StatusError extends Error implements ServiceError { */ export class ChannelError extends Error implements ServiceError { code: status; + details: string; + metadata: Metadata; constructor(err: Error) { super(`Failed to connect to channel. Reason: ${err.message}`); this.code = err.message.includes('deadline') ? status.DEADLINE_EXCEEDED : status.UNKNOWN; + this.details = err.message; + this.metadata = new Metadata(); } } @@ -259,9 +265,14 @@ export class MessageStream extends PassThrough { * @private */ private _keepAlive(): void { - for (const stream of this._streams.keys()) { - stream.write({}); - } + this._streams.forEach((receivedStatus, stream) => { + // its possible that a status event fires off (signaling the rpc being + // closed) but the stream hasn't drained yet, writing to this stream will + // result in a `write after end` error + if (!receivedStatus) { + stream.write({}); + } + }); } /** * Once the stream has nothing left to read, we'll remove it and attempt to diff --git a/src/pubsub.ts b/src/pubsub.ts index fbd59e8d3..1c961db49 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -21,7 +21,8 @@ import is from '@sindresorhus/is'; import * as extend from 'extend'; import {GoogleAuth} from 'google-auth-library'; import * as gax from 'google-gax'; -import * as grpc from 'grpc'; +import * as grpc from '@grpc/grpc-js'; +import {ServiceError, ChannelCredentials} from '@grpc/grpc-js'; const PKG = require('../../package.json'); const v1 = require('./v1'); @@ -46,7 +47,6 @@ import {PublishOptions} from './publisher'; import {CallOptions} from 'google-gax'; import {Transform} from 'stream'; import {google} from '../proto/pubsub'; -import {ServiceError, ChannelCredentials} from 'grpc'; const opts = {} as gax.GrpcClientOptions; @@ -263,10 +263,6 @@ export class PubSub { } this.options = Object.assign( { - grpc, - 'grpc.keepalive_time_ms': 300000, - 'grpc.max_send_message_length': -1, - 'grpc.max_receive_message_length': 20000001, libName: 'gccl', libVersion: PKG.version, scopes: Object.keys(allScopes), @@ -932,7 +928,7 @@ export class PubSub { request(config: RequestConfig, callback: RequestCallback) { this.getClient_(config, (err, client) => { if (err) { - callback(err); + callback(err as ServiceError); return; } let reqOpts = extend(true, {}, config.reqOpts); diff --git a/src/pull-retry.ts b/src/pull-retry.ts index 838ade599..462721c3e 100644 --- a/src/pull-retry.ts +++ b/src/pull-retry.ts @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import {StatusObject, status} from 'grpc'; +import {StatusObject, status} from '@grpc/grpc-js'; /*! * retryable status codes diff --git a/system-test/pubsub.ts b/system-test/pubsub.ts index c031d2a64..55fcc1f46 100644 --- a/system-test/pubsub.ts +++ b/system-test/pubsub.ts @@ -15,6 +15,7 @@ */ import * as assert from 'assert'; +import * as crypto from 'crypto'; import defer = require('p-defer'); import * as uuid from 'uuid'; @@ -506,6 +507,24 @@ describe('pubsub', () => { } }); + it('should send and receive large messages', done => { + const subscription = topic.subscription(SUB_NAMES[0]); + const buf = crypto.randomBytes(9000000); // 9mb + + topic.publish(buf, (err, messageId) => { + assert.ifError(err); + + subscription.on('error', done).on('message', ({id, data}: Message) => { + if (id !== messageId) { + return; + } + + assert.deepStrictEqual(data, buf); + subscription.close(done); + }); + }); + }); + // can be ran manually to test options/memory usage/etc. // tslint:disable-next-line ban it.skip('should handle a large volume of messages', async function() { diff --git a/test/message-queues.ts b/test/message-queues.ts index b95979e47..cddd0ccaf 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -17,7 +17,7 @@ import * as assert from 'assert'; import {EventEmitter} from 'events'; import {CallOptions} from 'google-gax'; -import {Metadata, ServiceError} from 'grpc'; +import {Metadata, ServiceError} from '@grpc/grpc-js'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; import * as uuid from 'uuid'; @@ -322,7 +322,7 @@ describe('MessageQueues', () => { const ackIds = messages.map(message => message.ackId); - const fakeError: ServiceError = new Error('Err.'); + const fakeError = new Error('Err.') as ServiceError; fakeError.code = 2; fakeError.metadata = new Metadata(); @@ -446,7 +446,7 @@ describe('MessageQueues', () => { const ackIds = messages.map(message => message.ackId); - const fakeError: ServiceError = new Error('Err.'); + const fakeError = new Error('Err.') as ServiceError; fakeError.code = 2; fakeError.metadata = new Metadata(); diff --git a/test/message-stream.ts b/test/message-stream.ts index a00e2b891..5e83aebe1 100644 --- a/test/message-stream.ts +++ b/test/message-stream.ts @@ -15,7 +15,7 @@ */ import * as assert from 'assert'; -import {Metadata, ServiceError} from 'grpc'; +import {Metadata, ServiceError} from '@grpc/grpc-js'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; import {Duplex, PassThrough} from 'stream'; diff --git a/test/pubsub.ts b/test/pubsub.ts index 20cba5f88..bcba89379 100644 --- a/test/pubsub.ts +++ b/test/pubsub.ts @@ -19,8 +19,8 @@ import * as promisify from '@google-cloud/promisify'; import arrify = require('arrify'); import * as assert from 'assert'; import * as gax from 'google-gax'; -import * as grpc from 'grpc'; -import {CallOptions, ServiceError, ChannelCredentials} from 'grpc'; +import * as grpc from '@grpc/grpc-js'; +import {CallOptions, ChannelCredentials, ServiceError} from '@grpc/grpc-js'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; @@ -189,10 +189,6 @@ describe('PubSub', () => { describe('instantiation', () => { const DEFAULT_OPTIONS = { - grpc, - 'grpc.keepalive_time_ms': 300000, - 'grpc.max_send_message_length': -1, - 'grpc.max_receive_message_length': 20000001, libName: 'gccl', libVersion: PKG.version, scopes: [], diff --git a/test/pull-retry.ts b/test/pull-retry.ts index 4c098234a..ea844c490 100644 --- a/test/pull-retry.ts +++ b/test/pull-retry.ts @@ -16,7 +16,7 @@ import assert = require('assert'); import sinon = require('sinon'); -import {StatusObject, status} from 'grpc'; +import {StatusObject, status} from '@grpc/grpc-js'; import {PullRetry} from '../src/pull-retry'; describe('PullRetry', () => { diff --git a/test/subscription.ts b/test/subscription.ts index 0d647693a..4ee0c4dc1 100644 --- a/test/subscription.ts +++ b/test/subscription.ts @@ -17,7 +17,7 @@ import * as pfy from '@google-cloud/promisify'; import * as assert from 'assert'; import {EventEmitter} from 'events'; -import {ServiceError} from 'grpc'; +import {ServiceError} from '@grpc/grpc-js'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; @@ -506,7 +506,7 @@ describe('Subscription', () => { }); describe('error', () => { - const error = new Error('err'); + const error = new Error('err') as ServiceError; beforeEach(() => { subscription.request = (config, callback) => { diff --git a/test/topic.ts b/test/topic.ts index ecf15a2e7..695dfd3a9 100644 --- a/test/topic.ts +++ b/test/topic.ts @@ -17,7 +17,7 @@ import * as pfy from '@google-cloud/promisify'; import * as assert from 'assert'; import {CallOptions} from 'google-gax'; -import {ServiceError} from 'grpc'; +import {ServiceError} from '@grpc/grpc-js'; import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; @@ -440,7 +440,7 @@ describe('Topic', () => { }); it('should pass back any errors that occur', done => { - const error = new Error('err'); + const error = new Error('err') as ServiceError; const apiResponse = {}; topic.request = (