Skip to content

Commit

Permalink
feat: support child spans with tail latencies (#913)
Browse files Browse the repository at this point in the history
  • Loading branch information
kjin authored Feb 6, 2019
1 parent f88cdd4 commit d1de959
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 95 deletions.
19 changes: 10 additions & 9 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,6 @@ export interface Config {
*/
stackTraceLimit?: number;

/**
* Buffer the captured traces for `flushDelaySeconds` seconds before
* publishing to the trace API, unless the buffer fills up first.
* Also see `bufferSize`.
*/
flushDelaySeconds?: number;

/**
* URLs that partially match any regex in ignoreUrls will not be traced.
* In addition, URLs that are _exact matches_ of strings in ignoreUrls will
Expand Down Expand Up @@ -171,8 +164,16 @@ export interface Config {
contextHeaderBehavior?: ContextHeaderBehavior;

/**
* The number of transactions we buffer before we publish to the trace
* API, unless `flushDelaySeconds` seconds have elapsed first.
* Buffer the captured traces for `flushDelaySeconds` seconds before
* publishing to the Stackdriver Trace API, unless the buffer fills up first.
* Also see `bufferSize`.
*/
flushDelaySeconds?: number;

/**
* The number of spans in buffered traces needed to trigger a publish of all
* traces to the Stackdriver Trace API, unless `flushDelaySeconds` seconds
* has elapsed first.
*/
bufferSize?: number;

Expand Down
45 changes: 41 additions & 4 deletions src/span-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import * as crypto from 'crypto';
import * as util from 'util';

import {Constants, SpanType} from './constants';
import * as types from './plugin-types';
import {Span, SpanOptions} from './plugin-types';
import {RootSpan, Span, SpanOptions} from './plugin-types';
import {SpanKind, Trace, TraceSpan} from './trace';
import {TraceLabels} from './trace-labels';
import {traceWriter} from './trace-writer';
Expand Down Expand Up @@ -104,6 +103,9 @@ export abstract class BaseSpanData implements Span {
}

endSpan(timestamp?: Date) {
if (!!this.span.endTime) {
return;
}
timestamp = timestamp || new Date();
this.span.endTime = timestamp.toISOString();
}
Expand All @@ -112,8 +114,11 @@ export abstract class BaseSpanData implements Span {
/**
* Represents a real root span, which corresponds to an incoming request.
*/
export class RootSpanData extends BaseSpanData implements types.RootSpan {
export class RootSpanData extends BaseSpanData implements RootSpan {
readonly type = SpanType.ROOT;
// Locally-tracked list of children. Used only to determine, once this span
// ends, whether a child still needs to be published.
private children: ChildSpanData[] = [];

constructor(
trace: Trace, spanName: string, parentSpanId: string,
Expand All @@ -125,16 +130,30 @@ export class RootSpanData extends BaseSpanData implements types.RootSpan {
createChildSpan(options?: SpanOptions): Span {
options = options || {name: ''};
const skipFrames = options.skipFrames ? options.skipFrames + 1 : 1;
return new ChildSpanData(
const child = new ChildSpanData(
this.trace, /* Trace object */
options.name, /* Span name */
this.span.spanId, /* Parent's span ID */
skipFrames); /* # of frames to skip in stack trace */
this.children.push(child);
return child;
}

endSpan(timestamp?: Date) {
if (!!this.span.endTime) {
return;
}
super.endSpan(timestamp);
traceWriter.get().writeTrace(this.trace);
this.children.forEach(child => {
if (!child.span.endTime) {
// Child hasn't ended yet.
// Inform the child that it needs to self-publish.
child.shouldSelfPublish = true;
}
});
// We no longer need to keep track of our children.
this.children = [];
}
}

Expand All @@ -143,13 +162,31 @@ export class RootSpanData extends BaseSpanData implements types.RootSpan {
*/
export class ChildSpanData extends BaseSpanData {
readonly type = SpanType.CHILD;
// Whether this span should publish itself. This is meant to be set to true
// by the parent RootSpanData.
shouldSelfPublish = false;

constructor(
trace: Trace, spanName: string, parentSpanId: string,
skipFrames: number) {
super(trace, spanName, parentSpanId, skipFrames);
this.span.kind = SpanKind.RPC_CLIENT;
}

endSpan(timestamp?: Date) {
if (!!this.span.endTime) {
return;
}
super.endSpan(timestamp);
if (this.shouldSelfPublish) {
// Also, publish just this span.
traceWriter.get().writeTrace({
projectId: this.trace.projectId,
traceId: this.trace.traceId,
spans: [this.span]
});
}
}
}

// Helper function to generate static virtual trace spans.
Expand Down
128 changes: 81 additions & 47 deletions src/trace-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

import * as common from '@google-cloud/common';
import {AxiosError} from 'axios';
import * as gcpMetadata from 'gcp-metadata';
import {OutgoingHttpHeaders} from 'http';
import * as os from 'os';
Expand Down Expand Up @@ -55,12 +54,50 @@ export interface LabelObject {
[key: string]: string;
}

export class TraceBuffer {
/**
* Buffered traces.
*/
private traces: Trace[] = [];
/**
* Number of buffered spans; this number must be at least as large as
* buffer.length.
*/
private numSpans = 0;

/**
* Add a new trace to the buffer.
* @param trace The trace to add.
*/
add(trace: Trace) {
this.traces.push(trace);
this.numSpans += trace.spans.length;
}

/**
* Gets the number of spans contained within buffered traces.
*/
getNumSpans() {
return this.numSpans;
}

/**
* Clears the buffer, returning its original contents.
*/
drain(): Trace[] {
const result = this.traces;
this.traces = [];
this.numSpans = 0;
return result;
}
}

/**
* A class representing a service that publishes traces in the background.
*/
export class TraceWriter extends common.Service {
/** Stringified traces to be published */
buffer: string[];
/** Traces to be published */
protected buffer: TraceBuffer;
/** Default labels to be attached to written spans */
defaultLabels: LabelObject;
/** Reference to global unhandled exception handler */
Expand Down Expand Up @@ -89,7 +126,7 @@ export class TraceWriter extends common.Service {
config);

this.logger = logger;
this.buffer = [];
this.buffer = new TraceBuffer();
this.defaultLabels = {};

this.isActive = true;
Expand Down Expand Up @@ -216,54 +253,31 @@ export class TraceWriter extends common.Service {
}

/**
* Ensures that all sub spans of the provided Trace object are
* closed and then queues the span data to be published.
* Queues a trace to be published. Spans with no end time are excluded.
*
* @param trace The trace to be queued.
*/
writeTrace(trace: Trace) {
for (const span of trace.spans) {
if (span.endTime === '') {
span.endTime = (new Date()).toISOString();
}
}
const publishableSpans = trace.spans.filter(span => !!span.endTime);

trace.spans.forEach(spanData => {
publishableSpans.forEach(spanData => {
if (spanData.kind === SpanKind.RPC_SERVER) {
// Copy properties from the default labels.
Object.assign(spanData.labels, this.defaultLabels);
}
});

const afterProjectId = (projectId: string) => {
trace.projectId = projectId;
this.buffer.push(JSON.stringify(trace));
this.logger.info(
`TraceWriter#writeTrace: buffer.size = ${this.buffer.length}`);

// Publish soon if the buffer is getting big
if (this.buffer.length >= this.config.bufferSize) {
this.logger.info(
'TraceWriter#writeTrace: Trace buffer full, flushing.');
setImmediate(() => this.flushBuffer());
}
};

// TODO(kjin): We should always be following the 'else' path.
// Any test that doesn't mock the Trace Writer will assume that traces get
// buffered synchronously. We need to refactor those tests to remove that
// assumption before we can make this fix.
if (this.projectId !== NO_PROJECT_ID_TOKEN) {
afterProjectId(this.projectId);
} else {
this.getProjectId().then(afterProjectId, (err: Error) => {
// Because failing to get a project ID means that the trace agent will
// get disabled, there is a very small window for this code path to be
// taken. For this reason we don't do anything more complex than just
// notifying that we are dropping the current trace.
this.logger.info(
'TraceWriter#queueTrace: No project ID, dropping trace.');
});
this.buffer.add({
traceId: trace.traceId,
projectId: trace.projectId,
spans: publishableSpans
});
this.logger.info(`TraceWriter#writeTrace: number of buffered spans = ${
this.buffer.getNumSpans()}`);
// Publish soon if the buffer is getting big
if (this.buffer.getNumSpans() >= this.config.bufferSize) {
this.logger.info('TraceWriter#writeTrace: Trace buffer full, flushing.');
setImmediate(() => this.flushBuffer());
}
}

Expand Down Expand Up @@ -292,15 +306,35 @@ export class TraceWriter extends common.Service {
* Serializes the buffered traces to be published asynchronously.
*/
private flushBuffer() {
if (this.buffer.length === 0) {
// Privatize and clear the buffer.
const flushedTraces = this.buffer.drain();
if (flushedTraces.length === 0) {
return;
}

// Privatize and clear the buffer.
const buffer = this.buffer;
this.buffer = [];
this.logger.debug('TraceWriter#flushBuffer: Flushing traces', buffer);
this.publish(`{"traces":[${buffer.join()}]}`);
const afterProjectId = (projectId: string) => {
flushedTraces.forEach(trace => trace.projectId = projectId);
this.logger.debug(
'TraceWriter#flushBuffer: Flushing traces', flushedTraces);
this.publish(JSON.stringify({traces: flushedTraces}));
};

// TODO(kjin): We should always be following the 'else' path.
// Any test that doesn't mock the Trace Writer will assume that traces get
// buffered synchronously. We need to refactor those tests to remove that
// assumption before we can make this fix.
if (this.projectId !== NO_PROJECT_ID_TOKEN) {
afterProjectId(this.projectId);
} else {
this.getProjectId().then(afterProjectId, (err: Error) => {
// Because failing to get a project ID means that the trace agent will
// get disabled, there is a very small window for this code path to be
// taken. For this reason we don't do anything more complex than just
// notifying that we are dropping the current traces.
this.logger.info(
'TraceWriter#flushBuffer: No project ID, dropping traces.');
});
}
}

/**
Expand Down
5 changes: 3 additions & 2 deletions test/plugins/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ function replaceWarnLogger(fn) {
* Cleans the tracer state between test runs.
*/
function cleanTraces() {
traceWriter.get().buffer = [];
traceWriter.get()['buffer'].drain();
}

function getTraces() {
return traceWriter.get().buffer.map(buffer => JSON.parse(buffer));
return traceWriter.get()['buffer']['traces'];
}

function getMatchingSpan(predicate) {
Expand Down Expand Up @@ -138,6 +138,7 @@ function createChildSpan(cb, duration) {
assert.ok(span);
var t = setTimeout(function() {
assert.strictEqual(span.type, SpanType.CHILD);
span.endSpan();
if (cb) {
cb();
}
Expand Down
2 changes: 2 additions & 0 deletions test/plugins/test-trace-http2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ maybeSkipHttp2('Trace Agent integration with http2', () => {
assert.ok(rootSpan.type === SpanType.ROOT);
const session = http2.connect(`http://localhost:${serverPort}`);
const s = session.request({':path': '/?foo=bar'});
s.on('data', () => {}); // enter flowing mode
s.end();
setTimeout(() => {
rootSpan.endSpan();
Expand All @@ -225,6 +226,7 @@ maybeSkipHttp2('Trace Agent integration with http2', () => {
assert.ok(rootSpan.type === SpanType.ROOT);
const session = http2.connect(`http://localhost:${serverPort}`);
const s = session.request({':path': '/'});
s.on('data', () => {}); // enter flowing mode
s.end();
setTimeout(() => {
rootSpan.endSpan();
Expand Down
Loading

0 comments on commit d1de959

Please sign in to comment.