Skip to content

Commit

Permalink
apm: add version checking for stats obfuscation to avoid double obfus… (
Browse files Browse the repository at this point in the history
#33765)

Co-authored-by: DeForest Richards <56796055+drichards-87@users.noreply.github.com>
  • Loading branch information
ajgajg1134 and drichards-87 authored Mar 3, 2025
1 parent 369a8db commit 78c8d30
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 16 deletions.
9 changes: 5 additions & 4 deletions comp/otelcol/otlp/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ import (
"testing"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/sketches-go/ddsketch"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/protobuf/proto"

"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"github.com/DataDog/sketches-go/ddsketch"

configmock "github.com/DataDog/datadog-agent/pkg/config/mock"
pkgConfigModel "github.com/DataDog/datadog-agent/pkg/config/model"
pkgConfigSetup "github.com/DataDog/datadog-agent/pkg/config/setup"
Expand Down Expand Up @@ -391,7 +392,7 @@ type MockStatsProcessor struct {
}

// ProcessStats implements the stats processor interface
func (s *MockStatsProcessor) ProcessStats(in *pb.ClientStatsPayload, _, _ string) {
func (s *MockStatsProcessor) ProcessStats(in *pb.ClientStatsPayload, _, _, _ string) {
	// Record the payload so tests can inspect it later; the three string
	// arguments are accepted but ignored.
	s.In = append(s.In, in)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/obfuscate/obfuscate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/DataDog/datadog-go/v5/statsd"
)

// Version is an incrementing integer that identifies this "version" of the obfuscation logic. It is used to avoid
// obfuscation conflicts and to let clients of the obfuscator decide where obfuscation should occur.
//
// The trace-agent reports this value as "obfuscation_version" on its /info endpoint and compares it against the
// "Datadog-Obfuscation-Version" header sent with incoming stats payloads.
const Version = 1

// Obfuscator quantizes and obfuscates spans. The obfuscator is not safe for
// concurrent use.
type Obfuscator struct {
Expand Down
16 changes: 12 additions & 4 deletions pkg/trace/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (a *Agent) discardSpans(p *api.Payload) {
}
}

func (a *Agent) processStats(in *pb.ClientStatsPayload, lang, tracerVersion, containerID string) *pb.ClientStatsPayload {
func (a *Agent) processStats(in *pb.ClientStatsPayload, lang, tracerVersion, containerID, obfuscationVersion string) *pb.ClientStatsPayload {
enableContainers := a.conf.HasFeature("enable_cid_stats") || (a.conf.FargateOrchestrator != config.OrchestratorUnknown)
if !enableContainers || a.conf.HasFeature("disable_cid_stats") {
// only allow the ContainerID stats dimension if we're in a Fargate instance or it's
Expand All @@ -524,14 +524,22 @@ func (a *Agent) processStats(in *pb.ClientStatsPayload, lang, tracerVersion, con
if in.Lang == "" {
in.Lang = lang
}
// An empty obfuscation version means the tracer did not declare that it
// obfuscated the payload, so the agent must obfuscate it. Any non-empty value
// means the tracer already did, and obfuscating again would double-obfuscate.
shouldObfuscate := obfuscationVersion == ""
if !shouldObfuscate {
	// The parsed version is only used for diagnostics; it does not change
	// whether obfuscation is skipped.
	versionInt, err := strconv.Atoi(obfuscationVersion)
	switch {
	case err != nil:
		// strconv.Atoi returns 0 alongside a non-nil error, so the parsed
		// value must not be compared against obfuscate.Version here.
		log.Debugf("Tracer sent an invalid obfuscation version %q: %v", obfuscationVersion, err)
	case versionInt < obfuscate.Version:
		log.Debugf("Tracer is using older version of obfuscation %d", versionInt)
	}
}
for i, group := range in.Stats {
n := 0
for _, b := range group.Stats {
a.normalizeStatsGroup(b, lang)
if !a.Blacklister.AllowsStat(b) {
continue
}
a.obfuscateStatsGroup(b)
if shouldObfuscate {
a.obfuscateStatsGroup(b)
}
a.Replacer.ReplaceStatsGroup(b)
group.Stats[n] = b
n++
Expand Down Expand Up @@ -560,8 +568,8 @@ func mergeDuplicates(s *pb.ClientStatsBucket) {
}

// ProcessStats processes incoming client stats in from the given tracer.
func (a *Agent) ProcessStats(in *pb.ClientStatsPayload, lang, tracerVersion, containerID string) {
a.ClientStatsAggregator.In <- a.processStats(in, lang, tracerVersion, containerID)
func (a *Agent) ProcessStats(in *pb.ClientStatsPayload, lang, tracerVersion, containerID, obfuscationVersion string) {
	// Run the payload through processStats first, then hand the result to the
	// client stats aggregator through its input channel.
	processed := a.processStats(in, lang, tracerVersion, containerID, obfuscationVersion)
	a.ClientStatsAggregator.In <- processed
}

// sample performs all sampling on the processedTrace modifying it as needed and returning if the trace should be kept
Expand Down
56 changes: 54 additions & 2 deletions pkg/trace/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (
"testing"
"time"

gzip "github.com/DataDog/datadog-agent/comp/trace/compression/impl-gzip"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

gzip "github.com/DataDog/datadog-agent/comp/trace/compression/impl-gzip"

"github.com/DataDog/datadog-agent/pkg/obfuscate"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/datadog-agent/pkg/trace/api"
Expand Down Expand Up @@ -2478,6 +2479,7 @@ func TestConvertStats(t *testing.T) {
lang string
tracerVersion string
containerID string
obfVersion string
out *pb.ClientStatsPayload
}{
{
Expand Down Expand Up @@ -2551,6 +2553,56 @@ func TestConvertStats(t *testing.T) {
},
},
},
{
name: "pre-obfuscated",
obfVersion: strconv.Itoa(obfuscate.Version + 1),
in: &pb.ClientStatsPayload{
Hostname: "tracer_hots",
Env: "tracer_env",
Version: "code_version",
Stats: []*pb.ClientStatsBucket{
{
Start: 1,
Duration: 2,
Stats: []*pb.ClientGroupedStats{
{
Service: "redis_service",
Name: "name-2",
Resource: "SET k v",
HTTPStatusCode: 400,
Type: "redis",
},
},
},
},
},
lang: "java",
tracerVersion: "v1",
containerID: "abc123",
out: &pb.ClientStatsPayload{
Hostname: "tracer_hots",
Env: "tracer_env",
Version: "code_version",
Lang: "java",
TracerVersion: "v1",
ContainerID: "",
Stats: []*pb.ClientStatsBucket{
{
Start: 1,
Duration: 2,
Stats: []*pb.ClientGroupedStats{
{
Service: "redis_service",
Name: "name_2",
Resource: "SET k v",
HTTPStatusCode: 200,
Type: "redis",
},
},
},
},
},
},
{
name: "containerID feature disabled, no fargate",
features: "disable_cid_stats",
Expand Down Expand Up @@ -2713,7 +2765,7 @@ func TestConvertStats(t *testing.T) {
conf: cfg,
}

out := a.processStats(testCase.in, testCase.lang, testCase.tracerVersion, testCase.containerID)
out := a.processStats(testCase.in, testCase.lang, testCase.tracerVersion, testCase.containerID, testCase.obfVersion)
assert.Equal(t, testCase.out, out)
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/trace/agent/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func FuzzProcessStats(f *testing.F) {
if !equal(decPreProcess, pbStats) {
t.Fatalf("Inconsistent encoding/decoding before processing: (%v) is different from (%v)", decPreProcess, pbStats)
}
processedStats := agent.processStats(pbStats, lang, version, containerID)
processedStats := agent.processStats(pbStats, lang, version, containerID, "")
encPostProcess, err := encode(processedStats)
if err != nil {
t.Fatalf("processStats returned an invalid stats payload: %v", err)
Expand Down
7 changes: 4 additions & 3 deletions pkg/trace/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func (r *HTTPReceiver) replyOK(req *http.Request, v Version, w http.ResponseWrit
type StatsProcessor interface {
	// ProcessStats takes a stats payload and consumes it. It is considered to be originating
	// from the given lang. tracerVersion and containerID describe the sending tracer, and
	// obfuscationVersion carries the obfuscation version the tracer reported for this payload;
	// it is the empty string when the tracer did not report one.
	ProcessStats(p *pb.ClientStatsPayload, lang, tracerVersion, containerID string)
	ProcessStats(p *pb.ClientStatsPayload, lang, tracerVersion, containerID, obfuscationVersion string)
}

// handleStats handles incoming stats payloads.
Expand Down Expand Up @@ -562,11 +562,12 @@ func (r *HTTPReceiver) handleStats(w http.ResponseWriter, req *http.Request) {
_ = r.statsd.Count("datadog.trace_agent.receiver.stats_bytes", rd.Count, ts.AsTags(), 1)
_ = r.statsd.Count("datadog.trace_agent.receiver.stats_buckets", int64(len(in.Stats)), ts.AsTags(), 1)

// Resolve ContainerID baased on HTTP headers
// Resolve ContainerID based on HTTP headers
lang := req.Header.Get(header.Lang)
tracerVersion := req.Header.Get(header.TracerVersion)
obfuscationVersion := req.Header.Get(header.TracerObfuscationVersion)
containerID := r.containerIDProvider.GetContainerID(req.Context(), req.Header)
r.statsProcessor.ProcessStats(in, lang, tracerVersion, containerID)
r.statsProcessor.ProcessStats(in, lang, tracerVersion, containerID, obfuscationVersion)
}

// handleTraces knows how to handle a bunch of traces
Expand Down
6 changes: 4 additions & 2 deletions pkg/trace/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var headerFields = map[string]string{

type noopStatsProcessor struct{}

func (noopStatsProcessor) ProcessStats(_ *pb.ClientStatsPayload, _, _, _ string) {}
// ProcessStats does nothing: the payload and all four string arguments are discarded.
func (noopStatsProcessor) ProcessStats(_ *pb.ClientStatsPayload, _, _, _, _ string) {}

func newTestReceiverFromConfig(conf *config.AgentConfig) *HTTPReceiver {
dynConf := sampler.NewDynamicConfig()
Expand Down Expand Up @@ -656,15 +656,17 @@ type mockStatsProcessor struct {
lastLang string
lastTracerVersion string
containerID string
obfVersion string
}

func (m *mockStatsProcessor) ProcessStats(p *pb.ClientStatsPayload, lang, tracerVersion, containerID string) {
// ProcessStats stores the most recent payload and its accompanying metadata
// on the mock while holding the mock's mutex.
func (m *mockStatsProcessor) ProcessStats(p *pb.ClientStatsPayload, lang, tracerVersion, containerID, obfVersion string) {
	m.mu.Lock()

	// Plain field stores; each call overwrites whatever the previous call left.
	m.obfVersion = obfVersion
	m.containerID = containerID
	m.lastTracerVersion, m.lastLang = tracerVersion, lang
	m.lastP = p

	m.mu.Unlock()
}

func (m *mockStatsProcessor) Got() (p *pb.ClientStatsPayload, lang, tracerVersion, containerID string) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/trace/api/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (r *HTTPReceiver) makeInfoHandler() (hash string, handler http.HandlerFunc)
Config reducedConfig `json:"config"`
PeerTags []string `json:"peer_tags"`
SpanKindsStatsComputed []string `json:"span_kinds_stats_computed"`
ObfuscationVersion int `json:"obfuscation_version"`
}{
Version: r.conf.AgentVersion,
GitCommit: r.conf.GitCommit,
Expand All @@ -102,6 +103,7 @@ func (r *HTTPReceiver) makeInfoHandler() (hash string, handler http.HandlerFunc)
SpanEvents: true,
EvpProxyAllowedHeaders: EvpProxyAllowedHeaders,
SpanKindsStatsComputed: spanKindsStatsComputed,
ObfuscationVersion: obfuscate.Version,
Config: reducedConfig{
DefaultEnv: r.conf.DefaultEnv,
TargetTPS: r.conf.TargetTPS,
Expand Down
1 change: 1 addition & 0 deletions pkg/trace/api/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func TestInfoHandler(t *testing.T) {
"evp_proxy_allowed_headers": nil,
"peer_tags": nil,
"span_kinds_stats_computed": nil,
"obfuscation_version": nil,
"config": map[string]interface{}{
"default_env": nil,
"target_tps": nil,
Expand Down
4 changes: 4 additions & 0 deletions pkg/trace/api/internal/header/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@ const (
// Any value other than 0, f, F, FALSE, False, false set in this header will cause the agent to send a 429 code to a client
// when the payload cannot be submitted.
SendRealHTTPStatus = "Datadog-Send-Real-Http-Status"

// TracerObfuscationVersion specifies the version of obfuscation done at the tracer, if any.
// This is used to avoid "double obfuscating" data.
TracerObfuscationVersion = "Datadog-Obfuscation-Version"
)
11 changes: 11 additions & 0 deletions releasenotes/notes/traceagent-statsobf-6fd2cb51be47c0b9.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
enhancements:
- |
APM: Expose an "obfuscation_version" value via the /info endpoint. Accept a new header "Datadog-Obfuscation-Version" for incoming stats payloads; if this header is set to any non-empty value, the trace-agent does not attempt to obfuscate these payloads, because they have already been obfuscated by the tracer.

0 comments on commit 78c8d30

Please sign in to comment.