From f5ed5862bd97d4bc20c3b98f146eca3ec406e500 Mon Sep 17 00:00:00 2001 From: Naireen Hussain Date: Thu, 20 Feb 2025 17:47:29 -0500 Subject: [PATCH] Add histogram parsing in runner v2 (#34017) * Add histogram parsing in runner v2 * address comments --------- Co-authored-by: Naireen --- .../beam/model/pipeline/v1/metrics.proto | 55 +++++++ .../core/metrics/MonitoringInfoEncodings.java | 16 ++ .../metrics/MonitoringInfoEncodingsTest.java | 45 ++++++ .../apache/beam/sdk/util/HistogramData.java | 141 ++++++++++++++++++ 4 files changed, 257 insertions(+) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index 33bb5ae729f8..47306fbae023 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -628,6 +628,61 @@ message BoundedTrie { repeated string singleton = 3; } +// The message type used for encoding Histogram Data +message HistogramValue { + // Number of values recorded in this histogram. + optional int64 count = 1; + + // Describes the bucket boundaries used in the histogram. + optional BucketOptions bucket_options = 2; + + // The number of values in each bucket of the histogram, as described in + // `bucket_options`. `bucket_counts` should contain N values, where N is the + // number of buckets specified in `bucket_options`. If `bucket_counts` has + // fewer than N values, the remaining values are assumed to be 0. + repeated int64 bucket_counts = 3; + + // `BucketOptions` describes the bucket boundaries used in the histogram. + message BucketOptions { + // Linear buckets with the following boundaries for indices in 0 to n-1. + // - i in [0, n-1]: [start + (i)*width, start + (i+1)*width) + message Linear { + // Must be greater than 0. + // + // (-- api-linter: core::0140::prepositions=disabled + // aip.dev/not-precedent: `bucket_count` would cause confusion with + // `bucket_counts` field --) + optional int32 number_of_buckets = 1; + // Distance between bucket boundaries. Must be greater than 0. + optional double width = 2; + // Lower bound of the first bucket. + optional double start = 3; + } + + // Exponential buckets where the growth factor between buckets is + // `2**(2**-scale)`. e.g. for `scale=1` growth factor is + // `2**(2**(-1))=sqrt(2)`. `n` buckets will have the following boundaries. + // - 0th: [0, gf) + // - i in [1, n-1]: [gf^(i), gf^(i+1)) + message Base2Exponent { + // Must be greater than 0. + // + // (-- api-linter: core::0140::prepositions=disabled + // aip.dev/not-precedent: `bucket_count` would cause confusion with + // `bucket_counts` field --) + optional int32 number_of_buckets = 1; + // Must be between -3 and 3. This forces the growth factor of the bucket + // boundaries to be between `2^(1/8)` and `256`. + optional int32 scale = 2; + } + oneof bucket_type { + // Bucket boundaries grow linearly. + Linear linear = 1; + // Bucket boundaries grow exponentially. + Base2Exponent exponential = 2; + } + } +} // General monitored state information which contains structured information // which does not fit into a typical metric format. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java index b08e00749c48..5de273a11434 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java @@ -21,12 +21,14 @@ import java.io.InputStream; import java.util.Set; import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie; +import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -179,4 +181,18 @@ public static double decodeDoubleCounter(ByteString payload) { throw new RuntimeException(e); } } + + /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */ + public static ByteString encodeInt64Histogram(HistogramData inputHistogram) { + return inputHistogram.toProto().toByteString(); + } + + /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */ + public static HistogramData decodeInt64Histogram(ByteString payload) { + try { + return new HistogramData(HistogramValue.parseFrom(payload)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java index d4de79225abc..1f020c3a8ce1 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeBoundedTrie; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter; @@ -29,6 +30,7 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; import static org.junit.Assert.assertEquals; @@ -36,17 +38,28 @@ import java.util.Arrays; import java.util.Collections; import org.apache.beam.runners.core.metrics.BoundedTrieData.BoundedTrieNode; +import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.sdk.util.HistogramData.HistogramParsingException; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Tests for {@link MonitoringInfoEncodings}. */ @RunWith(JUnit4.class) public class MonitoringInfoEncodingsTest { + @Rule + public ExpectedLogs monitoringInfoCodingsExpectedLogs = + ExpectedLogs.none(MonitoringInfoEncodings.class); + + @Rule public ExpectedException thrown = ExpectedException.none(); + @Test public void testInt64DistributionEncoding() { DistributionData data = DistributionData.create(1L, 2L, 3L, 4L); @@ -143,4 +156,36 @@ public void testDoubleCounterEncoding() { assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload); assertEquals(1.0, decodeDoubleCounter(payload), 0.001); } + + @Test + public void testHistgramInt64EncodingLinearHist() { + HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5); + + HistogramData inputHistogram = new HistogramData(buckets); + inputHistogram.record(5, 10, 15, 20); + ByteString payload = encodeInt64Histogram(inputHistogram); + + assertEquals(inputHistogram, decodeInt64Histogram(payload)); + } + + @Test + public void testHistgramInt64EncodingExpHist() { + HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10); + HistogramData inputHistogram = new HistogramData(buckets); + inputHistogram.record(2, 4, 8, 16, 32); + ByteString payload = encodeInt64Histogram(inputHistogram); + assertEquals(inputHistogram, decodeInt64Histogram(payload)); + } + + @Test + public void testHistgramInt64EncodingUnsupportedBucket() { + thrown.expect(HistogramParsingException.class); + thrown.expectMessage("Unable to encode Int64 Histogram, bucket is not recognized"); + + HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of(); + + HistogramData inputHistogram = new HistogramData(buckets); + inputHistogram.record(2, 4, 8, 16, 32); + encodeInt64Histogram(inputHistogram); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java index 65ccda06be65..0bfffdf29885 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java @@ -24,6 +24,12 @@ import java.util.Arrays; import java.util.Objects; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue; +import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions; +import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions.Base2Exponent; +import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions.Linear; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath; import org.checkerframework.checker.nullness.qual.Nullable; @@ -74,6 +80,43 @@ public HistogramData(BucketType bucketType) { this.sumOfSquaredDeviations = 0; } + /** + * Create a histogram from HistogramValue proto. + * + * @param histogramProto HistogramValue proto used to populate stats for the histogram. + */ + public HistogramData(HistogramValue histogramProto) { + int numBuckets; + if (histogramProto.getBucketOptions().hasLinear()) { + System.out.println("xxx its linear"); + double start = histogramProto.getBucketOptions().getLinear().getStart(); + double width = histogramProto.getBucketOptions().getLinear().getWidth(); + numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets(); + this.bucketType = LinearBuckets.of(start, width, numBuckets); + this.buckets = new long[bucketType.getNumBuckets()]; + + int idx = 0; + for (long val : histogramProto.getBucketCountsList()) { + this.buckets[idx] = val; + this.numBoundedBucketRecords += val; + idx++; + } + } else { + System.out.println("xxx its exp"); + // Assume it's a exponential histogram if its not linear + int scale = histogramProto.getBucketOptions().getExponential().getScale(); + numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets(); + this.bucketType = ExponentialBuckets.of(scale, numBuckets); + this.buckets = new long[bucketType.getNumBuckets()]; + int idx = 0; + for (long val : histogramProto.getBucketCountsList()) { + this.buckets[idx] = val; + this.numBoundedBucketRecords += val; + idx++; + } + } + } + public BucketType getBucketType() { return this.bucketType; } @@ -207,6 +250,10 @@ public synchronized HistogramData getAndReset() { return other; } + public synchronized long[] getBucketCount() { + return buckets; + } + public synchronized void record(double value) { double rangeTo = bucketType.getRangeTo(); double rangeFrom = bucketType.getRangeFrom(); @@ -240,6 +287,64 @@ private synchronized void updateStatistics(double value) { sumOfSquaredDeviations += (value - mean) * (value - oldMean); } + public static class HistogramParsingException extends RuntimeException { + public HistogramParsingException(String message) { + super(message); + } + } + + /** Converts this {@link HistogramData} to its proto {@link HistogramValue}. */ + public synchronized HistogramValue toProto() { + HistogramValue.Builder builder = HistogramValue.newBuilder(); + // try { + int numberOfBuckets = this.getBucketType().getNumBuckets(); + + if (this.getBucketType() instanceof HistogramData.LinearBuckets) { + System.out.println("xxx linear buckets"); + HistogramData.LinearBuckets buckets = (HistogramData.LinearBuckets) this.getBucketType(); + Linear.Builder linearBuilder = Linear.newBuilder(); + linearBuilder.setNumberOfBuckets(numberOfBuckets); + linearBuilder.setWidth(buckets.getWidth()); + linearBuilder.setStart(buckets.getStart()); + Linear linearOptions = linearBuilder.build(); + + BucketOptions.Builder bucketBuilder = BucketOptions.newBuilder(); + bucketBuilder.setLinear(linearOptions); + builder.setBucketOptions(bucketBuilder.build()); + + } else if (this.getBucketType() instanceof HistogramData.ExponentialBuckets) { + System.out.println("xxx exp buckets"); + HistogramData.ExponentialBuckets buckets = + (HistogramData.ExponentialBuckets) this.getBucketType(); + + Base2Exponent.Builder base2ExpBuilder = Base2Exponent.newBuilder(); + base2ExpBuilder.setNumberOfBuckets(numberOfBuckets); + base2ExpBuilder.setScale(buckets.getScale()); + Base2Exponent exponentialOptions = base2ExpBuilder.build(); + + BucketOptions.Builder bucketBuilder = BucketOptions.newBuilder(); + bucketBuilder.setExponential(exponentialOptions); + builder.setBucketOptions(bucketBuilder.build()); + } else { + throw new HistogramParsingException( + "Unable to encode Int64 Histogram, bucket is not recognized"); + } + + builder.setCount(this.getTotalCount()); + + for (long val : this.getBucketCount()) { + builder.addBucketCounts(val); + } + System.out.println("xxxx " + builder.toString()); + return builder.build(); + } + + // /** Creates a {@link HistogramData} instance from its proto {@link HistogramValue}. */ + // public static HistogramData fromProto(HistogramValue proto) { + // HistgramValue value = new HistgramValue(); + // return new HistogramValue(proto); + // } + /** * Increment the {@code numTopRecords} and update {@code topRecordsSum} when a new overflow value * is recorded. This function should only be called when a Histogram is recording a value greater @@ -573,6 +678,42 @@ public double getRangeTo() { // Note: equals() and hashCode() are implemented by the AutoValue. } + /** Used for testing unsupported Bucket formats. */ + @AutoValue + @Internal + @VisibleForTesting + public abstract static class UnsupportedBuckets implements BucketType { + + public static UnsupportedBuckets of() { + return new AutoValue_HistogramData_UnsupportedBuckets(0); + } + + @Override + public int getBucketIndex(double value) { + return 0; + } + + @Override + public double getBucketSize(int index) { + return 0; + } + + @Override + public double getAccumulatedBucketSize(int index) { + return 0; + } + + @Override + public double getRangeFrom() { + return 0; + } + + @Override + public double getRangeTo() { + return 0; + } + } + @Override public synchronized boolean equals(@Nullable Object object) { if (object instanceof HistogramData) {