Skip to content

Commit

Permalink
fix(deps)!: use grpc-js instead of grpc extension (#658)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored Jun 14, 2019
1 parent a9972ea commit 535a917
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 34 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
"extend": "^3.0.2",
"google-auth-library": "^3.0.0",
"google-gax": "^1.0.0",
"grpc": "1.21.1",
"is-stream-ended": "^0.1.4",
"lodash.snakecase": "^4.1.1",
"p-defer": "^3.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
const v1 = require('./v1');
export {v1};

export {ServiceError} from '@grpc/grpc-js';
export {CallOptions} from 'google-gax';
export {ServiceError} from 'grpc';
export {
Policy,
GetPolicyCallback,
Expand Down
8 changes: 5 additions & 3 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import {CallOptions} from 'google-gax';
import {Metadata, ServiceError, status} from 'grpc';
import {Metadata, ServiceError, status} from '@grpc/grpc-js';
import defer = require('p-defer');

import {Message, Subscriber} from './subscriber';
Expand Down Expand Up @@ -48,15 +48,17 @@ export interface BatchOptions {
*/
export class BatchError extends Error implements ServiceError {
ackIds: string[];
code?: status;
metadata?: Metadata;
code: status;
details: string;
metadata: Metadata;
constructor(err: ServiceError, ackIds: string[], rpc: string) {
super(
`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${err.message}`
);

this.ackIds = ackIds;
this.code = err.code;
this.details = err.details;
this.metadata = err.metadata;
}
}
Expand Down
23 changes: 17 additions & 6 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
ServiceError,
status,
StatusObject,
} from 'grpc';
} from '@grpc/grpc-js';
import * as isStreamEnded from 'is-stream-ended';
import {PassThrough} from 'stream';

Expand Down Expand Up @@ -69,11 +69,13 @@ type PullStream = ClientDuplexStream<StreamingPullRequest, PullResponse> & {
* @param {object} status The gRPC status object.
*/
export class StatusError extends Error implements ServiceError {
code?: status;
metadata?: Metadata;
code: status;
details: string;
metadata: Metadata;
constructor(status: StatusObject) {
super(status.details);
this.code = status.code;
this.details = status.details;
this.metadata = status.metadata;
}
}
Expand All @@ -87,11 +89,15 @@ export class StatusError extends Error implements ServiceError {
*/
export class ChannelError extends Error implements ServiceError {
code: status;
details: string;
metadata: Metadata;
constructor(err: Error) {
super(`Failed to connect to channel. Reason: ${err.message}`);
this.code = err.message.includes('deadline')
? status.DEADLINE_EXCEEDED
: status.UNKNOWN;
this.details = err.message;
this.metadata = new Metadata();
}
}

Expand Down Expand Up @@ -259,9 +265,14 @@ export class MessageStream extends PassThrough {
* @private
*/
private _keepAlive(): void {
for (const stream of this._streams.keys()) {
stream.write({});
}
this._streams.forEach((receivedStatus, stream) => {
// its possible that a status event fires off (signaling the rpc being
// closed) but the stream hasn't drained yet, writing to this stream will
// result in a `write after end` error
if (!receivedStatus) {
stream.write({});
}
});
}
/**
* Once the stream has nothing left to read, we'll remove it and attempt to
Expand Down
10 changes: 3 additions & 7 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import is from '@sindresorhus/is';
import * as extend from 'extend';
import {GoogleAuth} from 'google-auth-library';
import * as gax from 'google-gax';
import * as grpc from 'grpc';
import * as grpc from '@grpc/grpc-js';
import {ServiceError, ChannelCredentials} from '@grpc/grpc-js';

const PKG = require('../../package.json');
const v1 = require('./v1');
Expand All @@ -46,7 +47,6 @@ import {PublishOptions} from './publisher';
import {CallOptions} from 'google-gax';
import {Transform} from 'stream';
import {google} from '../proto/pubsub';
import {ServiceError, ChannelCredentials} from 'grpc';

const opts = {} as gax.GrpcClientOptions;

Expand Down Expand Up @@ -263,10 +263,6 @@ export class PubSub {
}
this.options = Object.assign(
{
grpc,
'grpc.keepalive_time_ms': 300000,
'grpc.max_send_message_length': -1,
'grpc.max_receive_message_length': 20000001,
libName: 'gccl',
libVersion: PKG.version,
scopes: Object.keys(allScopes),
Expand Down Expand Up @@ -932,7 +928,7 @@ export class PubSub {
request<T, R = void>(config: RequestConfig, callback: RequestCallback<T, R>) {
this.getClient_(config, (err, client) => {
if (err) {
callback(err);
callback(err as ServiceError);
return;
}
let reqOpts = extend(true, {}, config.reqOpts);
Expand Down
2 changes: 1 addition & 1 deletion src/pull-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {StatusObject, status} from 'grpc';
import {StatusObject, status} from '@grpc/grpc-js';

/*!
* retryable status codes
Expand Down
19 changes: 19 additions & 0 deletions system-test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import * as assert from 'assert';
import * as crypto from 'crypto';
import defer = require('p-defer');
import * as uuid from 'uuid';

Expand Down Expand Up @@ -506,6 +507,24 @@ describe('pubsub', () => {
}
});

it('should send and receive large messages', done => {
const subscription = topic.subscription(SUB_NAMES[0]);
const buf = crypto.randomBytes(9000000); // 9mb

topic.publish(buf, (err, messageId) => {
assert.ifError(err);

subscription.on('error', done).on('message', ({id, data}: Message) => {
if (id !== messageId) {
return;
}

assert.deepStrictEqual(data, buf);
subscription.close(done);
});
});
});

// can be ran manually to test options/memory usage/etc.
// tslint:disable-next-line ban
it.skip('should handle a large volume of messages', async function() {
Expand Down
6 changes: 3 additions & 3 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import * as assert from 'assert';
import {EventEmitter} from 'events';
import {CallOptions} from 'google-gax';
import {Metadata, ServiceError} from 'grpc';
import {Metadata, ServiceError} from '@grpc/grpc-js';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import * as uuid from 'uuid';
Expand Down Expand Up @@ -322,7 +322,7 @@ describe('MessageQueues', () => {

const ackIds = messages.map(message => message.ackId);

const fakeError: ServiceError = new Error('Err.');
const fakeError = new Error('Err.') as ServiceError;
fakeError.code = 2;
fakeError.metadata = new Metadata();

Expand Down Expand Up @@ -446,7 +446,7 @@ describe('MessageQueues', () => {

const ackIds = messages.map(message => message.ackId);

const fakeError: ServiceError = new Error('Err.');
const fakeError = new Error('Err.') as ServiceError;
fakeError.code = 2;
fakeError.metadata = new Metadata();

Expand Down
2 changes: 1 addition & 1 deletion test/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import * as assert from 'assert';
import {Metadata, ServiceError} from 'grpc';
import {Metadata, ServiceError} from '@grpc/grpc-js';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import {Duplex, PassThrough} from 'stream';
Expand Down
8 changes: 2 additions & 6 deletions test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import * as promisify from '@google-cloud/promisify';
import arrify = require('arrify');
import * as assert from 'assert';
import * as gax from 'google-gax';
import * as grpc from 'grpc';
import {CallOptions, ServiceError, ChannelCredentials} from 'grpc';
import * as grpc from '@grpc/grpc-js';
import {CallOptions, ChannelCredentials, ServiceError} from '@grpc/grpc-js';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';

Expand Down Expand Up @@ -189,10 +189,6 @@ describe('PubSub', () => {

describe('instantiation', () => {
const DEFAULT_OPTIONS = {
grpc,
'grpc.keepalive_time_ms': 300000,
'grpc.max_send_message_length': -1,
'grpc.max_receive_message_length': 20000001,
libName: 'gccl',
libVersion: PKG.version,
scopes: [],
Expand Down
2 changes: 1 addition & 1 deletion test/pull-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import assert = require('assert');
import sinon = require('sinon');
import {StatusObject, status} from 'grpc';
import {StatusObject, status} from '@grpc/grpc-js';
import {PullRetry} from '../src/pull-retry';

describe('PullRetry', () => {
Expand Down
4 changes: 2 additions & 2 deletions test/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import * as pfy from '@google-cloud/promisify';
import * as assert from 'assert';
import {EventEmitter} from 'events';
import {ServiceError} from 'grpc';
import {ServiceError} from '@grpc/grpc-js';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';

Expand Down Expand Up @@ -506,7 +506,7 @@ describe('Subscription', () => {
});

describe('error', () => {
const error = new Error('err');
const error = new Error('err') as ServiceError;

beforeEach(() => {
subscription.request = (config, callback) => {
Expand Down
4 changes: 2 additions & 2 deletions test/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import * as pfy from '@google-cloud/promisify';
import * as assert from 'assert';
import {CallOptions} from 'google-gax';
import {ServiceError} from 'grpc';
import {ServiceError} from '@grpc/grpc-js';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';

Expand Down Expand Up @@ -440,7 +440,7 @@ describe('Topic', () => {
});

it('should pass back any errors that occur', done => {
const error = new Error('err');
const error = new Error('err') as ServiceError;
const apiResponse = {};

topic.request = (
Expand Down

0 comments on commit 535a917

Please sign in to comment.