From beb75cb0a4633fd22f4fa2b7374ac5568fbcd9ec Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Tue, 14 Jan 2025 14:34:46 -0500 Subject: [PATCH] Honor tenant max_trace_bytes setting --- modules/blockbuilder/blockbuilder_test.go | 1 + modules/blockbuilder/tenant_store.go | 18 ++++++++++++++++++ modules/blockbuilder/writeable_block.go | 1 + 3 files changed, 20 insertions(+) diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go index 14bdc646772..3c11a3b6208 100644 --- a/modules/blockbuilder/blockbuilder_test.go +++ b/modules/blockbuilder/blockbuilder_test.go @@ -402,6 +402,7 @@ type mockOverrides struct { dc backend.DedicatedColumns } +func (m *mockOverrides) MaxBytesPerTrace(_ string) int { return 0 } func (m *mockOverrides) DedicatedColumns(_ string) backend.DedicatedColumns { return m.dc } func newKafkaClient(t testing.TB, config ingest.KafkaConfig) *kgo.Client { diff --git a/modules/blockbuilder/tenant_store.go b/modules/blockbuilder/tenant_store.go index 5755c153848..d53428c4a25 100644 --- a/modules/blockbuilder/tenant_store.go +++ b/modules/blockbuilder/tenant_store.go @@ -11,9 +11,11 @@ import ( "github.com/go-kit/log/level" "github.com/google/uuid" "github.com/grafana/tempo/modules/blockbuilder/util" + "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/livetraces" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/tracesizes" "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" @@ -31,6 +33,8 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec( }, []string{"tenant"}, ) +const reasonTraceTooLarge = "trace_too_large" + // TODO - This needs locking type tenantStore struct { tenantID string @@ -50,6 +54,7 @@ type tenantStore struct { walBlocks []common.WALBlock liveTraces *livetraces.LiveTraces + traceSizes *tracesizes.Tracker } func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) { @@ -64,6 +69,7 @@ func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg Block blocksMtx: sync.Mutex{}, enc: enc, liveTraces: livetraces.New(), + traceSizes: tracesizes.New(), } return s, s.resetHeadBlock() @@ -112,7 +118,19 @@ func (s *tenantStore) resetHeadBlock() error { } func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, ts time.Time) error { + maxSz := s.overrides.MaxBytesPerTrace(s.tenantID) + for _, b := range tr.ResourceSpans { + if maxSz > 0 && !s.traceSizes.Allow(traceID, b.Size(), maxSz) { + // Record dropped spans due to trace too large + count := 0 + for _, ss := range b.ScopeSpans { + count += len(ss.Spans) + } + overrides.RecordDiscardedSpans(count, reasonTraceTooLarge, s.tenantID) + continue + } + s.liveTraces.PushWithTimestamp(ts, traceID, b, 0) } return nil diff --git a/modules/blockbuilder/writeable_block.go b/modules/blockbuilder/writeable_block.go index ef028ab02f3..361d163fcb2 100644 --- a/modules/blockbuilder/writeable_block.go +++ b/modules/blockbuilder/writeable_block.go @@ -15,6 +15,7 @@ import ( // Overrides is just the set of overrides needed here. type Overrides interface { + MaxBytesPerTrace(string) int DedicatedColumns(string) backend.DedicatedColumns }