diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 8a53d8a0054..930392ce2f1 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -338,6 +338,10 @@ func (t *App) initGenerator() (services.Service, error) { } func (t *App) initBlockBuilder() (services.Service, error) { + if !t.cfg.Ingest.Enabled { + return services.NewIdleService(nil, nil), nil + } + t.cfg.BlockBuilder.IngestStorageConfig = t.cfg.Ingest t.cfg.BlockBuilder.IngestStorageConfig.Kafka.ConsumerGroup = blockbuilder.ConsumerGroup diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md index ed0e82bbb57..9e8513cb678 100644 --- a/docs/sources/tempo/configuration/manifest.md +++ b/docs/sources/tempo/configuration/manifest.md @@ -183,6 +183,22 @@ distributor: cost_attribution: max_cardinality: 10000 stale_duration: 15m0s + kafka_write_path_enabled: false + kafka_config: + address: "" + topic: "" + client_id: "" + dial_timeout: 0s + write_timeout: 0s + consumer_group: "" + consumer_group_offset_commit_interval: 0s + last_produced_offset_retry_timeout: 0s + auto_create_topic_enabled: false + auto_create_topic_default_partitions: 0 + producer_max_record_size_bytes: 0 + producer_max_buffered_bytes: 0 + target_consumer_lag_at_startup: 0s + max_consumer_lag_at_startup: 0s extend_writes: true retry_after_on_resource_exhausted: 0s max_span_attr_byte: 2048 @@ -439,6 +455,40 @@ ingester: address: "" port: 0 id: hostname + partition_ring: + kvstore: + store: memberlist + prefix: collectors/ + consul: + host: localhost:8500 + acl_token: "" + http_client_timeout: 20s + consistent_reads: false + watch_rate_limit: 1 + watch_burst_size: 1 + cas_retry_delay: 1s + etcd: + endpoints: [] + dial_timeout: 10s + max_retries: 10 + tls_enabled: false + tls_cert_path: "" + tls_key_path: "" + tls_ca_path: "" + tls_server_name: "" + tls_insecure_skip_verify: false + tls_cipher_suites: "" + tls_min_version: "" + username: "" + password: "" + multi: + primary: "" + secondary: "" + mirror_enabled: false + mirror_timeout: 2s + min_partition_owners_count: 1 + min_partition_owners_duration: 10s + delete_inactive_partition_after: 13h0m0s concurrent_flushes: 4 flush_check_period: 10s flush_op_timeout: 5m0s @@ -600,6 +650,28 @@ metrics_generator: metrics_ingestion_time_range_slack: 30s query_timeout: 30s override_ring_key: metrics-generator + assigned_partitions: [] +ingest: + enabled: false + kafka: + address: localhost:9092 + topic: "" + client_id: "" + dial_timeout: 2s + write_timeout: 10s + consumer_group: "" + consumer_group_offset_commit_interval: 1s + last_produced_offset_retry_timeout: 10s + auto_create_topic_enabled: true + auto_create_topic_default_partitions: 1000 + producer_max_record_size_bytes: 15983616 + producer_max_buffered_bytes: 1073741824 + target_consumer_lag_at_startup: 2s + max_consumer_lag_at_startup: 15s +block_builder: + assigned_partitions: [] + consume_cycle_duration: 5m0s + lookback_on_no_commit: 12h0m0s storage: trace: pool: diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index f35b24a44d3..349180c5af6 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -53,7 +53,7 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error { "msg", "pushing bytes", "tenant", tenant, "num_traces", len(req.Traces), - "id", util.TraceIDToHexString(req.Ids[0].Slice), + "id", util.TraceIDToHexString(req.Ids[0]), ) i, err := p.instanceForTenant(tenant) @@ -67,7 +67,7 @@ func (p *writer) pushBytes(tenant string, req *tempopb.PushBytesRequest) error { return fmt.Errorf("failed to unmarshal trace: %w", err) } - if err := i.AppendTrace(req.Ids[j].Slice, tr, 0, 0); err != nil { + if err := i.AppendTrace(req.Ids[j], tr, 0, 0); err != nil { return err } } diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index fd0388e57f8..160d8c9b8a3 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -593,12 +593,12 @@ func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uin req := &tempopb.PushBytesRequest{ Traces: make([]tempopb.PreallocBytes, len(indexes)), - Ids: make([]tempopb.PreallocBytes, len(indexes)), + Ids: make([][]byte, len(indexes)), } for i, j := range indexes { req.Traces[i].Slice = marshalledTraces[j][0:] - req.Ids[i].Slice = traces[j].id + req.Ids[i] = traces[j].id } // The partition ID is stored in the ring.InstanceDesc ID. diff --git a/pkg/ingest/encoding.go b/pkg/ingest/encoding.go index 42d14fcfd0a..b5b9cf5efab 100644 --- a/pkg/ingest/encoding.go +++ b/pkg/ingest/encoding.go @@ -25,7 +25,7 @@ func encoderPoolGet() *tempopb.PushBytesRequest { return &tempopb.PushBytesRequest{ Traces: make([]tempopb.PreallocBytes, 0, 10), - Ids: make([]tempopb.PreallocBytes, 0, 10), + Ids: make([][]byte, 0, 10), } } @@ -54,7 +54,7 @@ func Encode(partitionID int32, tenantID string, req *tempopb.PushBytesRequest, m currentSize := 0 for i, entry := range req.Traces { - l := entry.Size() + req.Ids[i].Size() + l := entry.Size() + len(req.Ids[i]) // Size of the entry in the req entrySize := 1 + l + sovPush(uint64(l)) diff --git a/pkg/ingest/encoding_test.go b/pkg/ingest/encoding_test.go index cf64e8607bf..9933a464acb 100644 --- a/pkg/ingest/encoding_test.go +++ b/pkg/ingest/encoding_test.go @@ -44,7 +44,7 @@ func TestEncoderDecoder(t *testing.T) { } var decodedEntries []tempopb.PreallocBytes - var decodedIDs []tempopb.PreallocBytes + var decodedIDs [][]byte for _, record := range records { decoder.Reset() @@ -115,12 +115,12 @@ func BenchmarkEncodeDecode(b *testing.B) { func generateRequest(entries, lineLength int) *tempopb.PushBytesRequest { stream := &tempopb.PushBytesRequest{ Traces: make([]tempopb.PreallocBytes, entries), - Ids: make([]tempopb.PreallocBytes, entries), + Ids: make([][]byte, entries), } for i := 0; i < entries; i++ { stream.Traces[i].Slice = generateRandomString(lineLength) - stream.Ids[i].Slice = generateRandomString(lineLength) + stream.Ids[i] = generateRandomString(lineLength) } return stream diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index 22f9c0ad175..7e38e194342 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -373,10 +373,10 @@ func MakePushBytesRequest(t *testing.T, requests int, traceID []byte) *tempopb.P req := &tempopb.PushBytesRequest{ Traces: make([]tempopb.PreallocBytes, 0), - Ids: make([]tempopb.PreallocBytes, 0), + Ids: make([][]byte, 0), } req.Traces = append(req.Traces, tempopb.PreallocBytes{Slice: b}) - req.Ids = append(req.Ids, tempopb.PreallocBytes{Slice: traceID}) + req.Ids = append(req.Ids, traceID) return req }