From 7e0240fc07323df3d10aa0977673a02e7ff39f6a Mon Sep 17 00:00:00 2001 From: Jon Bates Date: Wed, 17 Jan 2024 13:37:18 +0000 Subject: [PATCH] kafka receiver exposes some metrics by partition (fixes #30177) (#30268) **Description:** Fixes [#30177](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30177) The Kafka receiver now exposes the following metrics according to partition. Beforehand, if a collector were consuming from 10 partitions, one of each metric would be rendered, with its value fluctuating according to the state of each partition's consumer. Now the metrics endpoint will expose 10 sets of metrics, each with a `partition` tag. * kafka_receiver_messages * kafka_receiver_current_offset * kafka_receiver_offset_lag **Testing:** * Unit tests were run * Stats endpoint observed manually for correctness * Scraped stats charted in Prometheus to ensure stability Example output: ``` otelcol_kafka_receiver_messages{name="",partition="0",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 29 otelcol_kafka_receiver_messages{name="",partition="1",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 32 otelcol_kafka_receiver_messages{name="",partition="10",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 32 otelcol_kafka_receiver_messages{name="",partition="11",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 28 otelcol_kafka_receiver_messages{name="",partition="12",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 36 otelcol_kafka_receiver_messages{name="",partition="13",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 38 ``` 
**Documentation:** None added Co-authored-by: Sean Marciniak <30928402+MovieStoreGuy@users.noreply.github.com> --- .chloggen/fix_kafka_receiver_metrics.yaml | 31 +++++++++++++++++++++++ receiver/kafkareceiver/kafka_receiver.go | 17 ++++++++++--- receiver/kafkareceiver/metrics.go | 20 ++++++++------- receiver/kafkareceiver/metrics_test.go | 29 +++++++++++++-------- 4 files changed, 74 insertions(+), 23 deletions(-) create mode 100644 .chloggen/fix_kafka_receiver_metrics.yaml diff --git a/.chloggen/fix_kafka_receiver_metrics.yaml b/.chloggen/fix_kafka_receiver_metrics.yaml new file mode 100644 index 000000000000..b814c7e98825 --- /dev/null +++ b/.chloggen/fix_kafka_receiver_metrics.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The Kafka receiver now exports some partition-specific metrics per-partition, with a `partition` tag + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30177] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The following metrics now render per partition: + - kafka_receiver_messages + - kafka_receiver_current_offset + - kafka_receiver_offset_lag + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. 
+# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index f82290dd3b9a..ef9e56541670 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -6,6 +6,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" "fmt" + "strconv" "sync" "github.com/IBM/sarama" @@ -446,7 +447,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe } ctx := c.obsrecv.StartTracesOp(session.Context()) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())} + statsTags := []tag.Mutator{ + tag.Upsert(tagInstanceName, c.id.String()), + tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))), + } _ = stats.RecordWithTags(ctx, statsTags, statMessageCount.M(1), statMessageOffset.M(message.Offset), @@ -526,7 +530,10 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS } ctx := c.obsrecv.StartMetricsOp(session.Context()) - statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())} + statsTags := []tag.Mutator{ + tag.Upsert(tagInstanceName, c.id.String()), + tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))), + } _ = stats.RecordWithTags(ctx, statsTags, statMessageCount.M(1), statMessageOffset.M(message.Offset), @@ -610,9 +617,13 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess } ctx := c.obsrecv.StartLogsOp(session.Context()) + statsTags := []tag.Mutator{ + tag.Upsert(tagInstanceName, c.id.String()), + tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))), + } _ = stats.RecordWithTags( ctx, - []tag.Mutator{tag.Upsert(tagInstanceName, 
c.id.String())}, + statsTags, statMessageCount.M(1), statMessageOffset.M(message.Offset), statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1)) diff --git a/receiver/kafkareceiver/metrics.go b/receiver/kafkareceiver/metrics.go index cdc31f500856..c1dc6d3e7c3f 100644 --- a/receiver/kafkareceiver/metrics.go +++ b/receiver/kafkareceiver/metrics.go @@ -11,6 +11,7 @@ import ( var ( tagInstanceName, _ = tag.NewKey("name") + tagPartition, _ = tag.NewKey("partition") statMessageCount = stats.Int64("kafka_receiver_messages", "Number of received messages", stats.UnitDimensionless) statMessageOffset = stats.Int64("kafka_receiver_current_offset", "Current message offset", stats.UnitDimensionless) @@ -26,13 +27,14 @@ var ( // metricViews return metric views for Kafka receiver. func metricViews() []*view.View { - tagKeys := []tag.Key{tagInstanceName} + partitionAgnosticTagKeys := []tag.Key{tagInstanceName} + partitionSpecificTagKeys := []tag.Key{tagInstanceName, tagPartition} countMessages := &view.View{ Name: statMessageCount.Name(), Measure: statMessageCount, Description: statMessageCount.Description(), - TagKeys: tagKeys, + TagKeys: partitionSpecificTagKeys, Aggregation: view.Sum(), } @@ -40,7 +42,7 @@ func metricViews() []*view.View { Name: statMessageOffset.Name(), Measure: statMessageOffset, Description: statMessageOffset.Description(), - TagKeys: tagKeys, + TagKeys: partitionSpecificTagKeys, Aggregation: view.LastValue(), } @@ -48,7 +50,7 @@ func metricViews() []*view.View { Name: statMessageOffsetLag.Name(), Measure: statMessageOffsetLag, Description: statMessageOffsetLag.Description(), - TagKeys: tagKeys, + TagKeys: partitionSpecificTagKeys, Aggregation: view.LastValue(), } @@ -56,7 +58,7 @@ func metricViews() []*view.View { Name: statPartitionStart.Name(), Measure: statPartitionStart, Description: statPartitionStart.Description(), - TagKeys: tagKeys, + TagKeys: partitionAgnosticTagKeys, Aggregation: view.Sum(), } @@ -64,7 +66,7 @@ func metricViews() 
[]*view.View { Name: statPartitionClose.Name(), Measure: statPartitionClose, Description: statPartitionClose.Description(), - TagKeys: tagKeys, + TagKeys: partitionAgnosticTagKeys, Aggregation: view.Sum(), } @@ -72,7 +74,7 @@ func metricViews() []*view.View { Name: statUnmarshalFailedMetricPoints.Name(), Measure: statUnmarshalFailedMetricPoints, Description: statUnmarshalFailedMetricPoints.Description(), - TagKeys: tagKeys, + TagKeys: partitionAgnosticTagKeys, Aggregation: view.Sum(), } @@ -80,7 +82,7 @@ func metricViews() []*view.View { Name: statUnmarshalFailedLogRecords.Name(), Measure: statUnmarshalFailedLogRecords, Description: statUnmarshalFailedLogRecords.Description(), - TagKeys: tagKeys, + TagKeys: partitionAgnosticTagKeys, Aggregation: view.Sum(), } @@ -88,7 +90,7 @@ func metricViews() []*view.View { Name: statUnmarshalFailedSpans.Name(), Measure: statUnmarshalFailedSpans, Description: statUnmarshalFailedSpans.Description(), - TagKeys: tagKeys, + TagKeys: partitionAgnosticTagKeys, Aggregation: view.Sum(), } diff --git a/receiver/kafkareceiver/metrics_test.go b/receiver/kafkareceiver/metrics_test.go index f29cbf4914fa..27e533aa771a 100644 --- a/receiver/kafkareceiver/metrics_test.go +++ b/receiver/kafkareceiver/metrics_test.go @@ -9,19 +9,26 @@ import ( "github.com/stretchr/testify/assert" ) +type expectedView struct { + name string + tagCount int +} + func TestMetrics(t *testing.T) { metricViews := metricViews() - viewNames := []string{ - "kafka_receiver_messages", - "kafka_receiver_current_offset", - "kafka_receiver_offset_lag", - "kafka_receiver_partition_start", - "kafka_receiver_partition_close", - "kafka_receiver_unmarshal_failed_metric_points", - "kafka_receiver_unmarshal_failed_log_records", - "kafka_receiver_unmarshal_failed_spans", + viewNames := []expectedView{ + {name: "kafka_receiver_messages", tagCount: 2}, + {name: "kafka_receiver_current_offset", tagCount: 2}, + {name: "kafka_receiver_offset_lag", tagCount: 2}, + {name: 
"kafka_receiver_partition_start", tagCount: 1}, + {name: "kafka_receiver_partition_close", tagCount: 1}, + {name: "kafka_receiver_unmarshal_failed_metric_points", tagCount: 1}, + {name: "kafka_receiver_unmarshal_failed_log_records", tagCount: 1}, + {name: "kafka_receiver_unmarshal_failed_spans", tagCount: 1}, } - for i, viewName := range viewNames { - assert.Equal(t, viewName, metricViews[i].Name) + + for i, expectedView := range viewNames { + assert.Equal(t, expectedView.name, metricViews[i].Name) + assert.Equal(t, expectedView.tagCount, len(metricViews[i].TagKeys)) } }