Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opentelemetry-sampler-aws-xray): add rule cache and rule matching #1463

Closed
1 change: 1 addition & 0 deletions packages/opentelemetry-sampler-aws-xray/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"@opentelemetry/core": "^1.8.0",
"@opentelemetry/sdk-trace-base": "^1.8.0",
"@opentelemetry/semantic-conventions": "^1.0.0",
"@opentelemetry/resources": "^1.12.0",
"axios": "^1.3.5"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
Sampler,
SamplingResult,
AlwaysOnSampler,
} from '@opentelemetry/sdk-trace-base';
import { Context, Link, Attributes, SpanKind } from '@opentelemetry/api';

export class AwsXrayFallbackSampler implements Sampler {
private _alwaysOnSampler = new AlwaysOnSampler();

shouldSample(
context: Context,
traceId: string,
spanName: string,
spanKind: SpanKind,
attributes: Attributes,
links: Link[]
): SamplingResult {
// This will be updated to be a rate limiting sampler in the next PR
return this._alwaysOnSampler.shouldSample();
}

public toString = (): string => {
return 'AwsXrayFallbackSampler';
};
}
83 changes: 63 additions & 20 deletions packages/opentelemetry-sampler-aws-xray/src/remote-sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@
* limitations under the License.
*/

import { Sampler, SamplingResult } from '@opentelemetry/sdk-trace-base';
import {
Sampler,
SamplingDecision,
SamplingResult,
} from '@opentelemetry/sdk-trace-base';
import { diag, DiagLogger } from '@opentelemetry/api';
import {
SamplingRule,
AWSXRaySamplerConfig,
SamplingRuleRecord,
} from './types';
diag,
DiagLogger,
Context,
Link,
Attributes,
SpanKind,
} from '@opentelemetry/api';
import { SamplingRule } from './sampling-rule';
import { AWSXRaySamplerConfig, SamplingRuleRecord } from './types';
import axios from 'axios';
import { RuleCache } from './rule-cache';
import { AwsXrayFallbackSampler } from './aws-xray-fallback-sampler';

// 5 minute interval on sampling rules fetch (default polling interval)
const DEFAULT_INTERVAL_MS = 5 * 60 * 1000;
Expand All @@ -37,30 +39,73 @@ const SAMPLING_RULES_PATH = '/GetSamplingRules';
export class AWSXRayRemoteSampler implements Sampler {
private _pollingInterval: number;
private _awsProxyEndpoint: string;
private _samplerDiag: DiagLogger;
private _ruleCache: RuleCache;
private _fallBackSampler: AwsXrayFallbackSampler;
private samplerDiag: DiagLogger;

constructor(samplerConfig: AWSXRaySamplerConfig) {
this._pollingInterval =
samplerConfig.pollingIntervalMs ?? DEFAULT_INTERVAL_MS;
this._awsProxyEndpoint = samplerConfig.endpoint
? samplerConfig.endpoint
: DEFAULT_AWS_PROXY_ENDPOINT;
this._fallBackSampler = new AwsXrayFallbackSampler();

if (this._pollingInterval <= 0) {
throw new TypeError('pollingInterval must be a positive integer');
}

this._samplerDiag = diag.createComponentLogger({
this.samplerDiag = diag.createComponentLogger({
namespace: '@opentelemetry/sampler-aws-xray',
});

this._ruleCache = new RuleCache(samplerConfig.resource);

// execute first get Sampling rules update using polling interval
this.startRulePoller();
}

shouldSample(): SamplingResult {
// Implementation to be added
return { decision: SamplingDecision.NOT_RECORD };
shouldSample(
context: Context,
traceId: string,
spanName: string,
spanKind: SpanKind,
attributes: Attributes,
links: Link[]
): SamplingResult {
if (this._ruleCache.isExpired()) {
// go to fallback sampler .shouldSample
return this._fallBackSampler.shouldSample(
context,
traceId,
spanName,
spanKind,
attributes,
links
);
}

const matchedRule = this._ruleCache.getMatchedRule(attributes);

// TODO: update after verifying if default rule will always match,
// this means that this method will always return return { decision: matchedRule.sample(attributes) }
// as long as the rule cache has not expired.
if (matchedRule) {
return { decision: matchedRule.sample(attributes) };
}

this.samplerDiag.warn(
'Using fallback sampler as no rule match was found. This is likely due to a bug, since default rule should always match'
);

return this._fallBackSampler.shouldSample(
carolabadeer marked this conversation as resolved.
Show resolved Hide resolved
context,
traceId,
spanName,
spanKind,
attributes,
links
);
}

toString(): string {
Expand Down Expand Up @@ -88,14 +133,12 @@ export class AWSXRayRemoteSampler implements Sampler {

samplingRules =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can declare the samplingRule as const here instead of updating a let variable from above. This is more idiomatic IMO

responseJson?.SamplingRuleRecords.map(
(record: SamplingRuleRecord) => record.SamplingRule
(record: SamplingRuleRecord) => new SamplingRule(record.SamplingRule)
).filter(Boolean) ?? [];

// TODO: pass samplingRules to rule cache, temporarily logging the samplingRules array
this._samplerDiag.debug('sampling rules: ', samplingRules);
this._ruleCache.updateRules(samplingRules);
} catch (error) {
// Log error
this._samplerDiag.warn('Error fetching sampling rules: ', error);
this.samplerDiag.warn('Error fetching sampling rules: ', error);
}
};

Expand Down
24 changes: 24 additions & 0 deletions packages/opentelemetry-sampler-aws-xray/src/reservoir.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export class Reservoir {
// This class will be implemented in the next PR
private _quota: number;

constructor(quota: number) {
this._quota = quota;
}
}
90 changes: 90 additions & 0 deletions packages/opentelemetry-sampler-aws-xray/src/rule-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { SamplingRule } from './sampling-rule';
import { Attributes } from '@opentelemetry/api';
import { Resource } from '@opentelemetry/resources';

const RuleCacheTTL_SECONDS = 60 * 60; // The cache expires 1 hour after the last refresh time. (in seconds)

export class RuleCache {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use the term Cache? What exactly is it caching?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a cache that holds the sampling rules that are not expired. This is the terminology we use in X-Ray for it

public rules: SamplingRule[];
private lastUpdatedEpoch: number | undefined = undefined;
private samplerResource: Resource;

constructor(samplerResource: Resource) {
this.rules = [];
this.samplerResource = samplerResource;
}

public isExpired = (): boolean => {
if (this.lastUpdatedEpoch === undefined) {
return true;
}

const now = Math.floor(new Date().getTime() / 1000); // getTime returns milliseconds -> /1000 gives seconds
return now > this.lastUpdatedEpoch + RuleCacheTTL_SECONDS;
};

public getMatchedRule = (
attributes: Attributes
): SamplingRule | undefined => {
return this.rules.find(
rule =>
rule.matches(attributes, this.samplerResource) ||
rule.samplingRule.ruleName === 'Default'
);
};

private sortRulesByPriority = () => {
this.rules.sort((rule1: SamplingRule, rule2: SamplingRule): number => {
const value = rule1.samplingRule.priority - rule2.samplingRule.priority;

if (value !== 0) return value;

return rule1.samplingRule.ruleName.localeCompare(
rule2.samplingRule.ruleName
);
});
};

public updateRules = (newRules: SamplingRule[]): void => {
// store previous rules
const oldRules: { [key: string]: SamplingRule } = {};

this.rules.forEach((rule: SamplingRule) => {
// use Object.assign to create a new copy of the rules object to store by value
carolabadeer marked this conversation as resolved.
Show resolved Hide resolved
oldRules[rule.samplingRule.ruleName] = Object.assign({}, rule);
});

// update rules in the cache
this.rules = newRules;
carolabadeer marked this conversation as resolved.
Show resolved Hide resolved

this.rules.forEach((rule: SamplingRule) => {
const oldRule: SamplingRule = oldRules[rule.samplingRule.ruleName];

if (oldRule) {
rule.reservoir = oldRule.reservoir;
rule.statistics = oldRule.statistics;
}
});

// sort rules by priority and make lastUpdated = now
this.sortRulesByPriority();
this.lastUpdatedEpoch = Math.floor(new Date().getTime() / 1000);

return;
};
}
79 changes: 79 additions & 0 deletions packages/opentelemetry-sampler-aws-xray/src/sampling-rule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ISamplingRule } from './types';
import { wildcardMatch, attributeMatch } from './utils';
import { SamplingDecision } from '@opentelemetry/sdk-trace-base';
import { Attributes, AttributeValue } from '@opentelemetry/api';
import {
SemanticAttributes,
SemanticResourceAttributes,
} from '@opentelemetry/semantic-conventions';
import { Resource } from '@opentelemetry/resources';
import { Reservoir } from './reservoir';
import { Statistics } from './statistics';

export class SamplingRule implements ISamplingRule {
public samplingRule: any;
public reservoir: Reservoir;
public statistics: Statistics;

constructor(samplingRule: any) {
this.samplingRule = samplingRule;

this.reservoir = new Reservoir(this.samplingRule.reservoirSize);
this.statistics = new Statistics();
}

public matches = (attributes: Attributes, resource: Resource): boolean => {
const host: AttributeValue | undefined =
attributes[SemanticAttributes.HTTP_HOST];
const httpMethod = attributes[SemanticAttributes.HTTP_METHOD];
const serviceName = attributes[SemanticResourceAttributes.SERVICE_NAME];
const urlPath = attributes[SemanticAttributes.HTTP_URL];
const serviceType =
resource.attributes[SemanticResourceAttributes.CLOUD_PLATFORM];
const resourceARN =
resource.attributes[SemanticResourceAttributes.AWS_ECS_CLUSTER_ARN] ||
resource.attributes[SemanticResourceAttributes.AWS_EKS_CLUSTER_ARN] ||
resource.attributes[SemanticResourceAttributes.FAAS_ID];

// "Default" is a reserved keyword from X-Ray back-end.
if (this.samplingRule.ruleName === 'Default') {
return true;
}

return (
attributeMatch(attributes, this.samplingRule.attributes) &&
(!host || wildcardMatch(this.samplingRule.host, host)) &&
(!httpMethod ||
wildcardMatch(this.samplingRule.httpMethod, httpMethod)) &&
(!serviceName ||
wildcardMatch(this.samplingRule.sericeName, serviceName)) &&
(!urlPath || wildcardMatch(this.samplingRule.urlPath, urlPath)) &&
(!serviceType ||
wildcardMatch(this.samplingRule.serviceType, serviceType)) &&
(!serviceType ||
wildcardMatch(this.samplingRule.resourceARN, resourceARN))
);
};

public sample = (attributes: Attributes): SamplingDecision => {
// Returning a drop sample decision
// TODO: use reservoir in next PR to make this decision
return SamplingDecision.NOT_RECORD;
};
}
Loading