From d1de959405e91b8a6bc0d55f93b1ad7d6bb90e73 Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Tue, 5 Feb 2019 16:59:32 -0800 Subject: [PATCH] feat: support child spans with tail latencies (#913) --- src/config.ts | 19 ++-- src/span-data.ts | 45 ++++++++- src/trace-writer.ts | 128 ++++++++++++++++---------- test/plugins/common.ts | 5 +- test/plugins/test-trace-http2.ts | 2 + test/test-span-data.ts | 51 ++++++++++ test/test-trace-uncaught-exception.ts | 2 +- test/test-trace-writer.ts | 104 ++++++++++++++------- 8 files changed, 261 insertions(+), 95 deletions(-) diff --git a/src/config.ts b/src/config.ts index b59b6e7db..58ae1abad 100644 --- a/src/config.ts +++ b/src/config.ts @@ -115,13 +115,6 @@ export interface Config { */ stackTraceLimit?: number; - /** - * Buffer the captured traces for `flushDelaySeconds` seconds before - * publishing to the trace API, unless the buffer fills up first. - * Also see `bufferSize`. - */ - flushDelaySeconds?: number; - /** * URLs that partially match any regex in ignoreUrls will not be traced. * In addition, URLs that are _exact matches_ of strings in ignoreUrls will @@ -171,8 +164,16 @@ export interface Config { contextHeaderBehavior?: ContextHeaderBehavior; /** - * The number of transactions we buffer before we publish to the trace - * API, unless `flushDelaySeconds` seconds have elapsed first. + * Buffer the captured traces for `flushDelaySeconds` seconds before + * publishing to the Stackdriver Trace API, unless the buffer fills up first. + * Also see `bufferSize`. + */ + flushDelaySeconds?: number; + + /** + * The number of spans in buffered traces needed to trigger a publish of all + * traces to the Stackdriver Trace API, unless `flushDelaySeconds` seconds + * has elapsed first. */ bufferSize?: number; diff --git a/src/span-data.ts b/src/span-data.ts index a9cbc2432..ac74e01a2 100644 --- a/src/span-data.ts +++ b/src/span-data.ts @@ -18,8 +18,7 @@ import * as crypto from 'crypto'; import * as util from 'util'; import {Constants, SpanType} from './constants'; -import * as types from './plugin-types'; -import {Span, SpanOptions} from './plugin-types'; +import {RootSpan, Span, SpanOptions} from './plugin-types'; import {SpanKind, Trace, TraceSpan} from './trace'; import {TraceLabels} from './trace-labels'; import {traceWriter} from './trace-writer'; @@ -104,6 +103,9 @@ export abstract class BaseSpanData implements Span { } endSpan(timestamp?: Date) { + if (!!this.span.endTime) { + return; + } timestamp = timestamp || new Date(); this.span.endTime = timestamp.toISOString(); } @@ -112,8 +114,11 @@ export abstract class BaseSpanData implements Span { /** * Represents a real root span, which corresponds to an incoming request. */ -export class RootSpanData extends BaseSpanData implements types.RootSpan { +export class RootSpanData extends BaseSpanData implements RootSpan { readonly type = SpanType.ROOT; + // Locally-tracked list of children. Used only to determine, once this span + // ends, whether a child still needs to be published. + private children: ChildSpanData[] = []; constructor( trace: Trace, spanName: string, parentSpanId: string, @@ -125,16 +130,30 @@ export class RootSpanData extends BaseSpanData implements types.RootSpan { createChildSpan(options?: SpanOptions): Span { options = options || {name: ''}; const skipFrames = options.skipFrames ? options.skipFrames + 1 : 1; - return new ChildSpanData( + const child = new ChildSpanData( this.trace, /* Trace object */ options.name, /* Span name */ this.span.spanId, /* Parent's span ID */ skipFrames); /* # of frames to skip in stack trace */ + this.children.push(child); + return child; } endSpan(timestamp?: Date) { + if (!!this.span.endTime) { + return; + } super.endSpan(timestamp); traceWriter.get().writeTrace(this.trace); + this.children.forEach(child => { + if (!child.span.endTime) { + // Child hasn't ended yet. + // Inform the child that it needs to self-publish. + child.shouldSelfPublish = true; + } + }); + // We no longer need to keep track of our children. + this.children = []; } } @@ -143,6 +162,9 @@ export class RootSpanData extends BaseSpanData implements types.RootSpan { */ export class ChildSpanData extends BaseSpanData { readonly type = SpanType.CHILD; + // Whether this span should publish itself. This is meant to be set to true + // by the parent RootSpanData. + shouldSelfPublish = false; constructor( trace: Trace, spanName: string, parentSpanId: string, @@ -150,6 +172,21 @@ export class ChildSpanData extends BaseSpanData { super(trace, spanName, parentSpanId, skipFrames); this.span.kind = SpanKind.RPC_CLIENT; } + + endSpan(timestamp?: Date) { + if (!!this.span.endTime) { + return; + } + super.endSpan(timestamp); + if (this.shouldSelfPublish) { + // Also, publish just this span. + traceWriter.get().writeTrace({ + projectId: this.trace.projectId, + traceId: this.trace.traceId, + spans: [this.span] + }); + } + } } // Helper function to generate static virtual trace spans. diff --git a/src/trace-writer.ts b/src/trace-writer.ts index 6c49124f6..938fd855e 100644 --- a/src/trace-writer.ts +++ b/src/trace-writer.ts @@ -15,7 +15,6 @@ */ import * as common from '@google-cloud/common'; -import {AxiosError} from 'axios'; import * as gcpMetadata from 'gcp-metadata'; import {OutgoingHttpHeaders} from 'http'; import * as os from 'os'; @@ -55,12 +54,50 @@ export interface LabelObject { [key: string]: string; } +export class TraceBuffer { + /** + * Buffered traces. + */ + private traces: Trace[] = []; + /** + * Number of buffered spans; this number must be at least as large as + * buffer.length. + */ + private numSpans = 0; + + /** + * Add a new trace to the buffer. + * @param trace The trace to add. + */ + add(trace: Trace) { + this.traces.push(trace); + this.numSpans += trace.spans.length; + } + + /** + * Gets the number of spans contained within buffered traces. + */ + getNumSpans() { + return this.numSpans; + } + + /** + * Clears the buffer, returning its original contents. + */ + drain(): Trace[] { + const result = this.traces; + this.traces = []; + this.numSpans = 0; + return result; + } +} + /** * A class representing a service that publishes traces in the background. */ export class TraceWriter extends common.Service { - /** Stringified traces to be published */ - buffer: string[]; + /** Traces to be published */ + protected buffer: TraceBuffer; /** Default labels to be attached to written spans */ defaultLabels: LabelObject; /** Reference to global unhandled exception handler */ @@ -89,7 +126,7 @@ export class TraceWriter extends common.Service { config); this.logger = logger; - this.buffer = []; + this.buffer = new TraceBuffer(); this.defaultLabels = {}; this.isActive = true; @@ -216,54 +253,31 @@ export class TraceWriter extends common.Service { } /** - * Ensures that all sub spans of the provided Trace object are - * closed and then queues the span data to be published. + * Queues a trace to be published. Spans with no end time are excluded. * * @param trace The trace to be queued. */ writeTrace(trace: Trace) { - for (const span of trace.spans) { - if (span.endTime === '') { - span.endTime = (new Date()).toISOString(); - } - } + const publishableSpans = trace.spans.filter(span => !!span.endTime); - trace.spans.forEach(spanData => { + publishableSpans.forEach(spanData => { if (spanData.kind === SpanKind.RPC_SERVER) { // Copy properties from the default labels. Object.assign(spanData.labels, this.defaultLabels); } }); - const afterProjectId = (projectId: string) => { - trace.projectId = projectId; - this.buffer.push(JSON.stringify(trace)); - this.logger.info( - `TraceWriter#writeTrace: buffer.size = ${this.buffer.length}`); - - // Publish soon if the buffer is getting big - if (this.buffer.length >= this.config.bufferSize) { - this.logger.info( - 'TraceWriter#writeTrace: Trace buffer full, flushing.'); - setImmediate(() => this.flushBuffer()); - } - }; - - // TODO(kjin): We should always be following the 'else' path. - // Any test that doesn't mock the Trace Writer will assume that traces get - // buffered synchronously. We need to refactor those tests to remove that - // assumption before we can make this fix. - if (this.projectId !== NO_PROJECT_ID_TOKEN) { - afterProjectId(this.projectId); - } else { - this.getProjectId().then(afterProjectId, (err: Error) => { - // Because failing to get a project ID means that the trace agent will - // get disabled, there is a very small window for this code path to be - // taken. For this reason we don't do anything more complex than just - // notifying that we are dropping the current trace. - this.logger.info( - 'TraceWriter#queueTrace: No project ID, dropping trace.'); - }); + this.buffer.add({ + traceId: trace.traceId, + projectId: trace.projectId, + spans: publishableSpans + }); + this.logger.info(`TraceWriter#writeTrace: number of buffered spans = ${ + this.buffer.getNumSpans()}`); + // Publish soon if the buffer is getting big + if (this.buffer.getNumSpans() >= this.config.bufferSize) { + this.logger.info('TraceWriter#writeTrace: Trace buffer full, flushing.'); + setImmediate(() => this.flushBuffer()); } } @@ -292,15 +306,35 @@ export class TraceWriter extends common.Service { * Serializes the buffered traces to be published asynchronously. */ private flushBuffer() { - if (this.buffer.length === 0) { + // Privatize and clear the buffer. + const flushedTraces = this.buffer.drain(); + if (flushedTraces.length === 0) { return; } - // Privatize and clear the buffer. - const buffer = this.buffer; - this.buffer = []; - this.logger.debug('TraceWriter#flushBuffer: Flushing traces', buffer); - this.publish(`{"traces":[${buffer.join()}]}`); + const afterProjectId = (projectId: string) => { + flushedTraces.forEach(trace => trace.projectId = projectId); + this.logger.debug( + 'TraceWriter#flushBuffer: Flushing traces', flushedTraces); + this.publish(JSON.stringify({traces: flushedTraces})); + }; + + // TODO(kjin): We should always be following the 'else' path. + // Any test that doesn't mock the Trace Writer will assume that traces get + // buffered synchronously. We need to refactor those tests to remove that + // assumption before we can make this fix. + if (this.projectId !== NO_PROJECT_ID_TOKEN) { + afterProjectId(this.projectId); + } else { + this.getProjectId().then(afterProjectId, (err: Error) => { + // Because failing to get a project ID means that the trace agent will + // get disabled, there is a very small window for this code path to be + // taken. For this reason we don't do anything more complex than just + // notifying that we are dropping the current traces. + this.logger.info( + 'TraceWriter#flushBuffer: No project ID, dropping traces.'); + }); + } } /** diff --git a/test/plugins/common.ts b/test/plugins/common.ts index 6d78eb7d5..99b5b40b9 100644 --- a/test/plugins/common.ts +++ b/test/plugins/common.ts @@ -94,11 +94,11 @@ function replaceWarnLogger(fn) { * Cleans the tracer state between test runs. */ function cleanTraces() { - traceWriter.get().buffer = []; + traceWriter.get()['buffer'].drain(); } function getTraces() { - return traceWriter.get().buffer.map(buffer => JSON.parse(buffer)); + return traceWriter.get()['buffer']['traces']; } function getMatchingSpan(predicate) { @@ -138,6 +138,7 @@ function createChildSpan(cb, duration) { assert.ok(span); var t = setTimeout(function() { assert.strictEqual(span.type, SpanType.CHILD); + span.endSpan(); if (cb) { cb(); } diff --git a/test/plugins/test-trace-http2.ts b/test/plugins/test-trace-http2.ts index 88dad69d0..005caaa2c 100644 --- a/test/plugins/test-trace-http2.ts +++ b/test/plugins/test-trace-http2.ts @@ -206,6 +206,7 @@ maybeSkipHttp2('Trace Agent integration with http2', () => { assert.ok(rootSpan.type === SpanType.ROOT); const session = http2.connect(`http://localhost:${serverPort}`); const s = session.request({':path': '/?foo=bar'}); + s.on('data', () => {}); // enter flowing mode s.end(); setTimeout(() => { rootSpan.endSpan(); @@ -225,6 +226,7 @@ maybeSkipHttp2('Trace Agent integration with http2', () => { assert.ok(rootSpan.type === SpanType.ROOT); const session = http2.connect(`http://localhost:${serverPort}`); const s = session.request({':path': '/'}); + s.on('data', () => {}); // enter flowing mode s.end(); setTimeout(() => { rootSpan.endSpan(); diff --git a/test/test-span-data.ts b/test/test-span-data.ts index 1a1b45570..4583f62da 100644 --- a/test/test-span-data.ts +++ b/test/test-span-data.ts @@ -182,6 +182,13 @@ describe('SpanData', () => { } myFunction(); }); + + it(`doesn't call TraceWriter#writeTrace when ended`, () => { + const spanData = new CommonSpanData(trace, 'name', '0', 0); + spanData.endSpan(); + // writeTrace writes to capturedTrace. + assert.ok(!capturedTrace); + }); }); describe('RootSpanData', () => { @@ -198,5 +205,49 @@ describe('SpanData', () => { rootSpanData.endSpan(); assert.strictEqual(capturedTrace, rootSpanData.trace); }); + + it(`doesn't write to a Trace Writer more than once`, () => { + const rootSpanData = new RootSpanData(trace, 'root', '0', 0); + rootSpanData.endSpan(); + assert.strictEqual(capturedTrace, rootSpanData.trace); + capturedTrace = null; + rootSpanData.endSpan(); + assert.ok(!capturedTrace); + }); + + it('if already ended, allows open child spans to publish themselves later', + () => { + const rootSpanData = new RootSpanData(trace, 'root', '0', 0); + const firstChildSpanData = rootSpanData.createChildSpan( + {name: 'short-child'}) as ChildSpanData; + const secondChildSpanData = rootSpanData.createChildSpan( + {name: 'long-child'}) as ChildSpanData; + // End the first child span. + firstChildSpanData.endSpan(); + // End the root span. Note that the second child span hasn't ended yet. + rootSpanData.endSpan(); + // writeTrace should've been called from rootSpanData.endSpan. + assert.ok(capturedTrace); + // Save the value of capturedTrace, and then clear it, so writeTrace + // doesn't fail an assertion. + const firstTrace = capturedTrace!; + capturedTrace = null; + // Now end the second child span. This should trigger another call to + // writeTrace. + secondChildSpanData.endSpan(); + // writeTrace should've been called again, this time from + // childSpanData.endSpan. + assert.ok(capturedTrace); + assert.strictEqual(firstTrace.traceId, capturedTrace!.traceId); + // The child span should've written a trace with only itself as a span. + assert.strictEqual(capturedTrace!.spans.length, 1); + assert.strictEqual(capturedTrace!.spans[0], secondChildSpanData.span); + // Ensure that calling endSpan on a span that already ended doesn't + // do anything. + capturedTrace = null; + firstChildSpanData.endSpan(); + secondChildSpanData.endSpan(); + assert.ok(!capturedTrace); + }); }); }); diff --git a/test/test-trace-uncaught-exception.ts b/test/test-trace-uncaught-exception.ts index 5398d66a9..46a47cac8 100644 --- a/test/test-trace-uncaught-exception.ts +++ b/test/test-trace-uncaught-exception.ts @@ -60,7 +60,7 @@ describe('Trace Writer', () => { super.writeTrace(trace); // Since flushBuffer doesn't call publish unless a trace is buffered, // do that as well - this.buffer.push(JSON.stringify(trace)); + this.buffer.add(trace); } protected publish(json: string) { diff --git a/test/test-trace-writer.ts b/test/test-trace-writer.ts index 7df1ad103..1af92d0ef 100644 --- a/test/test-trace-writer.ts +++ b/test/test-trace-writer.ts @@ -28,7 +28,7 @@ import * as shimmer from 'shimmer'; import {SpanKind, Trace} from '../src/trace'; import {TraceLabels} from '../src/trace-labels'; -import {TraceWriter, TraceWriterConfig} from '../src/trace-writer'; +import {TraceBuffer, TraceWriter, TraceWriterConfig} from '../src/trace-writer'; import {TestLogger} from './logger'; import {hostname, instanceId, oauth2} from './nocks'; @@ -63,18 +63,20 @@ function mockNoMetadata() { }; } -function createDummyTrace(): Trace { +let traceIdHighWatermark = 0; +function createDummyTrace(numSpans: number): Trace { + const time = new Date().toString(); return { projectId: '', - traceId: '', - spans: [{ - labels: {}, - startTime: '', - endTime: '', - kind: SpanKind.RPC_SERVER, - name: '', - spanId: '' - }] + traceId: `${traceIdHighWatermark++}`, + spans: new Array(numSpans).fill(null).map(_ => ({ + labels: {}, + startTime: time, + endTime: time, + kind: SpanKind.RPC_SERVER, + name: '', + spanId: '' + })) }; } @@ -313,7 +315,7 @@ describe('Trace Writer', () => { const writer = new MockedRequestTraceWriter( Object.assign({}, DEFAULT_CONFIG, {bufferSize: 1}), logger); await writer.initialize(); - writer.writeTrace(createDummyTrace()); + writer.writeTrace(createDummyTrace(1)); // TraceWriter#publish should be called soon // (Promise task queue drain + immediate). await wait(200); @@ -329,25 +331,63 @@ describe('Trace Writer', () => { writer.stop(); }); + it(`doesn't enqueue open spans`, async () => { + const NUM_SPANS = 5; + const writer = new MockedRequestTraceWriter( + Object.assign({}, DEFAULT_CONFIG, {bufferSize: NUM_SPANS}), logger); + await writer.initialize(); + const trace = createDummyTrace(NUM_SPANS); + // By setting the endTime to a falsey value, we're specifying that they + // have yet to end. + trace.spans.forEach(span => span.endTime = ''); + // Write the trace. No spans should've been written, so a publish + // shouldn't occur. + writer.writeTrace(trace); + await wait(200); + // Didn't publish yet + assert.ok(!capturedRequestOptions); + // "End" the spans. + trace.spans.forEach(span => span.endTime = new Date().toString()); + writer.writeTrace(trace); + await wait(200); + // Published, so look at capturedRequestOptions + const publishedTraces: Trace[] = + JSON.parse(capturedRequestOptions!.body).traces; + // We should observe that two traces were published. One has no spans, + // the other one has NUM_SPANS spans. + assert.strictEqual(publishedTraces.length, 2); + assert.strictEqual(publishedTraces[0].spans.length, 0); + assert.strictEqual(publishedTraces[1].spans.length, NUM_SPANS); + writer.stop(); + }); + describe('condition for publishing traces', () => { - it('is satisfied when the buffer is full', async () => { - const NUM_SPANS = 5; - const writer = new MockedRequestTraceWriter( - Object.assign({}, DEFAULT_CONFIG, {bufferSize: NUM_SPANS}), logger); - await writer.initialize(); - writer.writeTrace(createDummyTrace()); - await wait(200); - // Didn't publish yet - assert.ok(!capturedRequestOptions); - for (let i = 1; i < NUM_SPANS; i++) { - writer.writeTrace(createDummyTrace()); - } - await wait(200); - const publishedTraces: Trace[] = - JSON.parse(capturedRequestOptions!.body).traces; - assert.strictEqual(publishedTraces.length, NUM_SPANS); - writer.stop(); - }); + it('is satisfied when the number of enqueued spans >= bufferSize', + async () => { + const NUM_SPANS = 5; + const writer = new MockedRequestTraceWriter( + Object.assign({}, DEFAULT_CONFIG, {bufferSize: NUM_SPANS}), + logger); + await writer.initialize(); + // Write a trace with a number of spans less than NUM_SPANS. A + // publish shouldn't occur. + writer.writeTrace(createDummyTrace(NUM_SPANS - 1)); + await wait(200); + // Didn't publish yet + assert.ok(!capturedRequestOptions); + // Write another trace with one span to push us over the limit. + writer.writeTrace(createDummyTrace(1)); + await wait(200); + // Published, so look at capturedRequestOptions + const publishedTraces: Trace[] = + JSON.parse(capturedRequestOptions!.body).traces; + assert.strictEqual(publishedTraces.length, 2); + // Count number of spans. It should be NUM_SPANS. + assert.strictEqual(publishedTraces.reduce((spanCount, trace) => { + return spanCount + trace.spans.length; + }, 0), NUM_SPANS); + writer.stop(); + }); it('is satisfied periodically', async () => { const writer = new MockedRequestTraceWriter( @@ -355,7 +395,7 @@ describe('Trace Writer', () => { await writer.initialize(); // Two rounds to ensure that it's periodical for (let round = 0; round < 2; round++) { - writer.writeTrace(createDummyTrace()); + writer.writeTrace(createDummyTrace(1)); await wait(500); // Didn't publish yet assert.ok(!capturedRequestOptions); @@ -372,7 +412,7 @@ describe('Trace Writer', () => { const writer = new MockedRequestTraceWriter( Object.assign({}, DEFAULT_CONFIG, {bufferSize: 1}), logger); await writer.initialize(); - writer.writeTrace(createDummyTrace()); + writer.writeTrace(createDummyTrace(1)); await wait(200); assert.strictEqual( logger.getNumLogsWith('error', 'TraceWriter#publish'), 1);