diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 18c3729bffcf..53a9dab4d393 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -294,11 +294,11 @@ public MetricUpdates getUpdates() { .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName()); } - // Based on namespace, add per worker metrics label to enable separate runner based sink based - // processing. - if (metricName.getNamespace().equals("BigQuerySink") - || metricName.getNamespace().equals("KafkaSink")) { - builder.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true"); + // Add any metricKey labels to the monitoringInfoLabels + if (!metricName.getLabels().isEmpty()) { + for (Map.Entry entry : metricName.getLabels().entrySet()) { + builder.setLabel(entry.getKey(), entry.getValue()); + } } return builder; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java index 2be3aaec2987..797632b51824 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java @@ -80,6 +80,7 @@ public String getUrn() { } /** @return The labels associated with this MonitoringInfo. */ + @Override public Map getLabels() { return this.labels; } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index bc9ae87fff00..a226ce0eea69 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -242,15 +242,16 @@ public void testMonitoringInfosArePopulatedForUserCounters() { } @Test - public void testMonitoringInfosLabelsArePopulatedForSinkCounter() { + public void testMonitoringInfosLabelsArePopulatedForMetricNamesWithLabels() { MetricsContainerImpl testObject = new MetricsContainerImpl("step1"); - CounterCell c1 = testObject.getCounter(MetricName.named("KafkaSink", "name1")); + + HashMap labelMap = new HashMap<>(); + labelMap.put("PER_WORKER_METRIC", "true"); + CounterCell c1 = testObject.getCounter(MetricName.named("KafkaSink", "name1", labelMap)); CounterCell c2 = testObject.getCounter(MetricName.named("BigQuerySink", "name2")); - CounterCell c3 = testObject.getCounter(MetricName.named("PS", "name3")); c1.inc(2L); c2.inc(4L); - c3.inc(5L); SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder(); builder1 @@ -266,27 +267,15 @@ public void testMonitoringInfosLabelsArePopulatedForSinkCounter() { .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64) .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "BigQuerySink") .setLabel(MonitoringInfoConstants.Labels.NAME, "name2") - .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true") .setInt64SumValue(4) .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); - // Not in an supported namespace, so extra metadata isn't added. - SimpleMonitoringInfoBuilder builder3 = new SimpleMonitoringInfoBuilder(); - builder3 - .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64) - .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "PS") - .setLabel(MonitoringInfoConstants.Labels.NAME, "name3") - .setInt64SumValue(5) - .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); - ArrayList actualMonitoringInfos = new ArrayList(); for (MonitoringInfo mi : testObject.getMonitoringInfos()) { actualMonitoringInfos.add(mi); } - assertThat( - actualMonitoringInfos, - containsInAnyOrder(builder1.build(), builder2.build(), builder3.build())); + assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build())); } @Test diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/LabeledMetricNameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/LabeledMetricNameUtils.java index 72b303632331..ef22d4e53e83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/LabeledMetricNameUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/LabeledMetricNameUtils.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.metrics; import com.google.auto.value.AutoValue; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -42,6 +43,7 @@ public class LabeledMetricNameUtils { */ public static class MetricNameBuilder { private final StringBuilder labeledNameBuilder; + private HashMap metricLabels = new HashMap(); private MetricNameBuilder(String baseName) { this.labeledNameBuilder = new StringBuilder(baseName + METRIC_NAME_DELIMITER); @@ -63,8 +65,12 @@ public void addLabel(String key, String value) { .append(LABEL_DELIMITER); } + public void addMetricLabel(String key, String value) { + this.metricLabels.put(key, value); + } + public MetricName build(String metricNamespace) { - return MetricName.named(metricNamespace, labeledNameBuilder.toString()); + return MetricName.named(metricNamespace, labeledNameBuilder.toString(), this.metricLabels); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java index 31607d3330e0..74732bd0248a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java @@ -22,6 +22,8 @@ import com.google.auto.value.AutoValue; import com.google.auto.value.extension.memoized.Memoized; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; /** @@ -39,6 +41,9 @@ public abstract class MetricName implements Serializable { /** The name of this metric. */ public abstract String getName(); + /** Associated labels for the metric. */ + public Map getLabels(){} + @Override @Memoized public String toString() { @@ -48,12 +53,24 @@ public String toString() { public static MetricName named(String namespace, String name) { checkArgument(!Strings.isNullOrEmpty(namespace), "Metric namespace must be non-empty"); checkArgument(!Strings.isNullOrEmpty(name), "Metric name must be non-empty"); - return new AutoValue_MetricName(namespace, name); + return new AutoValue_MetricName(namespace, name, new HashMap()); + } + + public static MetricName named(String namespace, String name, HashMap labels) { + checkArgument(!Strings.isNullOrEmpty(namespace), "Metric namespace must be non-empty"); + checkArgument(!Strings.isNullOrEmpty(name), "Metric name must be non-empty"); + return new AutoValue_MetricName(namespace, name, labels); } public static MetricName named(Class namespace, String name) { checkArgument(namespace != null, "Metric namespace must be non-null"); checkArgument(!Strings.isNullOrEmpty(name), "Metric name must be non-empty"); - return new AutoValue_MetricName(namespace.getName(), name); + return new AutoValue_MetricName(namespace.getName(), name, new HashMap()); + } + + public static MetricName named(Class namespace, String name, HashMap labels) { + checkArgument(namespace != null, "Metric namespace must be non-null"); + checkArgument(!Strings.isNullOrEmpty(name), "Metric name must be non-empty"); + return new AutoValue_MetricName(namespace.getName(), name, labels); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index 92ab977674f3..7af782099c20 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -164,6 +164,14 @@ public static Gauge gauge(Class namespace, String name) { return new DelegatingGauge(MetricName.named(namespace, name)); } + /** + * Create a metric that can have its new value set, and is aggregated by taking the last reported + * value. + */ + public static Gauge gauge(MetricName metricName) { + return new DelegatingGauge(metricName); + } + /** Create a metric that accumulates and reports set of unique string values. */ public static StringSet stringSet(String namespace, String name) { return new DelegatingStringSet(MetricName.named(namespace, name)); diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 04563c478d6d..28760addd89e 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -52,6 +52,7 @@ dependencies { provided library.java.jackson_dataformat_csv permitUnusedDeclared library.java.jackson_dataformat_csv implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":runners:core-java") implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(":sdks:java:extensions:avro") implementation project(":sdks:java:extensions:protobuf") diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java index c26c516eabb6..bbc68488e4b6 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java @@ -173,10 +173,9 @@ private void recordBacklogBytesInternal() { */ @Override public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) { - Gauge perPartion = - Metrics.gauge( - "KafkaSink", KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName()); - perPartion.set(backlogBytes); + Gauge perPartition = + Metrics.gauge(KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId)); + perPartition.set(backlogBytes); } /** diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java index 6ef6594766ed..1e4149dab541 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.kafka; +import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; import org.apache.beam.sdk.metrics.DelegatingGauge; import org.apache.beam.sdk.metrics.DelegatingHistogram; import org.apache.beam.sdk.metrics.Gauge; @@ -114,6 +115,7 @@ public static MetricName getMetricGaugeName(String topic, int partitionId) { LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(ESTIMATED_BACKLOG_SIZE); nameBuilder.addLabel(PARTITION_ID, String.valueOf(partitionId)); nameBuilder.addLabel(TOPIC_LABEL, topic); + nameBuilder.addMetricLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true"); return nameBuilder.build(METRICS_NAMESPACE); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 22b2a4d2df36..6d5b706a987a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -332,6 +332,7 @@ public long getSplitBacklogBytes() { if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) { return UnboundedReader.BACKLOG_UNKNOWN; } + backlogBytes += pBacklog; } return backlogBytes; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 3d6cc910d009..b114dcc16714 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -549,6 +549,7 @@ public ProcessContinuation processElement( } } } + backlogBytes.set( (long) (BigDecimal.valueOf( diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetricsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetricsTest.java index 625a75c5316b..f384f3add9c2 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetricsTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetricsTest.java @@ -20,6 +20,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import java.util.HashMap; +import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.junit.Test; @@ -40,4 +42,18 @@ public void testCreatingHistogram() throws Exception { MetricName.named("KafkaSink", "RpcLatency*rpc_method:POLL;topic_name:topic1;"); assertThat(histogram.getName(), equalTo(histogramName)); } + + @Test + public void testCreatingBacklogGauge() throws Exception { + + Gauge gauge = KafkaSinkMetrics.createBacklogGauge("topic", /*partitionId*/ 0); + + HashMap labelMap = new HashMap<>(); + labelMap.put("PER_WORKER_METRIC", "true"); + MetricName gaugeName = + MetricName.named( + "KafkaSink", "EstimatedBacklogSize*partition_id:0;topic_name:topic;", labelMap); + + assertThat(gauge.getName(), equalTo(gaugeName)); + } }