Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(opentelemetry): Bucket spans for cleanup #14154

Merged
merged 9 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions packages/node/test/integration/transactions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,14 @@ describe('Integration | Transactions', () => {

jest.advanceTimersByTime(1);

// Child-spans have been added to the exporter, but they are pending since they are waiting for their parant
expect(exporter['_finishedSpans'].length).toBe(2);
// Child-spans have been added to the exporter, but they are pending since they are waiting for their parent
const finishedSpans1 = [];
exporter['_finishedSpanBuckets'].forEach((bucket: any) => {
if (bucket) {
finishedSpans1.push(...bucket.spans);
}
});
expect(finishedSpans1.length).toBe(2);
expect(beforeSendTransaction).toHaveBeenCalledTimes(0);

// Now wait for 5 mins
Expand All @@ -608,18 +614,21 @@ describe('Integration | Transactions', () => {
jest.advanceTimersByTime(1);

// Old spans have been cleared away
expect(exporter['_finishedSpans'].length).toBe(0);
const finishedSpans2 = [];
exporter['_finishedSpanBuckets'].forEach((bucket: any) => {
if (bucket) {
finishedSpans2.push(...bucket.spans);
}
});
expect(finishedSpans2.length).toBe(0);

// Called once for the 'other span'
expect(beforeSendTransaction).toHaveBeenCalledTimes(1);

expect(logs).toEqual(
expect.arrayContaining([
'SpanExporter has 1 unsent spans remaining',
'SpanExporter has 2 unsent spans remaining',
'SpanExporter exported 1 spans, 2 unsent spans remaining',
`SpanExporter dropping span inner span 1 (${innerSpan1Id}) because it is pending for more than 5 minutes.`,
`SpanExporter dropping span inner span 2 (${innerSpan2Id}) because it is pending for more than 5 minutes.`,
'SpanExporter dropped 2 spans because they were pending for more than 300 seconds.',
'SpanExporter exported 1 spans, 0 unsent spans remaining',
]),
);
});
Expand Down
148 changes: 85 additions & 63 deletions packages/opentelemetry/src/spanExporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,60 +35,105 @@ type SpanNodeCompleted = SpanNode & { span: ReadableSpan };
const MAX_SPAN_COUNT = 1000;
const DEFAULT_TIMEOUT = 300; // 5 min

interface FinishedSpanBucket {
timestampInS: number;
spans: Set<ReadableSpan>;
}

/**
* A Sentry-specific exporter that converts OpenTelemetry Spans to Sentry Spans & Transactions.
*/
export class SentrySpanExporter {
private _flushTimeout: ReturnType<typeof setTimeout> | undefined;
private _finishedSpans: ReadableSpan[];
private _timeout: number;

public constructor(options?: { timeout?: number }) {
this._finishedSpans = [];
this._timeout = options?.timeout || DEFAULT_TIMEOUT;
// private _finishedSpans: ReadableSpan[];
lforst marked this conversation as resolved.
Show resolved Hide resolved
private _finishedSpanBuckets: (FinishedSpanBucket | undefined)[];
mydea marked this conversation as resolved.
Show resolved Hide resolved
private _finishedSpanBucketSize: number;
private _spansToBucketEntry: WeakMap<ReadableSpan, FinishedSpanBucket>;
private _lastCleanupTimestampInS: number;

public constructor(options?: {
/** Lower bound of time in seconds until spans that are buffered but have not been sent as part of a transaction get cleared from memory. */
timeout?: number;
}) {
this._finishedSpanBucketSize = options?.timeout || DEFAULT_TIMEOUT;
this._finishedSpanBuckets = new Array(this._finishedSpanBucketSize).fill(undefined);
this._lastCleanupTimestampInS = Math.floor(Date.now() / 1000);
this._spansToBucketEntry = new WeakMap();
}

/** Export a single span. */
public export(span: ReadableSpan): void {
this._finishedSpans.push(span);

// If the span has a local parent ID, we don't need to export anything just yet
if (getLocalParentId(span)) {
const openSpanCount = this._finishedSpans.length;
DEBUG_BUILD && logger.log(`SpanExporter has ${openSpanCount} unsent spans remaining`);
this._cleanupOldSpans();
return;
const currentTimestampInS = Math.floor(Date.now() / 1000);

if (this._lastCleanupTimestampInS !== currentTimestampInS) {
let droppedSpanCount = 0;
this._finishedSpanBuckets.forEach((bucket, i) => {
if (bucket && bucket.timestampInS <= currentTimestampInS - this._finishedSpanBucketSize) {
droppedSpanCount += bucket.spans.size;
this._finishedSpanBuckets[i] = undefined;
}
});
if (droppedSpanCount > 0) {
DEBUG_BUILD &&
logger.log(
`SpanExporter dropped ${droppedSpanCount} spans because they were pending for more than ${this._finishedSpanBucketSize} seconds.`,
);
}
this._lastCleanupTimestampInS = currentTimestampInS;
}

this._clearTimeout();

// If we got a parent span, we try to send the span tree
// Wait a tick for this, to ensure we avoid race conditions
this._flushTimeout = setTimeout(() => {
this.flush();
}, 1);
const currentBucketIndex = currentTimestampInS % this._finishedSpanBucketSize;
const currentBucket = this._finishedSpanBuckets[currentBucketIndex] || {
timestampInS: currentTimestampInS,
spans: new Set(),
};
this._finishedSpanBuckets[currentBucketIndex] = currentBucket;
currentBucket.spans.add(span);
this._spansToBucketEntry.set(span, currentBucket);

// If the span doesn't have a local parent ID (it's a root span), we're gonna flush all the ended spans
if (!getLocalParentId(span)) {
this._clearTimeout();

// If we got a parent span, we try to send the span tree
// Wait a tick for this, to ensure we avoid race conditions
this._flushTimeout = setTimeout(() => {
this.flush();
}, 1);
}
}

/** Try to flush any pending spans immediately. */
public flush(): void {
this._clearTimeout();

const openSpanCount = this._finishedSpans.length;
const finishedSpans: ReadableSpan[] = [];
this._finishedSpanBuckets.forEach(bucket => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

l: Can we instead add a get _finishSpans(): ReadableSpan[] getter on the exporter class and use this? This makes it easier to test this as well as we can just access this in tests too? 🤔

Copy link
Member

@lforst lforst Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will not add a public function for this. This is an implementation detail that should not be exposed. I can however extract this functionality into a private fn, which I also do not particularly like since we do this operation exactly one time (apart from in tests, which is whatever).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I do not mean a public function, but just making it a private method on the class, then it is also not exposed but we can use it in tests. right now we duplicate the logic in the test so if that runs apart we may not notice (it's really a nit so feel free to disregard). In the tests we could then do, instead of:

const finishedSpans1 = [];
    exporter['_finishedSpanBuckets'].forEach((bucket: any) => {
      if (bucket) {
        finishedSpans1.push(...bucket.spans);
      }
    });

just

const finishedSpans1 = exporter['_finishedSpans'];

or something like this.

if (bucket) {
finishedSpans.push(...bucket.spans);
}
});

const sentSpans = maybeSend(finishedSpans);

const remainingSpans = maybeSend(this._finishedSpans);
const sentSpanCount = sentSpans.size;

const remainingOpenSpanCount = remainingSpans.length;
const sentSpanCount = openSpanCount - remainingOpenSpanCount;
const remainingOpenSpanCount = finishedSpans.length - sentSpanCount;

DEBUG_BUILD &&
logger.log(`SpanExporter exported ${sentSpanCount} spans, ${remainingOpenSpanCount} unsent spans remaining`);

this._cleanupOldSpans(remainingSpans);
sentSpans.forEach(span => {
const bucketEntry = this._spansToBucketEntry.get(span);
if (bucketEntry) {
bucketEntry.spans.delete(span);
}
});
}

/** Clear the exporter. */
public clear(): void {
this._finishedSpans = [];
this._finishedSpanBuckets = this._finishedSpanBuckets.fill(undefined);
this._clearTimeout();
}

Expand All @@ -99,52 +144,33 @@ export class SentrySpanExporter {
this._flushTimeout = undefined;
}
}

/**
* Remove any span that is older than 5min.
* We do this to avoid leaking memory.
*/
private _cleanupOldSpans(spans = this._finishedSpans): void {
const currentTimeSeconds = Date.now() / 1000;
this._finishedSpans = spans.filter(span => {
const shouldDrop = shouldCleanupSpan(span, currentTimeSeconds, this._timeout);
DEBUG_BUILD &&
shouldDrop &&
logger.log(
`SpanExporter dropping span ${span.name} (${
span.spanContext().spanId
}) because it is pending for more than 5 minutes.`,
);
return !shouldDrop;
});
}
}

/**
* Send the given spans, but only if they are part of a finished transaction.
*
* Returns the unsent spans.
* Returns the sent spans.
* Spans remain unsent when their parent span is not yet finished.
* This will happen regularly, as child spans are generally finished before their parents.
* But it _could_ also happen because, for whatever reason, a parent span was lost.
* In this case, we'll eventually need to clean this up.
*/
function maybeSend(spans: ReadableSpan[]): ReadableSpan[] {
function maybeSend(spans: ReadableSpan[]): Set<ReadableSpan> {
const grouped = groupSpansWithParents(spans);
const remaining = new Set(grouped);
const sentSpans = new Set<ReadableSpan>();

const rootNodes = getCompletedRootNodes(grouped);

rootNodes.forEach(root => {
remaining.delete(root);
const span = root.span;
sentSpans.add(span);
const transactionEvent = createTransactionForOtelSpan(span);

// We'll recursively add all the child spans to this array
const spans = transactionEvent.spans || [];

root.children.forEach(child => {
createAndFinishSpanForOtelSpan(child, spans, remaining);
createAndFinishSpanForOtelSpan(child, spans, sentSpans);
});

// spans.sort() mutates the array, but we do not use this anymore after this point
Expand All @@ -162,9 +188,7 @@ function maybeSend(spans: ReadableSpan[]): ReadableSpan[] {
captureEvent(transactionEvent);
});

return Array.from(remaining)
.map(node => node.span)
.filter((span): span is ReadableSpan => !!span);
return sentSpans;
}

function nodeIsCompletedRootNode(node: SpanNode): node is SpanNodeCompleted {
Expand All @@ -175,11 +199,6 @@ function getCompletedRootNodes(nodes: SpanNode[]): SpanNodeCompleted[] {
return nodes.filter(nodeIsCompletedRootNode);
}

function shouldCleanupSpan(span: ReadableSpan, currentTimeSeconds: number, maxStartTimeOffsetSeconds: number): boolean {
const cutoff = currentTimeSeconds - maxStartTimeOffsetSeconds;
return spanTimeInputToSeconds(span.startTime) < cutoff;
}

function parseSpan(span: ReadableSpan): { op?: string; origin?: SpanOrigin; source?: TransactionSource } {
const attributes = span.attributes;

Expand Down Expand Up @@ -260,16 +279,19 @@ function createTransactionForOtelSpan(span: ReadableSpan): TransactionEvent {
return transactionEvent;
}

function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], remaining: Set<SpanNode>): void {
remaining.delete(node);
function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], sentSpans: Set<ReadableSpan>): void {
const span = node.span;

if (span) {
sentSpans.add(span);
}

const shouldDrop = !span;

// If this span should be dropped, we still want to create spans for the children of this
if (shouldDrop) {
node.children.forEach(child => {
createAndFinishSpanForOtelSpan(child, spans, remaining);
createAndFinishSpanForOtelSpan(child, spans, sentSpans);
});
return;
}
Expand Down Expand Up @@ -308,7 +330,7 @@ function createAndFinishSpanForOtelSpan(node: SpanNode, spans: SpanJSON[], remai
spans.push(spanJSON);

node.children.forEach(child => {
createAndFinishSpanForOtelSpan(child, spans, remaining);
createAndFinishSpanForOtelSpan(child, spans, sentSpans);
});
}

Expand Down
55 changes: 34 additions & 21 deletions packages/opentelemetry/test/integration/transactions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,24 +460,22 @@ describe('Integration | Transactions', () => {
throw new Error('No exporter found, aborting test...');
}

let innerSpan1Id: string | undefined;
let innerSpan2Id: string | undefined;

void startSpan({ name: 'test name' }, async () => {
const subSpan = startInactiveSpan({ name: 'inner span 1' });
innerSpan1Id = subSpan.spanContext().spanId;
subSpan.end();

startSpan({ name: 'inner span 2' }, innerSpan => {
innerSpan2Id = innerSpan.spanContext().spanId;
});
startInactiveSpan({ name: 'inner span 1' }).end();
startInactiveSpan({ name: 'inner span 2' }).end();

// Pretend this is pending for 10 minutes
await new Promise(resolve => setTimeout(resolve, 10 * 60 * 1000));
});

// Child-spans have been added to the exporter, but they are pending since they are waiting for their parant
expect(exporter['_finishedSpans'].length).toBe(2);
// Child-spans have been added to the exporter, but they are pending since they are waiting for their parent
const finishedSpans1 = [];
exporter['_finishedSpanBuckets'].forEach(bucket => {
if (bucket) {
finishedSpans1.push(...bucket.spans);
}
});
expect(finishedSpans1.length).toBe(2);
expect(beforeSendTransaction).toHaveBeenCalledTimes(0);

// Now wait for 5 mins
Expand All @@ -489,18 +487,21 @@ describe('Integration | Transactions', () => {
jest.advanceTimersByTime(1);

// Old spans have been cleared away
expect(exporter['_finishedSpans'].length).toBe(0);
const finishedSpans2 = [];
exporter['_finishedSpanBuckets'].forEach(bucket => {
if (bucket) {
finishedSpans2.push(...bucket.spans);
}
});
expect(finishedSpans2.length).toBe(0);

// Called once for the 'other span'
expect(beforeSendTransaction).toHaveBeenCalledTimes(1);

expect(logs).toEqual(
expect.arrayContaining([
'SpanExporter has 1 unsent spans remaining',
'SpanExporter has 2 unsent spans remaining',
'SpanExporter exported 1 spans, 2 unsent spans remaining',
`SpanExporter dropping span inner span 1 (${innerSpan1Id}) because it is pending for more than 5 minutes.`,
`SpanExporter dropping span inner span 2 (${innerSpan2Id}) because it is pending for more than 5 minutes.`,
'SpanExporter dropped 2 spans because they were pending for more than 300 seconds.',
'SpanExporter exported 1 spans, 0 unsent spans remaining',
]),
);
});
Expand Down Expand Up @@ -553,7 +554,13 @@ describe('Integration | Transactions', () => {
expect(transactions[0]?.spans).toHaveLength(2);

// No spans are pending
expect(exporter['_finishedSpans'].length).toBe(0);
const finishedSpans = [];
exporter['_finishedSpanBuckets'].forEach(bucket => {
if (bucket) {
finishedSpans.push(...bucket.spans);
}
});
expect(finishedSpans.length).toBe(0);
});

it('discards child spans that are finished after their parent span', async () => {
Expand Down Expand Up @@ -607,8 +614,14 @@ describe('Integration | Transactions', () => {
expect(transactions[0]?.spans).toHaveLength(1);

// subSpan2 is pending (and will eventually be cleaned up)
expect(exporter['_finishedSpans'].length).toBe(1);
expect(exporter['_finishedSpans'][0]?.name).toBe('inner span 2');
const finishedSpans: any = [];
exporter['_finishedSpanBuckets'].forEach(bucket => {
if (bucket) {
finishedSpans.push(...bucket.spans);
}
});
expect(finishedSpans.length).toBe(1);
expect(finishedSpans[0]?.name).toBe('inner span 2');
});

it('uses & inherits DSC on span trace state', async () => {
Expand Down