From 79ab435a980d8e23b56887191fc310e1cd0a6313 Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Fri, 4 May 2018 10:10:35 -0700 Subject: [PATCH] fix: adjust async_hooks cls behavior (#734) PR-URL: #734 --- src/cls/async-hooks.ts | 121 ++++++++++++++++++++++++++++++---------- src/cls/base.ts | 3 + test/test-cls-ah.ts | 122 +++++++++++++++++++++++++++++++++++++++++ test/test-cls.ts | 32 ++++------- tsconfig.json | 1 + 5 files changed, 229 insertions(+), 50 deletions(-) create mode 100644 test/test-cls-ah.ts diff --git a/src/cls/async-hooks.ts b/src/cls/async-hooks.ts index 59551ef69..1c2af94c5 100644 --- a/src/cls/async-hooks.ts +++ b/src/cls/async-hooks.ts @@ -14,8 +14,8 @@ * limitations under the License. */ -// This file requires continuation-local-storage in the AsyncHooksCLS -// constructor, rather than upon module load. +// This file calls require('async_hooks') in the AsyncHooksCLS constructor, +// rather than upon module load. import * as asyncHooksModule from 'async_hooks'; import {EventEmitter} from 'events'; import * as shimmer from 'shimmer'; @@ -31,29 +31,63 @@ const EVENT_EMITTER_METHODS: Array = const WRAPPED = Symbol('@google-cloud/trace-agent:AsyncHooksCLS:WRAPPED'); type ContextWrapped = T&{[WRAPPED]?: boolean}; +type Reference = { + value: T +}; /** * An implementation of continuation-local storage on top of the async_hooks * module. */ export class AsyncHooksCLS implements CLS { - private currentContext: {value: Context}; - private contexts: {[id: number]: Context} = {}; + // instance-scope reference to avoid top-level require. + private ah: AsyncHooksModule; + + /** A map of AsyncResource IDs to Context objects. */ + private contexts: {[id: number]: Reference} = {}; + /** The AsyncHook that proactively populates entries in this.contexts. */ private hook: asyncHooksModule.AsyncHook; + /** Whether this instance is enabled. */ private enabled = false; constructor(private readonly defaultContext: Context) { - this.currentContext = {value: this.defaultContext}; - this.hook = (require('async_hooks') as AsyncHooksModule).createHook({ + // Store a reference to the async_hooks module, since we will need to query + // the current AsyncResource ID often. + this.ah = require('async_hooks') as AsyncHooksModule; + + // Create the hook. + this.hook = this.ah.createHook({ init: (id: number, type: string, triggerId: number, resource: {}) => { - this.contexts[id] = this.currentContext.value; - }, - before: (id: number) => { - if (this.contexts[id]) { - this.currentContext.value = this.contexts[id]; + // init is called when a new AsyncResource is created. We want code + // that runs within the scope of this new AsyncResource to see the same + // context as its "parent" AsyncResource. The criteria for the parent + // depends on the type of the AsyncResource. + if (type === 'PROMISE') { + // Opt not to use the trigger ID for Promises, as this causes context + // confusion in applications using async/await. + // Instead, use the ID of the AsyncResource in whose scope we are + // currently running. + this.contexts[id] = this.contexts[this.ah.executionAsyncId()]; + } else { + // Use the trigger ID for any other type. In Node core, this is + // usually equal the ID of the AsyncResource in whose scope we are + // currently running (the "current" AsyncResource), or that of one + // of its ancestors, so the behavior is not expected to be different + // from using the ID of the current AsyncResource instead. + // A divergence is expected only to arise through the user + // AsyncResource API, because users of that API can specify their own + // trigger ID. In this case, we choose to respect the user's + // selection. + this.contexts[id] = this.contexts[triggerId]; } + // Note that this function always assigns values in this.contexts to + // values under other keys, which may or may not be undefined. Consumers + // of the CLS API will get the sentinel (default) value if they query + // the current context when it is stored as undefined. }, destroy: (id: number) => { + // destroy is called when the AsyncResource is no longer used, so also + // delete its entry in the map. delete this.contexts[id]; } }); @@ -64,51 +98,85 @@ export class AsyncHooksCLS implements CLS { } enable(): void { - this.currentContext.value = this.defaultContext; + this.contexts = {}; this.hook.enable(); this.enabled = true; } disable(): void { - this.currentContext.value = this.defaultContext; + this.contexts = {}; this.hook.disable(); this.enabled = false; } getContext(): Context { - return this.currentContext.value; + // We don't store this.defaultContext directly in this.contexts. + // Getting undefined when looking up this.contexts means that it wasn't + // set, so return the default context. + const current = this.contexts[this.ah.executionAsyncId()]; + return current ? current.value : this.defaultContext; } setContext(value: Context): void { - this.currentContext.value = value; + const id = this.ah.executionAsyncId(); + const current = this.contexts[id]; + if (current) { + current.value = value; + } else { + this.contexts[id] = {value}; + } } runWithNewContext(fn: Func): T { - const oldContext = this.currentContext.value; - this.currentContext.value = this.defaultContext; + // Run fn() so that any AsyncResource objects that are created in + // fn will have the context set by this.setContext. + const id = this.ah.executionAsyncId(); + const oldContext = this.contexts[id]; + // Reset the current context. This prevents this.getContext from returning + // a stale value. + this.contexts[id] = {value: this.defaultContext}; try { return fn(); } finally { - this.currentContext.value = oldContext; + // Revert the current context to what it was before any calls to + // this.setContext from within fn. + this.contexts[id] = oldContext; } } bindWithCurrentContext(fn: Func): Func { - if ((fn as ContextWrapped>)[WRAPPED] || !this.currentContext) { + // Return if we have already wrapped the function. + if ((fn as ContextWrapped>)[WRAPPED]) { return fn; } - const current = this.currentContext; - const boundContext = this.currentContext.value; + // Capture the context of the current AsyncResource. + const boundContext = this.contexts[this.ah.executionAsyncId()]; + // Return if there is no current context to bind. + if (!boundContext) { + return fn; + } + const that = this; + // TODO(kjin): This code is somewhat duplicated with runWithNewContext. + // Can we merge this? + // Wrap fn so that any AsyncResource objects that are created in fn will + // share context with that of the AsyncResource with the given ID. const contextWrapper: ContextWrapped> = function(this: {}) { - const oldContext = current.value; - current.value = boundContext; + const id = that.ah.executionAsyncId(); + const oldContext = that.contexts[id]; + // Restore the captured context. + that.contexts[id] = boundContext; try { return fn.apply(this, arguments) as T; } finally { - current.value = oldContext; + // Revert the current context to what it was before it was set to the + // captured context. + that.contexts[id] = oldContext; } }; + // Prevent re-wrapping. contextWrapper[WRAPPED] = true; + // Explicitly inherit the original function's length, because it is + // otherwise zero-ed out. Object.defineProperty(contextWrapper, 'length', { enumerable: false, configurable: true, @@ -118,11 +186,6 @@ export class AsyncHooksCLS implements CLS { return contextWrapper; } - // This function is not technically needed and all tests currently pass - // without it (after removing call sites). While it is not a complete - // solution, restoring correct context before running every request/response - // event handler reduces the number of situations in which userspace queuing - // will cause us to lose context. patchEmitterToPropagateContext(ee: EventEmitter): void { const that = this; EVENT_EMITTER_METHODS.forEach((method) => { diff --git a/src/cls/base.ts b/src/cls/base.ts index 5b4877172..1820dc894 100644 --- a/src/cls/base.ts +++ b/src/cls/base.ts @@ -72,6 +72,7 @@ export interface CLS { /** * Runs the given function as the start of a new continuation. + * TODO(kjin): Merge this with setContext. * @param fn The function to run synchronously. * @returns The return result of running `fn`. */ @@ -82,6 +83,7 @@ export interface CLS { * the CLS implementation's propagating mechanism doesn't automatically do so. * If not called from within a continuation, behavior is implementation- * defined. + * TODO(kjin): Determine a more accurate name for this function. * @param fn The function to bind. * @returns A wrapped version of the given function with the same signature. */ @@ -91,6 +93,7 @@ export interface CLS { * Patches an EventEmitter to lazily bind all future event listeners on this * instance so that they belong in the same continuation as the execution * path in which they were attached to the EventEmitter object. + * TODO(kjin): Determine a more accurate name for this function. * @param ee The EventEmitter to bind. This instance will be mutated. */ patchEmitterToPropagateContext(ee: EventEmitter): void; diff --git a/test/test-cls-ah.ts b/test/test-cls-ah.ts new file mode 100644 index 000000000..f035dc3b4 --- /dev/null +++ b/test/test-cls-ah.ts @@ -0,0 +1,122 @@ +/** + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import * as asyncHooksModule from 'async_hooks'; +import {IContextDefinition} from 'mocha'; +import * as semver from 'semver'; + +import {AsyncHooksCLS} from '../src/cls/async-hooks'; + +type AsyncHooksModule = typeof asyncHooksModule; + +const TEST_ASYNC_RESOURCE = '@google-cloud/trace-agent:test'; +const maybeSkip = (describe: IContextDefinition) => + semver.satisfies(process.version, '>=8.1') ? describe : describe.skip; + +maybeSkip(describe)('AsyncHooks-based CLS', () => { + let asyncHooks: AsyncHooksModule; + // tslint:disable-next-line:variable-name + let AsyncResource: typeof asyncHooksModule.AsyncResource; + let cls: AsyncHooksCLS; + + before(() => { + asyncHooks = require('async_hooks') as AsyncHooksModule; + AsyncResource = class extends asyncHooks.AsyncResource { + // tslint:disable:no-any + runInAsyncScope( + fn: (this: This, ...args: any[]) => Result, thisArg?: This): Result { + // tslint:enable:no-any + // Polyfill for versions in which runInAsyncScope isn't defined + if (super.runInAsyncScope) { + return super.runInAsyncScope.apply(this, arguments); + } else { + this.emitBefore(); + try { + return fn.apply( + thisArg, Array.prototype.slice.apply(arguments).slice(2)); + } finally { + this.emitAfter(); + } + } + } + }; + }); + + beforeEach(() => { + cls = new AsyncHooksCLS('default'); + cls.enable(); + }); + + it('Correctly assumes the type of Promise resources', () => { + const actual: Array> = []; + const expected: Array> = []; + const hook = asyncHooks + .createHook({ + init: + (uid: number, type: string, tid: number, + resource: {promise: Promise}) => { + if (type === 'PROMISE') { + actual.push(resource.promise); + } + } + }) + .enable(); + expected.push(Promise.resolve()); + expected.push(actual[0].then(() => {})); + assert.deepStrictEqual(actual, expected); + hook.disable(); + }); + + it('Supports basic context propagation across async-await boundaries', () => { + return cls.runWithNewContext(async () => { + cls.setContext('modified'); + await Promise.resolve(); + assert.strictEqual(cls.getContext(), 'modified'); + await Promise.resolve(); + assert.strictEqual(cls.getContext(), 'modified'); + }); + }); + + describe('Using AsyncResource API', () => { + it('Supports context propagation without trigger ID', async () => { + let res!: asyncHooksModule.AsyncResource; + await cls.runWithNewContext(async () => { + res = new AsyncResource(TEST_ASYNC_RESOURCE); + cls.setContext('modified'); + }); + res.runInAsyncScope(() => { + assert.strictEqual(cls.getContext(), 'modified'); + }); + }); + + it('Supports context propagation with trigger ID', async () => { + let triggerId!: number; + let res!: asyncHooksModule.AsyncResource; + await cls.runWithNewContext(async () => { + triggerId = new AsyncResource(TEST_ASYNC_RESOURCE).asyncId(); + cls.setContext('correct'); + }); + await cls.runWithNewContext(async () => { + res = new AsyncResource(TEST_ASYNC_RESOURCE, triggerId); + cls.setContext('incorrect'); + }); + res.runInAsyncScope(() => { + assert.strictEqual(cls.getContext(), 'correct'); + }); + }); + }); +}); diff --git a/test/test-cls.ts b/test/test-cls.ts index ca820e5e1..2b4cc4437 100644 --- a/test/test-cls.ts +++ b/test/test-cls.ts @@ -71,22 +71,16 @@ describe('Continuation-Local Storage', () => { }); describe('Implementations', () => { - const testCases: Array<{clazz: CLSConstructor, testAsyncAwait: boolean}> = - asyncAwaitSupported ? - [ - {clazz: AsyncHooksCLS, testAsyncAwait: true}, - {clazz: AsyncListenerCLS, testAsyncAwait: false} - ] : - [{clazz: AsyncListenerCLS, testAsyncAwait: false}]; + const testCases: CLSConstructor[] = asyncAwaitSupported ? + [AsyncHooksCLS, AsyncListenerCLS] : + [AsyncListenerCLS]; for (const testCase of testCases) { - describe(`CLS for class ${testCase.clazz.name}`, () => { - const maybeSkip = (it: ITestDefinition) => - testCase.testAsyncAwait ? it : it.skip; + describe(`CLS for class ${testCase.name}`, () => { let c!: CLS; beforeEach(() => { - c = new testCase.clazz('default'); + c = new testCase('default'); c.enable(); }); @@ -157,6 +151,12 @@ describe('Continuation-Local Storage', () => { runLater(); assert.strictEqual(c.getContext(), 'default'); }); + c.runWithNewContext(() => { + c.setContext('modified-but-different'); + // bind it again + runLater = c.bindWithCurrentContext(runLater); + }); + runLater(); }); it('Corrects context when function run with new context throws', () => { @@ -234,16 +234,6 @@ describe('Continuation-Local Storage', () => { }); }); }); - - maybeSkip(it)( - 'Supports basic context propagation across await boundaries', - () => { - return c.runWithNewContext(async () => { - c.setContext('modified'); - await Promise.resolve(); - assert.strictEqual(c.getContext(), 'modified'); - }); - }); }); } }); diff --git a/tsconfig.json b/tsconfig.json index 439a87f2d..814401e6e 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -24,6 +24,7 @@ "test/logger.ts", "test/nocks.ts", "test/test-cls.ts", + "test/test-cls-ah.ts", "test/test-config-cls.ts", "test/test-config-credentials.ts", "test/test-config-plugins.ts",