diff --git a/src/plugins/plugin-pg.ts b/src/plugins/plugin-pg.ts index a0d7d49a9..2f645cd94 100644 --- a/src/plugins/plugin-pg.ts +++ b/src/plugins/plugin-pg.ts @@ -16,23 +16,18 @@ import {EventEmitter} from 'events'; import * as shimmer from 'shimmer'; -import {Readable} from 'stream'; -import {Patch, Plugin, Span} from '../plugin-types'; +import {Patch, Plugin, Span, Tracer} from '../plugin-types'; import {pg_6, pg_7} from './types'; // TS: Client#query also accepts a callback as a last argument, but TS cannot // detect this as it's a dependent type. So we don't specify it here. type ClientQueryArguments = - [{submit?: Function} & pg_7.QueryConfig]|[string]|[string, {}]; + [Submittable & pg_7.QueryConfig]|[string]|[string, {}]; type PG7QueryReturnValue = (pg_7.QueryConfig&({submit: Function}&EventEmitter)| pg_7.Query)|Promise; - -// tslint:disable-next-line:no-any -function isSubmittable(obj: any): obj is {submit: Function} { - return typeof obj.submit === 'function'; -} +type Callback = (err: Error|null, res?: T) => void; const noOp = () => {}; @@ -66,6 +61,103 @@ function populateLabelsFromOutputs( } } +/** + * Partial shape of objects returned by Client#query. Only contains methods that + * are significant to the query lifecycle. + */ +type Submittable = { + // Called when the query is completed. + handleReadyForQuery: () => void; + // Called when an error occurs. + handleError: () => void; + // A field that is populated when the Submittable is a Query object. + _result?: pg_7.QueryResult; +}; + +/** + * Utility class to help organize patching logic. + */ +class PostgresPatchUtility { + readonly maybePopulateLabelsFromInputs: typeof populateLabelsFromInputs; + readonly maybePopulateLabelsFromOutputs: typeof populateLabelsFromOutputs; + + constructor(private readonly tracer: Tracer) { + this.maybePopulateLabelsFromInputs = + tracer.enhancedDatabaseReportingEnabled() ? populateLabelsFromInputs : + noOp; + this.maybePopulateLabelsFromOutputs = + tracer.enhancedDatabaseReportingEnabled() ? populateLabelsFromOutputs : + noOp; + } + + patchSubmittable(pgQuery: Submittable, span: Span): Submittable { + let spanEnded = false; + const {maybePopulateLabelsFromOutputs} = this; + if (pgQuery.handleError) { + shimmer.wrap(pgQuery, 'handleError', (origCallback) => { + // Elements of args are not individually accessed. + // tslint:disable:no-any + return this.tracer.wrap(function( + this: Submittable, ...args: any[]): void { + // tslint:enable:no-any + if (!spanEnded) { + const err: Error = args[0]; + maybePopulateLabelsFromOutputs(span, err); + span.endSpan(); + spanEnded = true; + } + if (origCallback) { + origCallback.apply(this, args); + } + }); + }); + } + if (pgQuery.handleReadyForQuery) { + shimmer.wrap(pgQuery, 'handleReadyForQuery', (origCallback) => { + // Elements of args are not individually accessed. + // tslint:disable:no-any + return this.tracer.wrap(function( + this: Submittable, ...args: any[]): void { + // tslint:enable:no-any + if (!spanEnded) { + maybePopulateLabelsFromOutputs(span, null, this._result); + span.endSpan(); + spanEnded = true; + } + if (origCallback) { + origCallback.apply(this, args); + } + }); + }); + } + return pgQuery; + } + + patchCallback(callback: Callback, span: Span): + Callback { + return this.tracer.wrap((err: Error|null, res?: pg_7.QueryResult) => { + this.maybePopulateLabelsFromOutputs(span, err, res); + span.endSpan(); + callback(err, res); + }); + } + + patchPromise(promise: Promise, span: Span): + Promise { + return promise = promise.then( + (res) => { + this.maybePopulateLabelsFromOutputs(span, null, res); + span.endSpan(); + return res; + }, + (err) => { + this.maybePopulateLabelsFromOutputs(span, err); + span.endSpan(); + throw err; + }); + } +} + const plugin: Plugin = [ { file: 'lib/client.js', @@ -73,39 +165,42 @@ const plugin: Plugin = [ // TS: Client is a class name. // tslint:disable-next-line:variable-name patch: (Client, api) => { - const maybePopulateLabelsFromInputs = - api.enhancedDatabaseReportingEnabled() ? populateLabelsFromInputs : - noOp; - const maybePopulateLabelsFromOutputs = - api.enhancedDatabaseReportingEnabled() ? populateLabelsFromOutputs : - noOp; + const pgPatch = new PostgresPatchUtility(api); + shimmer.wrap(Client.prototype, 'query', (query) => { - return function query_trace(this: pg_6.Client) { - const span = api.createChildSpan({name: 'pg-query'}); - if (!api.isRealSpan(span)) { - return query.apply(this, arguments); - } - const argLength = arguments.length; - if (argLength >= 1) { - const args: ClientQueryArguments = - Array.prototype.slice.call(arguments, 0); + // Every call to Client#query will have a Submittable object associated + // with it. We need to patch two handlers (handleReadyForQuery and + // handleError) to end a span. + // There are a few things to note here: + // * query accepts a Submittable or a string. A Query is a Submittable. + // So if we can get a Submittable from the input we patch it + // proactively, otherwise (in the case of a string) we patch the + // output Query instead. + // * If query is passed a callback, the callback will be invoked from + // either handleReadyForQuery or handleError. So we don't need to + // separately patch the callback. + return function query_trace( + this: pg_6.Client, ...args: ClientQueryArguments) { + if (args.length >= 1) { + const span = api.createChildSpan({name: 'pg-query'}); + if (!api.isRealSpan(span)) { + return query.apply(this, args); + } // Extract query text and values, if needed. - maybePopulateLabelsFromInputs(span, args); + pgPatch.maybePopulateLabelsFromInputs(span, args); + if (typeof args[0] === 'object') { + pgPatch.patchSubmittable(args[0], span); + return query.apply(this, args); + } else { + return pgPatch.patchSubmittable( + query.apply(this, args) as Submittable, span); + } + } else { + // query was called with no arguments. + // This doesn't make sense, but don't do anything that might cause + // an error to get thrown here, or a span to be started. + return query.apply(this, args); } - const pgQuery: pg_6.QueryReturnValue = query.apply(this, arguments); - api.wrapEmitter(pgQuery); - const done = pgQuery.callback; - // TODO(kjin): Clean up this line a little bit by casting the function - // passed to api.wrap as a NonNullable. - pgQuery.callback = - api.wrap((err: Error|null, res?: pg_7.QueryResult) => { - maybePopulateLabelsFromOutputs(span, err, res); - span.endSpan(); - if (done) { - done(err, res); - } - }); - return pgQuery; }; }); }, @@ -121,12 +216,7 @@ const plugin: Plugin = [ // TS: Client is a class name. // tslint:disable-next-line:variable-name patch: (Client, api) => { - const maybePopulateLabelsFromInputs = - api.enhancedDatabaseReportingEnabled() ? populateLabelsFromInputs : - noOp; - const maybePopulateLabelsFromOutputs = - api.enhancedDatabaseReportingEnabled() ? populateLabelsFromOutputs : - noOp; + const pgPatch = new PostgresPatchUtility(api); shimmer.wrap(Client.prototype, 'query', (query) => { return function query_trace(this: pg_7.Client) { const span = api.createChildSpan({name: 'pg-query'}); @@ -151,25 +241,18 @@ const plugin: Plugin = [ Array.prototype.slice.call(arguments, 0); // Extract query text and values, if needed. - maybePopulateLabelsFromInputs(span, args); + pgPatch.maybePopulateLabelsFromInputs(span, args); // If we received a callback, bind it to the current context, // optionally adding labels as well. const callback = args[args.length - 1]; if (typeof callback === 'function') { - args[args.length - 1] = - api.wrap((err: Error|null, res?: pg_7.QueryResult) => { - maybePopulateLabelsFromOutputs(span, err, res); - span.endSpan(); - // TS: Type cast is safe as we know that callback is a - // Function. - (callback as (err: Error|null, res?: pg_7.QueryResult) => - void)(err, res); - }); - pgQuery = query.apply(this, args); - } else { - pgQuery = query.apply(this, arguments); + args[args.length - 1] = pgPatch.patchCallback( + callback as Callback, span); + } else if (typeof args[0] === 'object') { + pgPatch.patchSubmittable(args[0] as Submittable, span); } + pgQuery = query.apply(this, args); } else { pgQuery = query.apply(this, arguments); } @@ -178,19 +261,10 @@ const plugin: Plugin = [ if (pgQuery instanceof EventEmitter) { api.wrapEmitter(pgQuery); } else if (typeof pgQuery.then === 'function') { - // Ensure that the span is ended, optionally adding labels as - // well. - pgQuery = pgQuery.then( - (res) => { - maybePopulateLabelsFromOutputs(span, null, res); - span.endSpan(); - return res; - }, - (err) => { - maybePopulateLabelsFromOutputs(span, err); - span.endSpan(); - throw err; - }); + // Unlike in pg 6, the returned value can't be both a Promise and + // a Submittable. So we don't run the risk of double-patching + // here. + pgPatch.patchPromise(pgQuery, span); } } return pgQuery; diff --git a/test/plugins/test-trace-pg.ts b/test/plugins/test-trace-pg.ts index ecbc16bb0..0223cf5d1 100644 --- a/test/plugins/test-trace-pg.ts +++ b/test/plugins/test-trace-pg.ts @@ -46,22 +46,20 @@ pgVersions.forEach(pgVersion => { client = c; releaseClient = release; assert(!err); - client.query('CREATE TABLE t (name text NOT NULL, id text NOT NULL)', [], + client.query('DROP TABLE t', [], function(err, res) { + assert(!err || err.code == '42P01'); // table "t" does not exist + client.query('CREATE TABLE t (name text NOT NULL, id text NOT NULL)', [], function(err, res) { assert(!err); - common.cleanTraces(); done(); }); + }); }); }); - afterEach(function(done) { - client.query('DROP TABLE t', [], function(err, res) { - assert(!err); - releaseClient(); - common.cleanTraces(); - done(); - }); + afterEach(() => { + releaseClient(); + common.cleanTraces(); }); it('should perform basic operations', function(done) { @@ -167,19 +165,18 @@ pgVersions.forEach(pgVersion => { it('should work with generic Submittables', function(done) { common.runInTransaction(function(endRootSpan) { - let submitCalled = false; client.query({ submit: (connection) => { // Indicate that the next item may be processed. connection.emit('readyForQuery'); - submitCalled = true; + }, + handleReadyForQuery: () => { endRootSpan(); common.getMatchingSpan(function (span) { return span.name === 'pg-query'; }); done(); - }, - handleReadyForQuery: () => {} + } }); }); });