Skip to content

Commit

Permalink
Use http keep-alive in collector exporter (#1661)
Browse files Browse the repository at this point in the history
* chore: use http keep-alive

* chore: rename connectionReuse -> keepAlive

* chore: update proto and node to support keep alive and http agent options

* Apply suggestions from code review

Co-authored-by: Bartlomiej Obecny <bobecny@gmail.com>

* chore: review changes

Co-authored-by: Bartlomiej Obecny <bobecny@gmail.com>
  • Loading branch information
srikanthccv and obecny authored Dec 14, 2020
1 parent d113742 commit e70a7c8
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import {
CollectorExporterNodeBase as CollectorExporterBaseMain,
collectorTypes,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import { ServiceClientType } from './types';

Expand Down Expand Up @@ -55,7 +56,7 @@ export abstract class CollectorExporterNodeBase<
this._sendingPromises.push(promise);
}

onInit(config: collectorTypes.CollectorExporterConfigBase): void {
onInit(config: CollectorExporterNodeConfigBase): void {
this._isShutdown = false;
// defer to next tick and lazy load to avoid loading protobufjs too early
// and making this impossible to be instrumented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import {
collectorTypes,
toCollectorExportMetricServiceRequest,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import { MetricRecord, MetricExporter } from '@opentelemetry/metrics';
import { ServiceClientType } from './types';
Expand Down Expand Up @@ -47,16 +48,14 @@ export class CollectorMetricExporter
);
}

getDefaultUrl(config: collectorTypes.CollectorExporterConfigBase): string {
getDefaultUrl(config: CollectorExporterNodeConfigBase): string {
if (!config.url) {
return DEFAULT_COLLECTOR_URL;
}
return config.url;
}

getDefaultServiceName(
config: collectorTypes.CollectorExporterConfigBase
): string {
getDefaultServiceName(config: CollectorExporterNodeConfigBase): string {
return config.serviceName || DEFAULT_SERVICE_NAME;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import {
collectorTypes,
toCollectorExportTraceServiceRequest,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import { ServiceClientType } from './types';

Expand All @@ -40,16 +41,14 @@ export class CollectorTraceExporter
return toCollectorExportTraceServiceRequest(spans, this);
}

getDefaultUrl(config: collectorTypes.CollectorExporterConfigBase): string {
getDefaultUrl(config: CollectorExporterNodeConfigBase): string {
if (!config.url) {
return DEFAULT_COLLECTOR_URL;
}
return config.url;
}

getDefaultServiceName(
config: collectorTypes.CollectorExporterConfigBase
): string {
getDefaultServiceName(config: CollectorExporterNodeConfigBase): string {
return config.serviceName || DEFAULT_SERVICE_NAME;
}

Expand Down
3 changes: 2 additions & 1 deletion packages/opentelemetry-exporter-collector-proto/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import {
collectorTypes,
sendWithHttp,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import * as path from 'path';

Expand All @@ -33,7 +34,7 @@ export function getExportRequestProto(): Type | undefined {

export function onInit<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
_config: collectorTypes.CollectorExporterConfigBase
_config: CollectorExporterNodeConfigBase
): void {
const dir = path.resolve(__dirname, '..', 'protos');
const root = new protobufjs.Root();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
* limitations under the License.
*/

import {
collectorTypes,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/metrics';
import { collectorTypes } from '@opentelemetry/exporter-collector';
import * as core from '@opentelemetry/core';
import * as http from 'http';
import * as assert from 'assert';
Expand Down Expand Up @@ -48,7 +51,7 @@ const waitTimeMS = 20;

describe('CollectorMetricExporter - node with proto over http', () => {
let collectorExporter: CollectorMetricExporter;
let collectorExporterConfig: collectorTypes.CollectorExporterConfigBase;
let collectorExporterConfig: CollectorExporterNodeConfigBase;
let spyRequest: sinon.SinonSpy;
let spyWrite: sinon.SinonSpy;
let metrics: metrics.MetricRecord[];
Expand All @@ -65,6 +68,8 @@ describe('CollectorMetricExporter - node with proto over http', () => {
serviceName: 'bar',
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorMetricExporter(collectorExporterConfig);
// Overwrites the start time to make tests consistent
Expand Down Expand Up @@ -120,6 +125,19 @@ describe('CollectorMetricExporter - node with proto over http', () => {
}, waitTimeMS);
});

it('should have keep alive and keepAliveMsecs option set', done => {
collectorExporter.export(metrics, () => {});

setTimeout(() => {
const args = spyRequest.args[0];
const options = args[0];
const agent = options.agent;
assert.strictEqual(agent.keepAlive, true);
assert.strictEqual(agent.options.keepAliveMsecs, 2000);
done();
});
});

it('should successfully send metrics', done => {
collectorExporter.export(metrics, () => {});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* limitations under the License.
*/

import { collectorTypes } from '@opentelemetry/exporter-collector';
import {
collectorTypes,
CollectorExporterNodeConfigBase,
} from '@opentelemetry/exporter-collector';

import * as core from '@opentelemetry/core';
import { ReadableSpan } from '@opentelemetry/tracing';
Expand Down Expand Up @@ -43,7 +46,7 @@ const waitTimeMS = 20;

describe('CollectorTraceExporter - node with proto over http', () => {
let collectorExporter: CollectorTraceExporter;
let collectorExporterConfig: collectorTypes.CollectorExporterConfigBase;
let collectorExporterConfig: CollectorExporterNodeConfigBase;
let spyRequest: sinon.SinonSpy;
let spyWrite: sinon.SinonSpy;
let spans: ReadableSpan[];
Expand All @@ -60,6 +63,8 @@ describe('CollectorTraceExporter - node with proto over http', () => {
serviceName: 'bar',
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
spans = [];
Expand Down Expand Up @@ -95,6 +100,19 @@ describe('CollectorTraceExporter - node with proto over http', () => {
}, waitTimeMS);
});

it('should have keep alive and keepAliveMsecs option set', done => {
collectorExporter.export(spans, () => {});

setTimeout(() => {
const args = spyRequest.args[0];
const options = args[0];
const agent = options.agent;
assert.strictEqual(agent.keepAlive, true);
assert.strictEqual(agent.options.keepAliveMsecs, 2000);
done();
});
});

it('should successfully send the spans', done => {
collectorExporter.export(spans, () => {});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
* limitations under the License.
*/

import type * as http from 'http';
import type * as https from 'https';

import { CollectorExporterBase } from '../../CollectorExporterBase';
import { CollectorExporterConfigBase } from '../../types';
import { CollectorExporterNodeConfigBase } from './types';
import * as collectorTypes from '../../types';
import { parseHeaders } from '../../util';
import { sendWithHttp } from './util';
Expand All @@ -27,22 +30,35 @@ export abstract class CollectorExporterNodeBase<
ExportItem,
ServiceRequest
> extends CollectorExporterBase<
CollectorExporterConfigBase,
CollectorExporterNodeConfigBase,
ExportItem,
ServiceRequest
> {
DEFAULT_HEADERS: Record<string, string> = {};
headers: Record<string, string>;
constructor(config: CollectorExporterConfigBase = {}) {
keepAlive: boolean = true;
httpAgentOptions: http.AgentOptions | https.AgentOptions = {};
constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config);
if ((config as any).metadata) {
this.logger.warn('Metadata cannot be set when using http');
}
this.headers =
parseHeaders(config.headers, this.logger) || this.DEFAULT_HEADERS;
if (typeof config.keepAlive === 'boolean') {
this.keepAlive = config.keepAlive;
}
if (config.httpAgentOptions) {
if (!this.keepAlive) {
this.logger.warn(
'httpAgentOptions is used only when keepAlive is true'
);
}
this.httpAgentOptions = Object.assign({}, config.httpAgentOptions);
}
}

onInit(_config: CollectorExporterConfigBase): void {
onInit(_config: CollectorExporterNodeConfigBase): void {
this._isShutdown = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

import { MetricRecord, MetricExporter } from '@opentelemetry/metrics';
import { CollectorExporterConfigBase } from '../../types';
import * as collectorTypes from '../../types';
import { CollectorExporterNodeConfigBase } from './types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { toCollectorExportMetricServiceRequest } from '../../transformMetrics';

Expand Down Expand Up @@ -45,14 +45,14 @@ export class CollectorMetricExporter
);
}

getDefaultUrl(config: CollectorExporterConfigBase): string {
getDefaultUrl(config: CollectorExporterNodeConfigBase): string {
if (!config.url) {
return DEFAULT_COLLECTOR_URL;
}
return config.url;
}

getDefaultServiceName(config: CollectorExporterConfigBase): string {
getDefaultServiceName(config: CollectorExporterNodeConfigBase): string {
return config.serviceName || DEFAULT_SERVICE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

import { ReadableSpan, SpanExporter } from '@opentelemetry/tracing';
import { CollectorExporterConfigBase } from '../../types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { CollectorExporterNodeConfigBase } from './types';
import * as collectorTypes from '../../types';
import { toCollectorExportTraceServiceRequest } from '../../transform';

Expand All @@ -38,14 +38,14 @@ export class CollectorTraceExporter
return toCollectorExportTraceServiceRequest(spans, this, true);
}

getDefaultUrl(config: CollectorExporterConfigBase): string {
getDefaultUrl(config: CollectorExporterNodeConfigBase): string {
if (!config.url) {
return DEFAULT_COLLECTOR_URL;
}
return config.url;
}

getDefaultServiceName(config: CollectorExporterConfigBase): string {
getDefaultServiceName(config: CollectorExporterNodeConfigBase): string {
return config.serviceName || DEFAULT_SERVICE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ export * from './CollectorTraceExporter';
export * from './CollectorMetricExporter';
export * from './CollectorExporterNodeBase';
export * from './util';
export * from './types';
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type * as http from 'http';
import type * as https from 'https';

import { CollectorExporterConfigBase } from '../../types';

/**
* Collector Exporter node base config
*/
export interface CollectorExporterNodeConfigBase
extends CollectorExporterConfigBase {
keepAlive?: boolean;
httpAgentOptions?: http.AgentOptions | https.AgentOptions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
): void {
const parsedUrl = new url.URL(collector.url);

const options = {
const options: http.RequestOptions | https.RequestOptions = {
hostname: parsedUrl.hostname,
port: parsedUrl.port,
path: parsedUrl.pathname,
Expand All @@ -49,6 +49,14 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
};

const request = parsedUrl.protocol === 'http:' ? http.request : https.request;
const Agent = parsedUrl.protocol === 'http:' ? http.Agent : https.Agent;
if (collector.keepAlive) {
options.agent = new Agent({
...collector.httpAgentOptions,
keepAlive: true,
});
}

const req = request(options, (res: http.IncomingMessage) => {
let data = '';
res.on('data', chunk => (data += chunk));
Expand Down
Loading

0 comments on commit e70a7c8

Please sign in to comment.