From 9522e1ba6ddaef82ec506149e743d2f8bedf62fd Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Wed, 6 Nov 2024 19:43:30 +0000 Subject: [PATCH] Add `invalid_utf8` to reasons spans could be rejected --- integration/e2e/metrics_generator_test.go | 22 ++++++- modules/generator/instance.go | 4 +- .../processor/spanmetrics/spanmetrics.go | 30 +++++++++- .../processor/spanmetrics/spanmetrics_test.go | 59 +++++++++++++------ 4 files changed, 90 insertions(+), 25 deletions(-) diff --git a/integration/e2e/metrics_generator_test.go b/integration/e2e/metrics_generator_test.go index c660cf5a773..5d3fd9747c8 100644 --- a/integration/e2e/metrics_generator_test.go +++ b/integration/e2e/metrics_generator_test.go @@ -130,6 +130,24 @@ func TestMetricsGenerator(t *testing.T) { }) require.NoError(t, err) + // also send one with an invalid label value + err = c.EmitBatch(context.Background(), &thrift.Batch{ + Process: &thrift.Process{ServiceName: "app"}, + Spans: []*thrift.Span{ + { + TraceIdLow: traceIDLow, + TraceIdHigh: traceIDHigh, + SpanId: r.Int63(), + ParentSpanId: parentSpanID, + OperationName: "\xff\xff", + StartTime: time.Now().Add(10 * 24 * time.Hour).UnixMicro(), + Duration: int64(1 * time.Second / time.Microsecond), + Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}}, + }, + }, + }) + require.NoError(t, err) + // Fetch metrics from Prometheus once they are received var metricFamilies map[string]*io_prometheus_client.MetricFamily for { @@ -191,8 +209,8 @@ func TestMetricsGenerator(t *testing.T) { assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_latency_sum", lbls)) // Verify metrics - assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(4), "tempo_metrics_generator_spans_received_total")) - assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_spans_discarded_total")) + assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(5), "tempo_metrics_generator_spans_received_total")) 
+ assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(3), "tempo_metrics_generator_spans_discarded_total")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1000), "tempo_metrics_generator_registry_max_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_series_added_total")) diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 3056a63bf65..a3c8a0db6a3 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -61,6 +61,7 @@ var ( const ( reasonOutsideTimeRangeSlack = "outside_metrics_ingestion_slack" reasonSpanMetricsFiltered = "span_metrics_filtered" + reasonInvalidUTF8 = "invalid_utf8" ) type instance struct { @@ -290,7 +291,8 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error switch processorName { case spanmetrics.Name: filteredSpansCounter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonSpanMetricsFiltered) - newProcessor, err = spanmetrics.New(cfg.SpanMetrics, i.registry, filteredSpansCounter) + invalidUTF8Counter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonInvalidUTF8) + newProcessor, err = spanmetrics.New(cfg.SpanMetrics, i.registry, filteredSpansCounter, invalidUTF8Counter) if err != nil { return err } diff --git a/modules/generator/processor/spanmetrics/spanmetrics.go b/modules/generator/processor/spanmetrics/spanmetrics.go index 492edabd7c5..954c2ddf6e2 100644 --- a/modules/generator/processor/spanmetrics/spanmetrics.go +++ b/modules/generator/processor/spanmetrics/spanmetrics.go @@ -2,7 +2,9 @@ package spanmetrics import ( "context" + "fmt" "time" + "unicode/utf8" "github.com/prometheus/prometheus/util/strutil" "go.opentelemetry.io/otel" @@ -40,12 +42,13 @@ type Processor struct { filter *spanfilter.SpanFilter 
filteredSpansCounter prometheus.Counter + invalidUTF8Counter prometheus.Counter // for testing now func() time.Time } -func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counter) (gen.Processor, error) { +func New(cfg Config, reg registry.Registry, filteredSpansCounter, invalidUTF8Counter prometheus.Counter) (gen.Processor, error) { labels := make([]string, 0, 4+len(cfg.Dimensions)) if cfg.IntrinsicDimensions.Service { @@ -72,13 +75,19 @@ func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counte labels = append(labels, sanitizeLabelNameWithCollisions(m.Name)) } + err := validateLabelValues(labels) + if err != nil { + return nil, err + } + p := &Processor{ Cfg: cfg, registry: reg, spanMetricsTargetInfo: reg.NewGauge(targetInfo), now: time.Now, labels: labels, - filteredSpansCounter: spanDiscardCounter, + filteredSpansCounter: filteredSpansCounter, + invalidUTF8Counter: invalidUTF8Counter, } if cfg.Subprocessors[Latency] { @@ -96,7 +105,6 @@ func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counte return nil, err } - p.filteredSpansCounter = spanDiscardCounter p.filter = filter return p, nil } @@ -203,6 +211,12 @@ func (p *Processor) aggregateMetricsForSpan(svcName string, jobName string, inst spanMultiplier := processor_util.GetSpanMultiplier(p.Cfg.SpanMultiplierKey, span, rs) + err := validateLabelValues(labelValues) + if err != nil { + p.invalidUTF8Counter.Inc() + return + } + registryLabelValues := p.registry.NewLabelValueCombo(labels, labelValues) if p.Cfg.Subprocessors[Count] { @@ -259,6 +273,16 @@ func sanitizeLabelNameWithCollisions(name string) string { return sanitized } +func validateLabelValues(v []string) error { + for _, value := range v { + if !utf8.ValidString(value) { + return fmt.Errorf("invalid utf8 string: %q", value) + } + } + + return nil +} + func isIntrinsicDimension(name string) bool { return processor_util.Contains(name, []string{dimJob, dimSpanName, dimSpanKind, 
dimStatusCode, dimStatusMessage, dimInstance}) } diff --git a/modules/generator/processor/spanmetrics/spanmetrics_test.go b/modules/generator/processor/spanmetrics/spanmetrics_test.go index 6b8dc937044..e52180c00ae 100644 --- a/modules/generator/processor/spanmetrics/spanmetrics_test.go +++ b/modules/generator/processor/spanmetrics/spanmetrics_test.go @@ -9,6 +9,7 @@ import ( "strings" "testing" + "github.com/davecgh/go-spew/spew" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" @@ -35,12 +36,13 @@ func TestSpanMetrics(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -72,13 +74,14 @@ func TestSpanMetrics(t *testing.T) { func TestSpanMetricsTargetInfoEnabled(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -110,6 +113,7 @@ func TestSpanMetrics_dimensions(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := 
metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -118,7 +122,7 @@ func TestSpanMetrics_dimensions(t *testing.T) { cfg.IntrinsicDimensions.StatusMessage = true cfg.Dimensions = []string{"foo", "bar", "does-not-exist"} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -166,6 +170,7 @@ func TestSpanMetrics_collisions(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -173,7 +178,7 @@ func TestSpanMetrics_collisions(t *testing.T) { cfg.Dimensions = []string{"span.kind", "span_name"} cfg.IntrinsicDimensions.SpanKind = false - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -215,12 +220,14 @@ func TestSpanMetrics_collisions(t *testing.T) { func TestJobLabelWithNamespaceAndInstanceID(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") + cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ 
-263,6 +270,7 @@ func TestJobLabelWithNamespaceAndInstanceID(t *testing.T) { func TestSpanMetrics_applyFilterPolicy(t *testing.T) { filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cases := []struct { filterPolicies []filterconfig.FilterPolicy @@ -353,7 +361,7 @@ func TestSpanMetrics_applyFilterPolicy(t *testing.T) { cfg.FilterPolicies = tc.filterPolicies testRegistry := registry.NewTestRegistry() - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -407,13 +415,14 @@ func TestJobLabelWithNamespaceAndNoServiceName(t *testing.T) { // but service will still be there testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -459,13 +468,14 @@ func TestJobLabelWithNamespaceAndNoServiceName(t *testing.T) { func TestLabelsWithDifferentBatches(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, 
filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -537,13 +547,14 @@ func TestTargetInfoEnabled(t *testing.T) { // if the only labels are job and instance then target_info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.EnableTargetInfo = true cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -584,13 +595,14 @@ func TestTargetInfoEnabled(t *testing.T) { func TestTargetInfoDisabled(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.EnableTargetInfo = false cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -629,6 +641,7 @@ func TestTargetInfoWithExclusion(t *testing.T) { // if the only labels are job and instance then target_info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -636,7 +649,7 @@ func TestTargetInfoWithExclusion(t *testing.T) { 
cfg.TargetInfoExcludedDimensions = []string{"container", "container.id"} cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -690,13 +703,14 @@ func TestTargetInfoSanitizeLabelName(t *testing.T) { // if the only labels are job and instance then target_info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.EnableTargetInfo = true cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -739,13 +753,14 @@ func TestTargetInfoWithJobAndInstanceOnly(t *testing.T) { // if the only labels are job and instance then target_info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -773,12 +788,13 @@ func TestTargetInfoNoJobAndNoInstance(t *testing.T) { // if both job and instance are missing, target info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := 
metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -820,13 +836,14 @@ func TestTargetInfoNoJobAndNoInstance(t *testing.T) { func TestTargetInfoWithDifferentBatches(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.EnableTargetInfo = true cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -890,6 +907,7 @@ func TestTargetInfoWithDifferentBatches(t *testing.T) { func TestSpanMetricsDimensionMapping(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -904,7 +922,7 @@ func TestSpanMetricsDimensionMapping(t *testing.T) { }, } - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -949,6 +967,7 @@ func TestSpanMetricsDimensionMapping(t *testing.T) { func TestSpanMetricsDimensionMappingMissingLabels(t *testing.T) { testRegistry := 
registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -976,7 +995,7 @@ func TestSpanMetricsDimensionMappingMissingLabels(t *testing.T) { }, } - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -1027,12 +1046,13 @@ func TestSpanMetricsDimensionMappingMissingLabels(t *testing.T) { func TestSpanMetricsNegativeLatency(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -1158,13 +1178,14 @@ func BenchmarkSpanMetrics_applyFilterPolicyMedium(b *testing.B) { func benchmarkFilterPolicy(b *testing.B, policies []filterconfig.FilterPolicy, batch *trace_v1.ResourceSpans) { filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") testRegistry := registry.NewTestRegistry() cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.FilterPolicies = policies - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(b, err) defer p.Shutdown(context.Background()) b.ResetTimer()