diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index fc9e33b41da..9d4459ac13c 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -379,13 +379,13 @@ message MonitoringInfoSpecs { ] }]; - USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = { - urn: "beam:metric:user:per_worker_histogram_int64:v1", - type: "beam:metrics:per_worker_histogram_int64:v1", + USER_HISTOGRAM = 23 [(monitoring_info_spec) = { + urn: "beam:metric:user:histogram_int64:v1", + type: "beam:metrics:histogram_int64:v1", required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"], annotations: [{ key: "description", - value: "URN utilized to report per worker histogram metric." + value: "URN utilized to report histogram metric." }] }]; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java index 183757e55af..8d562781422 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java @@ -45,7 +45,7 @@ public class DefaultMetricResults extends MetricResults { private final Iterable> gauges; private final Iterable> stringSets; private final Iterable> boundedTries; - private final Iterable> perWorkerHistograms; + private final Iterable> histograms; public DefaultMetricResults( Iterable> counters, @@ -53,13 +53,13 @@ public DefaultMetricResults( Iterable> gauges, Iterable> stringSets, Iterable> boundedTries, - Iterable> perWorkerHistograms) { + Iterable> histograms) { this.counters = counters; this.distributions = distributions; this.gauges = gauges; this.stringSets = stringSets; this.boundedTries = boundedTries; - this.perWorkerHistograms = perWorkerHistograms; + this.histograms = histograms; } @Override @@ -72,10 +72,6 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { Iterables.filter( stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())), Iterables.filter( - boundedTries, boundedTries -> MetricFiltering.matches(filter, boundedTries.getKey())), - Iterables.filter( - perWorkerHistograms, - perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey()))); - + histograms, histogram -> MetricFiltering.matches(filter, histogram.getKey()))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java index 22800f9ce5f..8914766e973 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java @@ -72,7 +72,7 @@ public static MetricUpdate create(MetricKey key, T update) { public abstract Iterable> boundedTrieUpdates(); /** All the histogram updates. */ - public abstract Iterable> perWorkerHistogramsUpdates(); + public abstract Iterable> histogramsUpdates(); /** Create a new {@link MetricUpdates} bundle. */ public static MetricUpdates create( @@ -81,13 +81,9 @@ public static MetricUpdates create( Iterable> gaugeUpdates, Iterable> stringSetUpdates, Iterable> boundedTrieUpdates, - Iterable> perWorkerHistogramsUpdates) { + Iterable> histogramsUpdates) { return new AutoValue_MetricUpdates( - counterUpdates, - distributionUpdates, - gaugeUpdates, - stringSetUpdates, boundedTrieUpdates, - perWorkerHistogramsUpdates); + counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates, boundedTrieUpdates, histogramsUpdates); } /** Returns true if there are no updates in this MetricUpdates object. */ @@ -97,6 +93,6 @@ public boolean isEmpty() { && Iterables.isEmpty(gaugeUpdates()) && Iterables.isEmpty(stringSetUpdates()) && Iterables.isEmpty(boundedTrieUpdates()) - && Iterables.isEmpty(perWorkerHistogramsUpdates()); + && Iterables.isEmpty(histogramsUpdates()); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 6d6b399dad8..c4eb26d9fef 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -19,8 +19,8 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.BOUNDED_TRIE_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE; +import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE; -import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeBoundedTrie; @@ -97,9 +97,6 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { private MetricsMap stringSets = new MetricsMap<>(StringSetCell::new); - private MetricsMap, HistogramCell> perWorkerHistograms = - new MetricsMap<>(HistogramCell::new); - private MetricsMap, HistogramCell> histograms = new MetricsMap<>(HistogramCell::new); @@ -247,20 +244,8 @@ public BoundedTrieCell getBoundedTrie(MetricName metricName) { return boundedTries.tryGet(metricName); } - /** - * Return the {@link Histogram} that should be used for implementing the given per-worker {@code - * metricName} in this container. - */ - @Override - public HistogramCell getPerWorkerHistogram( - MetricName metricName, HistogramData.BucketType bucketType) { - HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType)); - return val; - } - - public MetricsMap, HistogramCell> - getPerWorkerHistogram() { - return perWorkerHistograms; + public MetricsMap, HistogramCell> getHistogram() { + return histograms; } private > @@ -303,7 +288,7 @@ public MetricUpdates getUpdates() { extractUpdates(gauges), extractUpdates(stringSets), extractUpdates(boundedTries), - extractHistogramUpdates(perWorkerHistograms)); + extractHistogramUpdates(histograms)); } /** @return The MonitoringInfo metadata from the metric. */ @@ -434,8 +419,8 @@ public MetricUpdates getUpdates() { private @Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata(MetricKey metricKey) { return metricToMonitoringMetadata( metricKey, - MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE, - MonitoringInfoConstants.Urns.USER_PER_WORKER_HISTOGRAM); + MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE, + MonitoringInfoConstants.Urns.USER_HISTOGRAM); } /** @@ -507,7 +492,7 @@ public Iterable getMonitoringInfos() { } } - for (MetricUpdate metricUpdate : metricUpdates.perWorkerHistogramsUpdates()) { + for (MetricUpdate metricUpdate : metricUpdates.histogramsUpdates()) { MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate); if (mi != null) { monitoringInfos.add(mi); @@ -566,7 +551,7 @@ public Map getMonitoringData(ShortIdMap shortIds) { } } }); - perWorkerHistograms.forEach( + histograms.forEach( (metricName, histogramCell) -> { if (histogramCell.getDirty().beforeCommit()) { String shortId = @@ -612,7 +597,7 @@ public void commitUpdates() { gauges.forEachValue(gauge -> gauge.getDirty().afterCommit()); stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit()); boundedTries.forEachValue(bTrie -> bTrie.getDirty().afterCommit()); - perWorkerHistograms.forEachValue( + histograms.forEachValue( histogram -> { histogram.getDirty().afterCommit(); }); @@ -652,7 +637,7 @@ public MetricUpdates getCumulative() { extractCumulatives(gauges), extractCumulatives(stringSets), extractCumulatives(boundedTries), - extractHistogramCumulatives(perWorkerHistograms)); + extractHistogramCumulatives(histograms)); } /** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */ @@ -696,10 +681,10 @@ private void updateForBoundedTrieType(MonitoringInfo monitoringInfo) { boundedTrie.update(decodeBoundedTrie(monitoringInfo.getPayload())); } - private void updateForPerWorkerHistogramInt64(MonitoringInfo monitoringInfo) { + private void updateForHistogramInt64(MonitoringInfo monitoringInfo) { MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo); HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17); - Histogram histogram = getPerWorkerHistogram(metricName, buckets); + Histogram histogram = getHistogram(metricName, buckets); HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload()); histogram.update(data); } @@ -732,8 +717,8 @@ public void update(Iterable monitoringInfos) { updateForBoundedTrieType(monitoringInfo); break; - case PER_WORKER_HISTOGRAM_TYPE: - updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info + case HISTOGRAM_TYPE: + updateForHistogramInt64(monitoringInfo); // use type, and not urn info break; default: LOG.warn("Unsupported metric type {}", monitoringInfo.getType()); @@ -785,14 +770,14 @@ public boolean equals(@Nullable Object object) { && Objects.equals(gauges, metricsContainerImpl.gauges) && Objects.equals(stringSets, metricsContainerImpl.stringSets) && Objects.equals(boundedTries, metricsContainerImpl.boundedTries) - && Objects.equals(perWorkerHistograms, metricsContainerImpl.perWorkerHistograms); + && Objects.equals(histograms, metricsContainerImpl.histograms); } return false; } @Override public int hashCode() { - return Objects.hash(stepName, counters, distributions, gauges, stringSets, boundedTries, perWorkerHistograms); + return Objects.hash(stepName, counters, distributions, gauges, stringSets, boundedTries, histograms); } /** @@ -924,21 +909,6 @@ public static MetricsContainerImpl deltaContainer( deltaValueCell.incTopBucketCount( currValue.getTopBucketCount() - prevValue.getTopBucketCount()); } - for (Map.Entry, HistogramCell> cell : - curr.perWorkerHistograms.entries()) { - HistogramData.BucketType bt = cell.getKey().getValue(); - HistogramData prevValue = prev.perWorkerHistograms.get(cell.getKey()).getCumulative(); - HistogramData currValue = cell.getValue().getCumulative(); - HistogramCell deltaValueCell = deltaContainer.perWorkerHistograms.get(cell.getKey()); - deltaValueCell.incBottomBucketCount( - currValue.getBottomBucketCount() - prevValue.getBottomBucketCount()); - for (int i = 0; i < bt.getNumBuckets(); i++) { - Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i); - deltaValueCell.incBucketCount(i, bucketCountDelta); - } - deltaValueCell.incTopBucketCount( - currValue.getTopBucketCount() - prevValue.getTopBucketCount()); - } for (Map.Entry cell : curr.stringSets.entries()) { // Simply take the most recent value for stringSets, no need to count deltas. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java index e33a316c24e..66713529c3d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java @@ -139,7 +139,7 @@ public static MetricResults asMetricResults( Map> gauges = new HashMap<>(); Map> sets = new HashMap<>(); Map> boundedTries = new HashMap<>(); - Map> perWorkerHistograms = new HashMap<>(); + Map> histograms = new HashMap<>(); attemptedMetricsContainers.forEachMetricContainer( container -> { @@ -151,8 +151,7 @@ public static MetricResults asMetricResults( mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine); mergeAttemptedResults( boundedTries, cumulative.boundedTrieUpdates(), BoundedTrieData::combine); - mergeAttemptedResults( - perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine); + mergeAttemptedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine); }); committedMetricsContainers.forEachMetricContainer( container -> { @@ -164,8 +163,7 @@ public static MetricResults asMetricResults( mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine); mergeCommittedResults( boundedTries, cumulative.boundedTrieUpdates(), BoundedTrieData::combine); - mergeCommittedResults( - perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine); + mergeCommittedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine); }); return new DefaultMetricResults( @@ -182,7 +180,7 @@ public static MetricResults asMetricResults( boundedTries.values().stream() .map(result -> result.transform(BoundedTrieData::extractResult)) .collect(toList()), - perWorkerHistograms.values().stream() + histograms.values().stream() .map(result -> result.transform(HistogramData::extractResult)) .collect(toList())); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java index 772e5430918..f7e4a39e942 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java @@ -56,8 +56,7 @@ public static final class Urns { extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING); public static final String USER_BOUNDED_TRIE = extractUrn(MonitoringInfoSpecs.Enum.USER_BOUNDED_TRIE); - public static final String USER_PER_WORKER_HISTOGRAM = - extractUrn(MonitoringInfoSpecs.Enum.USER_PER_WORKER_HISTOGRAM); + public static final String USER_HISTOGRAM = extractUrn(MonitoringInfoSpecs.Enum.USER_HISTOGRAM); public static final String SAMPLED_BYTE_SIZE = extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE); public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED); @@ -170,8 +169,7 @@ public static final class TypeUrns { public static final String PROGRESS_TYPE = "beam:metrics:progress:v1"; public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1"; public static final String BOUNDED_TRIE_TYPE = "beam:metrics:bounded_trie:v1"; - public static final String PER_WORKER_HISTOGRAM_TYPE = - "beam:metrics:per_worker_histogram_int64:v1"; + public static final String HISTOGRAM_TYPE = "beam:metrics:histogram_int64:v1"; static { // Validate that compile time constants match the values stored in the protos. @@ -200,9 +198,7 @@ public static final class TypeUrns { checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE))); checkArgument( BOUNDED_TRIE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOUNDED_TRIE_TYPE))); - checkArgument( - PER_WORKER_HISTOGRAM_TYPE.equals( - getUrn(MonitoringInfoTypeUrns.Enum.PER_WORKER_HISTOGRAM))); + checkArgument(HISTOGRAM_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.HISTOGRAM))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java index 2416df36454..c5031308711 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java @@ -173,12 +173,11 @@ public SimpleMonitoringInfoBuilder setBoundedTrieValue(BoundedTrieData value) { } /** - * Encodes the value and sets the type to {@link - * MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM_TYPE}. + * Encodes the value and sets the type to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM_TYPE}. */ public SimpleMonitoringInfoBuilder setInt64HistogramValue(HistogramData data) { this.builder.setPayload(encodeInt64Histogram(data)); - this.builder.setType(MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE); + this.builder.setType(MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE); return this; } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index c89978e2e4c..69f7312f99a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -441,7 +441,6 @@ public void testDeltaCounters() { HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5); MetricName hName = MetricName.named("namespace", "histogram"); MetricName stringSetName = MetricName.named("namespace", "stringset"); - MetricName pwhName = MetricName.named("namespace", "perWorkerHistogram"); MetricsContainerImpl prevContainer = new MetricsContainerImpl(null); prevContainer.getCounter(cName).inc(2L); @@ -453,10 +452,6 @@ public void testDeltaCounters() { prevContainer.getHistogram(hName, bucketType).update(3); prevContainer.getHistogram(hName, bucketType).update(20); - // Set PerWorkerBucketCounts to [0,1,1,0,0,0,0] - prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(1); - prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(3); - MetricsContainerImpl nextContainer = new MetricsContainerImpl(null); nextContainer.getCounter(cName).inc(9L); nextContainer.getGauge(gName).set(8L); @@ -475,10 +470,6 @@ public void testDeltaCounters() { nextContainer.getHistogram(hName, bucketType).update(20); nextContainer.getHistogram(hName, bucketType).update(20); - // Set PerWorkerBucketCounts to [1,0,0,0,0,0,1] - nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(-1); - nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(20); - MetricsContainerImpl deltaContainer = MetricsContainerImpl.deltaContainer(prevContainer, nextContainer); // Expect counter value: 7 = 9 - 2 @@ -504,20 +495,6 @@ public void testDeltaCounters() { } assertEquals( 2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount()); - - // Expect per worker bucket counts: [1,0,0,0,0,0,1] - assertEquals( - 1, - deltaContainer - .getPerWorkerHistogram(pwhName, bucketType) - .getCumulative() - .getBottomBucketCount()); - assertEquals( - 1, - deltaContainer - .getPerWorkerHistogram(pwhName, bucketType) - .getCumulative() - .getTopBucketCount()); } @Test diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java index 177fe1c22ee..72ba34c9255 100644 --- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java +++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java @@ -96,7 +96,7 @@ public Iterable> getBoundedTries() { } @Override - public Iterable> getPerWorkerHistograms() { + public Iterable> getHistograms() { return Collections.emptyList(); } } diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java index 2dde00d2310..f34b28bbba1 100644 --- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java +++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java @@ -97,7 +97,7 @@ public void testWriteMetricsWithCommittedSupported() throws Exception { + "\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":" - + "\"ns1\"},\"step\":\"s3\"}],\"perWorkerHistograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd" + + "\"ns1\"},\"step\":\"s3\"}],\"histograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd" + "\"]},\"committed\":{\"stringSet\":[\"ab\"]},\"name\":{\"name\":\"n3\"," + "\"namespace\":\"ns1\"},\"step\":\"s3\"}]}"; assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size()); @@ -121,7 +121,7 @@ public void testWriteMetricsWithCommittedUnSupported() throws Exception { + "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\"" + ",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":" + "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":" - + "\"ns1\"},\"step\":\"s3\"}],\"perWorkerHistograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]}," + + "\"ns1\"},\"step\":\"s3\"}],\"histograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]}," + "\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}"; assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size()); assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0)); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java index fab4973d59a..ffedd1465d1 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java @@ -99,7 +99,7 @@ public Iterable> getBoundedTries() { } @Override - public Iterable> getPerWorkerHistograms() { + public Iterable> getHistograms() { return Collections.emptyList(); } }; diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java index 0717d94982d..29900c8ea02 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java @@ -113,7 +113,7 @@ private static class QueryResults extends MetricQueryResults { private final Iterable> gauges; private final Iterable> stringSets; private final Iterable> boundedTries; - private final Iterable> perWorkerHistograms; + private final Iterable> histograms; private QueryResults( Iterable> counters, @@ -126,7 +126,7 @@ private QueryResults( this.gauges = gauges; this.stringSets = stringSets; this.boundedTries = boundedTries; - this.perWorkerHistograms = Collections.emptyList(); // not implemented + this.histograms = Collections.emptyList(); // not implemented } @Override @@ -155,8 +155,8 @@ public Iterable> getBoundedTries() { } @Override - public Iterable> getPerWorkerHistograms() { - return perWorkerHistograms; + public Iterable> getHistograms() { + return histograms; } } diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java index 540aa1048cc..03254943aa3 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java @@ -179,7 +179,7 @@ public Iterable> boundedTrieUpdates() { } @Override - public Iterable> perWorkerHistogramsUpdates() { + public Iterable> histogramsUpdates() { return Collections.emptyList(); // not implemented } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java index 9b00f8849e1..ff8490721d0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -42,7 +42,7 @@ public abstract class MetricQueryResults { public abstract Iterable> getBoundedTries(); /** Return the metric results for the sets that matched the filter. */ - public abstract Iterable> getPerWorkerHistograms(); + public abstract Iterable> getHistograms(); static void printMetrics(String type, Iterable> metrics, StringBuilder sb) { List> metricsList = ImmutableList.copyOf(metrics);