Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Feb 21, 2025
1 parent 83718b3 commit 1d17345
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ public MetricUpdates getUpdates() {
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName());
}

// Based on namespace, add per worker metrics label to enable separate runner based sink based
// processing.
if (metricName.getNamespace().equals("BigQuerySink")
|| metricName.getNamespace().equals("KafkaSink")) {
builder.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
// Add any metricKey labels to the monitoringInfoLabels
if (!metricName.getLabels().isEmpty()) {
for (Map.Entry<String, String> entry : metricName.getLabels().entrySet()) {
builder.setLabel(entry.getKey(), entry.getValue());
}
}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public String getUrn() {
}

/** @return The labels associated with this MonitoringInfo. */
@Override
public Map<String, String> getLabels() {
return this.labels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,16 @@ public void testMonitoringInfosArePopulatedForUserCounters() {
}

@Test
public void testMonitoringInfosLabelsArePopulatedForSinkCounter() {
public void testMonitoringInfosLabelsArePopulatedForMetricNamesWithLabels() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
CounterCell c1 = testObject.getCounter(MetricName.named("KafkaSink", "name1"));

HashMap<String, String> labelMap = new HashMap<>();
labelMap.put("PER_WORKER_METRIC", "true");
CounterCell c1 = testObject.getCounter(MetricName.named("KafkaSink", "name1", labelMap));
CounterCell c2 = testObject.getCounter(MetricName.named("BigQuerySink", "name2"));
CounterCell c3 = testObject.getCounter(MetricName.named("PS", "name3"));

c1.inc(2L);
c2.inc(4L);
c3.inc(5L);

SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder();
builder1
Expand All @@ -266,27 +267,15 @@ public void testMonitoringInfosLabelsArePopulatedForSinkCounter() {
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "BigQuerySink")
.setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
.setInt64SumValue(4)
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

// Not in an supported namespace, so extra metadata isn't added.
SimpleMonitoringInfoBuilder builder3 = new SimpleMonitoringInfoBuilder();
builder3
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "PS")
.setLabel(MonitoringInfoConstants.Labels.NAME, "name3")
.setInt64SumValue(5)
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

ArrayList<MonitoringInfo> actualMonitoringInfos = new ArrayList<MonitoringInfo>();
for (MonitoringInfo mi : testObject.getMonitoringInfos()) {
actualMonitoringInfos.add(mi);
}

assertThat(
actualMonitoringInfos,
containsInAnyOrder(builder1.build(), builder2.build(), builder3.build()));
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.metrics;

import com.google.auto.value.AutoValue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -42,6 +43,7 @@ public class LabeledMetricNameUtils {
*/
public static class MetricNameBuilder {
private final StringBuilder labeledNameBuilder;
private HashMap<String, String> metricLabels = new HashMap<String, String>();

private MetricNameBuilder(String baseName) {
this.labeledNameBuilder = new StringBuilder(baseName + METRIC_NAME_DELIMITER);
Expand All @@ -63,8 +65,12 @@ public void addLabel(String key, String value) {
.append(LABEL_DELIMITER);
}

public void addMetricLabel(String key, String value) {
this.metricLabels.put(key, value);
}

public MetricName build(String metricNamespace) {
return MetricName.named(metricNamespace, labeledNameBuilder.toString());
return MetricName.named(metricNamespace, labeledNameBuilder.toString(), this.metricLabels);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;

/**
Expand All @@ -39,6 +41,9 @@ public abstract class MetricName implements Serializable {
/** The name of this metric. */
public abstract String getName();

/** Associated labels for the metric. */
public Map<String, String> getLabels(){}

@Override
@Memoized
public String toString() {
Expand All @@ -48,12 +53,24 @@ public String toString() {
public static MetricName named(String namespace, String name) {
checkArgument(!Strings.isNullOrEmpty(namespace), "Metric namespace must be non-empty");
checkArgument(!Strings.isNullOrEmpty(name), "Metric name must be non-empty");
return new AutoValue_MetricName(namespace, name);
return new AutoValue_MetricName(namespace, name, new HashMap<String, String>());
}

public static MetricName named(String namespace, String name, HashMap<String, String> labels) {
checkArgument(!Strings.isNullOrEmpty(namespace), "Metric namespace must be non-empty");
checkArgument(!Strings.isNullOrEmpty(name), "Metric name must be non-empty");
return new AutoValue_MetricName(namespace, name, labels);
}

public static MetricName named(Class<?> namespace, String name) {
checkArgument(namespace != null, "Metric namespace must be non-null");
checkArgument(!Strings.isNullOrEmpty(name), "Metric name must be non-empty");
return new AutoValue_MetricName(namespace.getName(), name);
return new AutoValue_MetricName(namespace.getName(), name, new HashMap<String, String>());
}

public static MetricName named(Class<?> namespace, String name, HashMap<String, String> labels) {
checkArgument(namespace != null, "Metric namespace must be non-null");
checkArgument(!Strings.isNullOrEmpty(name), "Metric name must be non-empty");
return new AutoValue_MetricName(namespace.getName(), name, labels);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ public static Gauge gauge(Class<?> namespace, String name) {
return new DelegatingGauge(MetricName.named(namespace, name));
}

/**
* Create a metric that can have its new value set, and is aggregated by taking the last reported
* value.
*/
public static Gauge gauge(MetricName metricName) {
return new DelegatingGauge(metricName);
}

/** Create a metric that accumulates and reports set of unique string values. */
public static StringSet stringSet(String namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies {
provided library.java.jackson_dataformat_csv
permitUnusedDeclared library.java.jackson_dataformat_csv
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":runners:core-java")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:protobuf")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,9 @@ private void recordBacklogBytesInternal() {
*/
@Override
public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
Gauge perPartion =
Metrics.gauge(
"KafkaSink", KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName());
perPartion.set(backlogBytes);
Gauge perPartition =
Metrics.gauge(KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId));
perPartition.set(backlogBytes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.kafka;

import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.sdk.metrics.DelegatingGauge;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Gauge;
Expand Down Expand Up @@ -114,6 +115,7 @@ public static MetricName getMetricGaugeName(String topic, int partitionId) {
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(ESTIMATED_BACKLOG_SIZE);
nameBuilder.addLabel(PARTITION_ID, String.valueOf(partitionId));
nameBuilder.addLabel(TOPIC_LABEL, topic);
nameBuilder.addMetricLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
return nameBuilder.build(METRICS_NAMESPACE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ public long getSplitBacklogBytes() {
if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
return UnboundedReader.BACKLOG_UNKNOWN;
}
backlogBytes += pBacklog;
}

return backlogBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ public ProcessContinuation processElement(
}
}
}

backlogBytes.set(
(long)
(BigDecimal.valueOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.util.HashMap;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.junit.Test;
Expand All @@ -40,4 +42,18 @@ public void testCreatingHistogram() throws Exception {
MetricName.named("KafkaSink", "RpcLatency*rpc_method:POLL;topic_name:topic1;");
assertThat(histogram.getName(), equalTo(histogramName));
}

@Test
public void testCreatingBacklogGauge() throws Exception {

Gauge gauge = KafkaSinkMetrics.createBacklogGauge("topic", /*partitionId*/ 0);

HashMap<String, String> labelMap = new HashMap<>();
labelMap.put("PER_WORKER_METRIC", "true");
MetricName gaugeName =
MetricName.named(
"KafkaSink", "EstimatedBacklogSize*partition_id:0;topic_name:topic;", labelMap);

assertThat(gauge.getName(), equalTo(gaugeName));
}
}

0 comments on commit 1d17345

Please sign in to comment.