Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opentelemetry-sampler-aws-xray): add rule cache and rule matching #1463

Closed
42 changes: 42 additions & 0 deletions packages/opentelemetry-sampler-aws-xray/src/fallback-sampler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
Sampler,
SamplingResult,
AlwaysOnSampler,
} from '@opentelemetry/sdk-trace-base';
import { Context, Link, Attributes, SpanKind } from '@opentelemetry/api';

/**
 * Sampler used when no sampling rule can be applied.
 *
 * Every decision is currently delegated to an `AlwaysOnSampler`; a rate
 * limiting sampler is planned to replace it in a follow-up PR.
 */
export class FallbackSampler implements Sampler {
  /** Delegate that produces every sampling result. */
  private _alwaysOnSampler = new AlwaysOnSampler();

  /** Returns the display name of this sampler. */
  public toString = (): string => 'FallbackSampler';

  /**
   * Produces a sampling result for a span about to be created.
   *
   * None of the arguments are inspected yet; they are accepted so the
   * signature matches what the remote sampler passes through.
   *
   * @param context parent context of the span (unused)
   * @param traceId trace id of the span (unused)
   * @param spanName name of the span (unused)
   * @param spanKind kind of the span (unused)
   * @param attributes initial span attributes (unused)
   * @param links links attached to the span (unused)
   * @returns whatever the wrapped `AlwaysOnSampler` returns
   */
  shouldSample(
    context: Context,
    traceId: string,
    spanName: string,
    spanKind: SpanKind,
    attributes: Attributes,
    links: Link[]
  ): SamplingResult {
    // This will be updated to be a rate limiting sampler in the next PR
    const delegated = this._alwaysOnSampler.shouldSample();
    return delegated;
  }
}
72 changes: 60 additions & 12 deletions packages/opentelemetry-sampler-aws-xray/src/remote-sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@

import {
Sampler,
SamplingDecision,
SamplingResult,
} from '@opentelemetry/sdk-trace-base';
import { diag, DiagLogger } from '@opentelemetry/api';
import {
SamplingRule,
AWSXRaySamplerConfig,
SamplingRuleRecord,
} from './types';
diag,
DiagLogger,
Context,
Link,
Attributes,
SpanKind,
} from '@opentelemetry/api';
import { SamplingRule } from './sampling-rule';
import { Resource } from '@opentelemetry/resources';
carolabadeer marked this conversation as resolved.
Show resolved Hide resolved
import { AWSXRaySamplerConfig, SamplingRuleRecord } from './types';
import axios from 'axios';
import { RuleCache } from './rule-cache';
import { FallbackSampler } from './fallback-sampler';

// 5 minute interval on sampling rules fetch (default polling interval)
const DEFAULT_INTERVAL_MS = 5 * 60 * 1000;
Expand All @@ -37,6 +43,9 @@ const SAMPLING_RULES_PATH = '/GetSamplingRules';
export class AWSXRayRemoteSampler implements Sampler {
private _pollingInterval: number;
private _awsProxyEndpoint: string;
private _resource: Resource;
private _ruleCache: RuleCache;
private _fallBackSampler: FallbackSampler;
private _samplerDiag: DiagLogger;

constructor(samplerConfig: AWSXRaySamplerConfig) {
Expand All @@ -45,6 +54,9 @@ export class AWSXRayRemoteSampler implements Sampler {
this._awsProxyEndpoint = samplerConfig.endpoint
? samplerConfig.endpoint
: DEFAULT_AWS_PROXY_ENDPOINT;
this._resource = samplerConfig.resource;
this._ruleCache = new RuleCache();
this._fallBackSampler = new FallbackSampler();

if (this._pollingInterval <= 0) {
throw new TypeError('pollingInterval must be a positive integer');
Expand All @@ -58,9 +70,46 @@ export class AWSXRayRemoteSampler implements Sampler {
this.startRulePoller();
}

shouldSample(): SamplingResult {
// Implementation to be added
return { decision: SamplingDecision.NOT_RECORD };
shouldSample(
context: Context,
traceId: string,
spanName: string,
spanKind: SpanKind,
attributes: Attributes,
links: Link[]
): SamplingResult {
if (this._ruleCache.isExpired()) {
// go to fallback sampler .shouldSample
return this._fallBackSampler.shouldSample(
context,
traceId,
spanName,
spanKind,
attributes,
links
);
}

const matchedRule = this._ruleCache.getMatchedRule(
attributes,
this._resource
);

// TODO: update after verifying if default rule will always match,
// this means that this method will always return { decision: matchedRule.sample(attributes) }
// as long as the rule cache has not expired.
if (matchedRule) {
return { decision: matchedRule.sample(attributes) };
}

return this._fallBackSampler.shouldSample(
carolabadeer marked this conversation as resolved.
Show resolved Hide resolved
context,
traceId,
spanName,
spanKind,
attributes,
links
);
}

toString(): string {
Expand Down Expand Up @@ -88,11 +137,10 @@ export class AWSXRayRemoteSampler implements Sampler {

samplingRules =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can declare `samplingRules` as a `const` here instead of updating a `let` variable declared above. This is more idiomatic, IMO.

responseJson?.SamplingRuleRecords.map(
(record: SamplingRuleRecord) => record.SamplingRule
(record: SamplingRuleRecord) => new SamplingRule(record.SamplingRule)
).filter(Boolean) ?? [];

// TODO: pass samplingRules to rule cache, temporarily logging the samplingRules array
this._samplerDiag.debug('sampling rules: ', samplingRules);
this._ruleCache.updateRules(samplingRules);
} catch (error) {
// Log error
this._samplerDiag.warn('Error fetching sampling rules: ', error);
Expand Down
24 changes: 24 additions & 0 deletions packages/opentelemetry-sampler-aws-xray/src/reservoir.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
 * Placeholder for a sampling rule's reservoir.
 *
 * For now it only remembers the quota it was constructed with; the real
 * behaviour is planned for a follow-up PR.
 */
export class Reservoir {
  /**
   * @param _quota value stored on the instance as the private `_quota` field
   */
  constructor(private _quota: number) {}
}
91 changes: 91 additions & 0 deletions packages/opentelemetry-sampler-aws-xray/src/rule-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { SamplingRule } from './sampling-rule';
import { Attributes } from '@opentelemetry/api';
import { Resource } from '@opentelemetry/resources';

/** The cache expires 1 hour after the last refresh time (value is in seconds). */
const RuleCacheTTL_SECONDS = 60 * 60;

/**
 * Holds the sampling rules that have not expired ("rule cache" is the X-Ray
 * term for this store).
 *
 * Rules are replaced wholesale by `updateRules`, kept sorted by ascending
 * `Priority` (ties broken by `RuleName`), and treated as expired once
 * `RuleCacheTTL_SECONDS` have passed since the last update.
 */
export class RuleCache {
  /** Current rules, in the order they are tried by `getMatchedRule`. */
  public rules: SamplingRule[];
  /** Epoch time in whole seconds of the last `updateRules` call; `undefined` before the first one. */
  private lastUpdated: number | undefined = undefined;

  constructor() {
    this.rules = [];
  }

  /**
   * Tells whether the cached rules may still be used.
   *
   * @returns `true` when the cache has never been updated or when more than
   *   `RuleCacheTTL_SECONDS` have elapsed since the last update
   */
  public isExpired = (): boolean => {
    if (this.lastUpdated === undefined) {
      return true;
    }

    // Date.now() is in milliseconds -> /1000 gives seconds
    const now = Math.floor(Date.now() / 1000);
    return now > this.lastUpdated + RuleCacheTTL_SECONDS;
  };

  /**
   * Finds the first cached rule, in priority order, that applies to a span.
   *
   * @param attributes span attributes handed to each rule's `matches`
   * @param resource resource handed to each rule's `matches`
   * @returns the first rule whose `matches` returns true or whose name is
   *   `'Default'`; `undefined` when the cache is expired or nothing applies
   */
  public getMatchedRule = (
    attributes: Attributes,
    resource: Resource
  ): SamplingRule | undefined => {
    if (this.isExpired()) {
      return undefined;
    }

    return this.rules.find(
      rule => rule.matches(attributes, resource) || rule.RuleName === 'Default'
    );
  };

  /**
   * Sorts `this.rules` in place by ascending `Priority`, then by `RuleName`.
   *
   * The comparator returns 0 for rules with equal priority and name so that
   * it stays consistent, as `Array.prototype.sort` requires.
   */
  private _sortRulesByPriority = (): void => {
    this.rules.sort((rule1: SamplingRule, rule2: SamplingRule): number => {
      const byPriority = rule1.Priority - rule2.Priority;
      if (byPriority !== 0) return byPriority;

      if (rule1.RuleName > rule2.RuleName) return 1;
      if (rule1.RuleName < rule2.RuleName) return -1;
      return 0;
    });
  };

  /**
   * Replaces the cached rules with `newRules` and refreshes the expiry clock.
   *
   * A new rule whose `RuleName` equals that of a previously cached rule
   * inherits that rule's `reservoir` and `statistics`, so per-rule state
   * survives a refresh. The rule objects in `newRules` are therefore mutated,
   * but the `newRules` array itself is left untouched.
   *
   * @param newRules rules that become the cache content
   */
  public updateRules = (newRules: SamplingRule[]): void => {
    // Snapshot the state to carry over. A Map is used (not a plain object) so
    // that rule names such as "toString" or "constructor" cannot resolve to
    // members inherited from Object.prototype.
    const previousState = new Map<
      string,
      Pick<SamplingRule, 'reservoir' | 'statistics'>
    >();
    for (const rule of this.rules) {
      previousState.set(rule.RuleName, {
        reservoir: rule.reservoir,
        statistics: rule.statistics,
      });
    }

    // Copy the array so the in-place sort below never reorders the caller's array.
    this.rules = [...newRules];

    for (const rule of this.rules) {
      const carried = previousState.get(rule.RuleName);
      if (carried !== undefined) {
        rule.reservoir = carried.reservoir;
        rule.statistics = carried.statistics;
      }
    }

    // sort rules by priority and make lastUpdated = now
    this._sortRulesByPriority();
    this.lastUpdated = Math.floor(Date.now() / 1000);
  };
}
92 changes: 92 additions & 0 deletions packages/opentelemetry-sampler-aws-xray/src/sampling-rule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ISamplingRule } from './types';
import { wildcardMatch, attributeMatch } from './utils';
import { SamplingDecision } from '@opentelemetry/sdk-trace-base';
import { Attributes, AttributeValue } from '@opentelemetry/api';
import {
SemanticAttributes,
SemanticResourceAttributes,
} from '@opentelemetry/semantic-conventions';
import { Resource } from '@opentelemetry/resources';
import { Reservoir } from './reservoir';
import { Statistics } from './statistics';

/**
 * One X-Ray sampling rule together with the per-rule state (`reservoir`,
 * `statistics`) the sampler keeps for it.
 *
 * The upper-case field names mirror the `ISamplingRule` contract this class
 * implements.
 */
export class SamplingRule implements ISamplingRule {
  public RuleName: string;
  public Priority: number;
  public ReservoirSize: number;
  public FixedRate: number;
  public ServiceName: string;
  public ServiceType: string;
  public Host: string;
  public HTTPMethod: string;
  public URLPath: string;
  public ResourceARN: string;
  public Attributes?: { [key: string]: string };
  public Version: number;
  /** Per-rule reservoir, created from `ReservoirSize`. */
  public reservoir: Reservoir;
  /** Per-rule statistics holder. */
  public statistics: Statistics;

  /**
   * Builds a rule from a raw rule record.
   *
   * The caller in this package passes `record.SamplingRule` straight from the
   * GetSamplingRules response, whose fields use the same PascalCase names as
   * this class, so those keys are read first. The camelCase spelling that
   * earlier revisions read is still accepted as a fallback.
   *
   * @param samplingRule raw rule object with PascalCase or camelCase keys
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  constructor(samplingRule: any) {
    // Prefer the PascalCase key; fall back to the camelCase one only when the
    // former is null or undefined (so legitimate 0 / '' values are preserved).
    const field = (pascalKey: string, camelKey: string) =>
      samplingRule[pascalKey] ?? samplingRule[camelKey];

    this.RuleName = field('RuleName', 'ruleName');
    this.Priority = field('Priority', 'priority');
    this.ReservoirSize = field('ReservoirSize', 'reservoirSize');
    this.FixedRate = field('FixedRate', 'fixedRate');
    this.ServiceName = field('ServiceName', 'serviceName');
    this.ServiceType = field('ServiceType', 'serviceType');
    this.Host = field('Host', 'host');
    this.HTTPMethod = field('HTTPMethod', 'httpMethod');
    this.URLPath = field('URLPath', 'urlPath');
    this.ResourceARN = field('ResourceARN', 'resourceARN');
    this.Version = field('Version', 'version');
    this.Attributes = field('Attributes', 'attributes');

    this.reservoir = new Reservoir(this.ReservoirSize);
    this.statistics = new Statistics();
  }

  /**
   * Decides whether this rule applies to a span.
   *
   * The rule named `'Default'` always applies. For any other rule the span
   * attributes must pass `attributeMatch` against `this.Attributes`, and each
   * of host, HTTP method, service name, URL and service type that is present
   * on the span/resource must pass `wildcardMatch` against the rule's
   * corresponding field. Values that are absent do not prevent a match.
   *
   * @param attributes attributes of the span being sampled
   * @param resource resource of the tracer provider; supplies the service
   *   name and the cloud platform (used as service type)
   * @returns `true` when the rule applies
   */
  public matches = (attributes: Attributes, resource: Resource): boolean => {
    // "Default" is a reserved keyword from X-Ray back-end.
    if (this.RuleName === 'Default') {
      return true;
    }

    const host: AttributeValue | undefined =
      attributes[SemanticAttributes.HTTP_HOST];
    const httpMethod = attributes[SemanticAttributes.HTTP_METHOD];
    // service.name is a resource attribute, so read it from the resource;
    // the span-attribute lookup is kept as a fallback for existing callers.
    const serviceName =
      resource.attributes[SemanticResourceAttributes.SERVICE_NAME] ??
      attributes[SemanticResourceAttributes.SERVICE_NAME];
    // NOTE(review): this is the value of the http.url attribute and is
    // compared with the rule's URLPath — confirm whether a path-only
    // attribute should be used instead.
    const urlPath = attributes[SemanticAttributes.HTTP_URL];
    const serviceType =
      resource.attributes[SemanticResourceAttributes.CLOUD_PLATFORM];

    return (
      attributeMatch(attributes, this.Attributes) &&
      (!host || wildcardMatch(this.Host, host)) &&
      (!httpMethod || wildcardMatch(this.HTTPMethod, httpMethod)) &&
      (!serviceName || wildcardMatch(this.ServiceName, serviceName)) &&
      (!urlPath || wildcardMatch(this.URLPath, urlPath)) &&
      (!serviceType || wildcardMatch(this.ServiceType, serviceType))
    );
  };

  /**
   * Returns the sampling decision for a span matched by this rule.
   *
   * @param attributes attributes of the span (not used yet)
   * @returns always `SamplingDecision.NOT_RECORD` for now
   */
  public sample = (attributes: Attributes): SamplingDecision => {
    // Returning a drop sample decision
    // TODO: use reservoir in next PR to make this decision
    return SamplingDecision.NOT_RECORD;
  };
}
Loading