Skip to content

Commit

Permalink
Merge branch 'main' into karma-sourcemap
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas authored Feb 23, 2022
2 parents df9d4e9 + 144e11a commit a903e6b
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 84 deletions.
17 changes: 7 additions & 10 deletions examples/prometheus/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@
const { MeterProvider } = require('@opentelemetry/sdk-metrics-base');
const { PrometheusExporter } = require('@opentelemetry/exporter-prometheus');

const exporter = new PrometheusExporter(
{
startServer: true,
},
() => {
console.log(
`prometheus scrape endpoint: http://localhost:${PrometheusExporter.DEFAULT_OPTIONS.port}${PrometheusExporter.DEFAULT_OPTIONS.endpoint}`,
);
},
);
const { endpoint, port } = PrometheusExporter.DEFAULT_OPTIONS;

const exporter = new PrometheusExporter({}, () => {
console.log(
`prometheus scrape endpoint: http://localhost:${port}${endpoint}`,
);
});

const meter = new MeterProvider({
exporter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ const { PrometheusExporter } = require('@opentelemetry/exporter-prometheus');
// Optional and only needed to see the internal diagnostic logging (during development)
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

const exporter = new PrometheusExporter(
{
startServer: true,
},
() => {
console.log(
`prometheus scrape endpoint: http://localhost:${PrometheusExporter.DEFAULT_OPTIONS.port}${PrometheusExporter.DEFAULT_OPTIONS.endpoint}`,
);
},
);
const { endpoint, port } = PrometheusExporter.DEFAULT_OPTIONS;

const exporter = new PrometheusExporter({}, () => {
console.log(
`prometheus scrape endpoint: http://localhost:${port}${endpoint}`,
);
});

const meter = new MeterProvider({
exporter,
Expand All @@ -34,12 +31,12 @@ meter.createObservableGauge('cpu_core_usage', {

function getAsyncValue() {
return new Promise((resolve) => {
setTimeout(()=> {
setTimeout(() => {
resolve(Math.random());
}, 100);
});
}

setInterval(function(){
setInterval(function () {
console.log("simulating an app being kept open")
}, 5000);
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,34 @@

import { AggregationTemporality } from './AggregationTemporality';
import { MetricData } from './MetricData';
import {
ExportResult,
ExportResultCode,
} from '@opentelemetry/core';


// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricexporter

// TODO should this just be an interface and exporters can implement their own shutdown?
export abstract class MetricExporter {
protected _shutdown = false;
export interface PushMetricExporter {

abstract export(batch: MetricData[]): Promise<void>;
export(batch: MetricData[], resultCallback: (result: ExportResult) => void): void;

abstract forceFlush(): Promise<void>;
forceFlush(): Promise<void>;

abstract getPreferredAggregationTemporality(): AggregationTemporality;
getPreferredAggregationTemporality(): AggregationTemporality;

async shutdown(): Promise<void> {
if (this._shutdown) {
return;
}
shutdown(): Promise<void>;

// Setting _shutdown before flushing might prevent some exporters from flushing
// Waiting until flushing is complete might allow another flush to occur during shutdown
const flushPromise = this.forceFlush();
this._shutdown = true;
await flushPromise;
}

isShutdown() {
return this._shutdown;
}
}

export class ConsoleMetricExporter extends MetricExporter {
async export(_batch: MetricData[]) {
throw new Error('Method not implemented');
export class ConsoleMetricExporter implements PushMetricExporter {
protected _shutdown = true;

export(_batch: MetricData[], resultCallback: (result: ExportResult) => void) {
return resultCallback({
code: ExportResultCode.FAILED,
error: new Error('Method not implemented')
});
}

getPreferredAggregationTemporality() {
Expand All @@ -58,4 +52,8 @@ export class ConsoleMetricExporter extends MetricExporter {

// nothing to do
async forceFlush() {}

async shutdown() {
this._shutdown = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/

import * as api from '@opentelemetry/api';
import { ExportResultCode, globalErrorHandler } from '@opentelemetry/core';
import { MetricReader } from './MetricReader';
import { MetricExporter } from './MetricExporter';
import { PushMetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';

export type PeriodicExportingMetricReaderOptions = {
exporter: MetricExporter
exporter: PushMetricExporter
exportIntervalMillis?: number,
exportTimeoutMillis?: number
};
Expand All @@ -32,7 +33,7 @@ export type PeriodicExportingMetricReaderOptions = {
export class PeriodicExportingMetricReader extends MetricReader {
private _interval?: ReturnType<typeof setInterval>;

private _exporter: MetricExporter;
private _exporter: PushMetricExporter;

private readonly _exportInterval: number;

Expand Down Expand Up @@ -62,7 +63,20 @@ export class PeriodicExportingMetricReader extends MetricReader {

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});
await this._exporter.export(metrics);
return new Promise((resolve, reject) => {
this._exporter.export(metrics, result => {
if (result.code !== ExportResultCode.SUCCESS) {
reject(
result.error ??
new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
)
);
} else {
resolve();
}
});
});
}

protected override onInitialized(): void {
Expand All @@ -76,7 +90,7 @@ export class PeriodicExportingMetricReader extends MetricReader {
return;
}

api.diag.error('Unexpected error during export: %s', err);
globalErrorHandler(err);
}
}, this._exportInterval);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,45 @@

import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { MetricExporter } from '../../src';
import { PushMetricExporter } from '../../src';
import { MetricData } from '../../src/export/MetricData';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { MetricProducer } from '../../src/export/MetricProducer';
import { TimeoutError } from '../../src/utils';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
import { assertRejects } from '../test-utils';

const MAX_32_BIT_INT = 2 ** 31 - 1;

class TestMetricExporter extends MetricExporter {
class TestMetricExporter implements PushMetricExporter {
public exportTime = 0;
public forceFlushTime = 0;
public throwException = false;
public failureResult = false;
private _batches: MetricData[][] = [];
private _shutdown: boolean = false;

async export(batch: MetricData[]): Promise<void> {
export(batch: MetricData[], resultCallback: (result: ExportResult) => void): void {
this._batches.push(batch);

if (this.throwException) {
throw new Error('Error during export');
}
await new Promise(resolve => setTimeout(resolve, this.exportTime));
setTimeout(() => {
if (this.failureResult) {
resultCallback({code: ExportResultCode.FAILED, error: new Error('some error') });
} else {
resultCallback({code: ExportResultCode.SUCCESS });
}
}, this.exportTime);
}

async shutdown(): Promise<void> {
if (this._shutdown) return;
const flushPromise = this.forceFlush();
this._shutdown = true;
await flushPromise;
}

async forceFlush(): Promise<void> {
Expand Down Expand Up @@ -176,6 +192,24 @@ describe('PeriodicExportingMetricReader', () => {
await reader.shutdown();
});

it('should keep running on export failure', async () => {
const exporter = new TestMetricExporter();
exporter.failureResult = true;
const reader = new PeriodicExportingMetricReader({
exporter: exporter,
exportIntervalMillis: 30,
exportTimeoutMillis: 20
});

reader.setMetricProducer(new TestMetricProducer());

const result = await exporter.waitForNumberOfExports(2);
assert.deepStrictEqual(result, [[], []]);

exporter.failureResult = false;
await reader.shutdown();
});

it('should keep exporting on export timeouts', async () => {
const exporter = new TestMetricExporter();
// set time longer than timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ import * as sinon from 'sinon';
import { MeterProvider } from '../../src';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { MetricData, PointDataType } from '../../src/export/MetricData';
import { MetricExporter } from '../../src/export/MetricExporter';
import { PushMetricExporter } from '../../src/export/MetricExporter';
import { MeterProviderSharedState } from '../../src/state/MeterProviderSharedState';
import { MetricCollector } from '../../src/state/MetricCollector';
import { defaultInstrumentationLibrary, defaultResource, assertMetricData, assertPointData } from '../util';
import { TestMetricReader } from '../export/TestMetricReader';
import { ExportResult, ExportResultCode } from '@opentelemetry/core';

class TestMetricExporter extends MetricExporter {
class TestMetricExporter implements PushMetricExporter {
metricDataList: MetricData[] = [];
async export(batch: MetricData[]): Promise<void> {
async export(batch: MetricData[], resultCallback: (result: ExportResult) => void): Promise<void> {
this.metricDataList.push(...batch);
resultCallback({code: ExportResultCode.SUCCESS});
}

async shutdown(): Promise<void> {}

async forceFlush(): Promise<void> {}

getPreferredAggregationTemporality(): AggregationTemporality {
Expand Down Expand Up @@ -63,7 +67,8 @@ describe('MetricCollector', () => {
});

describe('collect', () => {
function setupInstruments(exporter: MetricExporter) {

function setupInstruments(exporter: PushMetricExporter) {
const meterProvider = new MeterProvider({ resource: defaultResource });

const reader = new TestMetricReader(exporter.getPreferredAggregationTemporality());
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"husky": "4.3.8",
"lerna": "3.22.1",
"lerna-changelog": "1.0.1",
"linkinator": "2.16.2",
"linkinator": "3.0.3",
"markdownlint-cli": "0.29.0",
"semver": "7.3.5",
"typedoc": "0.22.10",
Expand Down
34 changes: 11 additions & 23 deletions packages/opentelemetry-sdk-trace-base/src/Tracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,32 @@ export class Tracer implements api.Tracer {
return api.trace.wrapSpanContext(api.INVALID_SPAN_CONTEXT);
}

const parentContext = getParent(options, context);
// remove span from context in case a root span is requested via options
if (options.root) {
context = api.trace.deleteSpan(context);
}

const parentSpanContext = api.trace.getSpanContext(context);
const spanId = this._idGenerator.generateSpanId();
let traceId;
let traceState;
let parentSpanId;
if (!parentContext || !api.trace.isSpanContextValid(parentContext)) {
if (!parentSpanContext || !api.trace.isSpanContextValid(parentSpanContext)) {
// New root span.
traceId = this._idGenerator.generateTraceId();
} else {
// New child span.
traceId = parentContext.traceId;
traceState = parentContext.traceState;
parentSpanId = parentContext.spanId;
traceId = parentSpanContext.traceId;
traceState = parentSpanContext.traceState;
parentSpanId = parentSpanContext.spanId;
}

const spanKind = options.kind ?? api.SpanKind.INTERNAL;
const links = options.links ?? [];
const attributes = sanitizeAttributes(options.attributes);
// make sampling decision
const samplingResult = this._sampler.shouldSample(
options.root
? api.trace.setSpanContext(context, api.INVALID_SPAN_CONTEXT)
: context,
context,
traceId,
name,
spanKind,
Expand Down Expand Up @@ -228,18 +231,3 @@ export class Tracer implements api.Tracer {
return this._tracerProvider.getActiveSpanProcessor();
}
}

/**
* Get the parent to assign to a started span. If options.parent is null,
* do not assign a parent.
*
* @param options span options
* @param context context to check for parent
*/
function getParent(
options: api.SpanOptions,
context: api.Context
): api.SpanContext | undefined {
if (options.root) return undefined;
return api.trace.getSpanContext(context);
}
Loading

0 comments on commit a903e6b

Please sign in to comment.