Skip to content

Commit

Permalink
update naming
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Feb 22, 2025
1 parent aa3fbb3 commit a80f5fc
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,13 @@ message MonitoringInfoSpecs {
]
}];

USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
USER_HISTOGRAM = 23 [(monitoring_info_spec) = {
urn: "beam:metric:user:histogram_int64:v1",
type: "beam:metrics:histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report per worker histogram metric."
value: "URN utilized to report histogram metric."
}]
}];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<BoundedTrieResult>> boundedTries;
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;
private final Iterable<MetricResult<HistogramData>> histograms;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<BoundedTrieResult>> boundedTries,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
Iterable<MetricResult<HistogramData>> histograms) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
this.boundedTries = boundedTries;
this.perWorkerHistograms = perWorkerHistograms;
this.histograms = histograms;
}

@Override
Expand All @@ -72,10 +72,6 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
boundedTries, boundedTries -> MetricFiltering.matches(filter, boundedTries.getKey())),
Iterables.filter(
perWorkerHistograms,
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));

histograms, histogram -> MetricFiltering.matches(filter, histogram.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
public abstract Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates();

/** All the histogram updates. */
public abstract Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates();
public abstract Iterable<MetricUpdate<HistogramData>> histogramsUpdates();

/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Expand All @@ -81,13 +81,9 @@ public static MetricUpdates create(
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates,
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
Iterable<MetricUpdate<HistogramData>> histogramsUpdates) {
return new AutoValue_MetricUpdates(
counterUpdates,
distributionUpdates,
gaugeUpdates,
stringSetUpdates, boundedTrieUpdates,
perWorkerHistogramsUpdates);
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates, boundedTrieUpdates, histogramsUpdates);
}

/** Returns true if there are no updates in this MetricUpdates object. */
Expand All @@ -97,6 +93,6 @@ public boolean isEmpty() {
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates())
&& Iterables.isEmpty(boundedTrieUpdates())
&& Iterables.isEmpty(perWorkerHistogramsUpdates());
&& Iterables.isEmpty(histogramsUpdates());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.BOUNDED_TRIE_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeBoundedTrie;
Expand Down Expand Up @@ -97,9 +97,6 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {

private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);

private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);

private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
new MetricsMap<>(HistogramCell::new);

Expand Down Expand Up @@ -247,20 +244,8 @@ public BoundedTrieCell getBoundedTrie(MetricName metricName) {
return boundedTries.tryGet(metricName);
}

/**
* Return the {@link Histogram} that should be used for implementing the given per-worker {@code
* metricName} in this container.
*/
@Override
public HistogramCell getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType));
return val;
}

public MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell>
getPerWorkerHistogram() {
return perWorkerHistograms;
public MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> getHistogram() {
return histograms;
}

private <UpdateT, CellT extends MetricCell<UpdateT>>
Expand Down Expand Up @@ -303,7 +288,7 @@ public MetricUpdates getUpdates() {
extractUpdates(gauges),
extractUpdates(stringSets),
extractUpdates(boundedTries),
extractHistogramUpdates(perWorkerHistograms));
extractHistogramUpdates(histograms));
}

/** @return The MonitoringInfo metadata from the metric. */
Expand Down Expand Up @@ -434,8 +419,8 @@ public MetricUpdates getUpdates() {
private @Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata(MetricKey metricKey) {
return metricToMonitoringMetadata(
metricKey,
MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE,
MonitoringInfoConstants.Urns.USER_PER_WORKER_HISTOGRAM);
MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE,
MonitoringInfoConstants.Urns.USER_HISTOGRAM);
}

/**
Expand Down Expand Up @@ -507,7 +492,7 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
}
}

for (MetricUpdate<HistogramData> metricUpdate : metricUpdates.perWorkerHistogramsUpdates()) {
for (MetricUpdate<HistogramData> metricUpdate : metricUpdates.histogramsUpdates()) {
MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate);
if (mi != null) {
monitoringInfos.add(mi);
Expand Down Expand Up @@ -566,7 +551,7 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
}
}
});
perWorkerHistograms.forEach(
histograms.forEach(
(metricName, histogramCell) -> {
if (histogramCell.getDirty().beforeCommit()) {
String shortId =
Expand Down Expand Up @@ -612,7 +597,7 @@ public void commitUpdates() {
gauges.forEachValue(gauge -> gauge.getDirty().afterCommit());
stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit());
boundedTries.forEachValue(bTrie -> bTrie.getDirty().afterCommit());
perWorkerHistograms.forEachValue(
histograms.forEachValue(
histogram -> {
histogram.getDirty().afterCommit();
});
Expand Down Expand Up @@ -652,7 +637,7 @@ public MetricUpdates getCumulative() {
extractCumulatives(gauges),
extractCumulatives(stringSets),
extractCumulatives(boundedTries),
extractHistogramCumulatives(perWorkerHistograms));
extractHistogramCumulatives(histograms));
}

/** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */
Expand Down Expand Up @@ -696,10 +681,10 @@ private void updateForBoundedTrieType(MonitoringInfo monitoringInfo) {
boundedTrie.update(decodeBoundedTrie(monitoringInfo.getPayload()));
}

private void updateForPerWorkerHistogramInt64(MonitoringInfo monitoringInfo) {
private void updateForHistogramInt64(MonitoringInfo monitoringInfo) {
MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
Histogram histogram = getPerWorkerHistogram(metricName, buckets);
Histogram histogram = getHistogram(metricName, buckets);
HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload());
histogram.update(data);
}
Expand Down Expand Up @@ -732,8 +717,8 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
updateForBoundedTrieType(monitoringInfo);
break;

case PER_WORKER_HISTOGRAM_TYPE:
updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info
case HISTOGRAM_TYPE:
updateForHistogramInt64(monitoringInfo); // use type, and not urn info
break;
default:
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
Expand Down Expand Up @@ -785,14 +770,14 @@ public boolean equals(@Nullable Object object) {
&& Objects.equals(gauges, metricsContainerImpl.gauges)
&& Objects.equals(stringSets, metricsContainerImpl.stringSets)
&& Objects.equals(boundedTries, metricsContainerImpl.boundedTries)
&& Objects.equals(perWorkerHistograms, metricsContainerImpl.perWorkerHistograms);
&& Objects.equals(histograms, metricsContainerImpl.histograms);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(stepName, counters, distributions, gauges, stringSets, boundedTries, perWorkerHistograms);
return Objects.hash(stepName, counters, distributions, gauges, stringSets, boundedTries, histograms);
}

/**
Expand Down Expand Up @@ -924,21 +909,6 @@ public static MetricsContainerImpl deltaContainer(
deltaValueCell.incTopBucketCount(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
curr.perWorkerHistograms.entries()) {
HistogramData.BucketType bt = cell.getKey().getValue();
HistogramData prevValue = prev.perWorkerHistograms.get(cell.getKey()).getCumulative();
HistogramData currValue = cell.getValue().getCumulative();
HistogramCell deltaValueCell = deltaContainer.perWorkerHistograms.get(cell.getKey());
deltaValueCell.incBottomBucketCount(
currValue.getBottomBucketCount() - prevValue.getBottomBucketCount());
for (int i = 0; i < bt.getNumBuckets(); i++) {
Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i);
deltaValueCell.incBucketCount(i, bucketCountDelta);
}
deltaValueCell.incTopBucketCount(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}

for (Map.Entry<MetricName, StringSetCell> cell : curr.stringSets.entries()) {
// Simply take the most recent value for stringSets, no need to count deltas.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public static MetricResults asMetricResults(
Map<MetricKey, MetricResult<GaugeData>> gauges = new HashMap<>();
Map<MetricKey, MetricResult<StringSetData>> sets = new HashMap<>();
Map<MetricKey, MetricResult<BoundedTrieData>> boundedTries = new HashMap<>();
Map<MetricKey, MetricResult<HistogramData>> perWorkerHistograms = new HashMap<>();
Map<MetricKey, MetricResult<HistogramData>> histograms = new HashMap<>();

attemptedMetricsContainers.forEachMetricContainer(
container -> {
Expand All @@ -151,8 +151,7 @@ public static MetricResults asMetricResults(
mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
mergeAttemptedResults(
boundedTries, cumulative.boundedTrieUpdates(), BoundedTrieData::combine);
mergeAttemptedResults(
perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine);
mergeAttemptedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine);
});
committedMetricsContainers.forEachMetricContainer(
container -> {
Expand All @@ -164,8 +163,7 @@ public static MetricResults asMetricResults(
mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
mergeCommittedResults(
boundedTries, cumulative.boundedTrieUpdates(), BoundedTrieData::combine);
mergeCommittedResults(
perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine);
mergeCommittedResults(histograms, cumulative.histogramsUpdates(), HistogramData::combine);
});

return new DefaultMetricResults(
Expand All @@ -182,7 +180,7 @@ public static MetricResults asMetricResults(
boundedTries.values().stream()
.map(result -> result.transform(BoundedTrieData::extractResult))
.collect(toList()),
perWorkerHistograms.values().stream()
histograms.values().stream()
.map(result -> result.transform(HistogramData::extractResult))
.collect(toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public static final class Urns {
extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING);
public static final String USER_BOUNDED_TRIE =
extractUrn(MonitoringInfoSpecs.Enum.USER_BOUNDED_TRIE);
public static final String USER_PER_WORKER_HISTOGRAM =
extractUrn(MonitoringInfoSpecs.Enum.USER_PER_WORKER_HISTOGRAM);
public static final String USER_HISTOGRAM = extractUrn(MonitoringInfoSpecs.Enum.USER_HISTOGRAM);
public static final String SAMPLED_BYTE_SIZE =
extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
Expand Down Expand Up @@ -170,8 +169,7 @@ public static final class TypeUrns {
public static final String PROGRESS_TYPE = "beam:metrics:progress:v1";
public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1";
public static final String BOUNDED_TRIE_TYPE = "beam:metrics:bounded_trie:v1";
public static final String PER_WORKER_HISTOGRAM_TYPE =
"beam:metrics:per_worker_histogram_int64:v1";
public static final String HISTOGRAM_TYPE = "beam:metrics:histogram_int64:v1";

static {
// Validate that compile time constants match the values stored in the protos.
Expand Down Expand Up @@ -200,9 +198,7 @@ public static final class TypeUrns {
checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE)));
checkArgument(
BOUNDED_TRIE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOUNDED_TRIE_TYPE)));
checkArgument(
PER_WORKER_HISTOGRAM_TYPE.equals(
getUrn(MonitoringInfoTypeUrns.Enum.PER_WORKER_HISTOGRAM)));
checkArgument(HISTOGRAM_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.HISTOGRAM)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,11 @@ public SimpleMonitoringInfoBuilder setBoundedTrieValue(BoundedTrieData value) {
}

/**
* Encodes the value and sets the type to {@link
* MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM_TYPE}.
* Encodes the value and sets the type to {@link MonitoringInfoConstants.TypeUrns#HISTOGRAM_TYPE}.
*/
public SimpleMonitoringInfoBuilder setInt64HistogramValue(HistogramData data) {
this.builder.setPayload(encodeInt64Histogram(data));
this.builder.setType(MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE);
this.builder.setType(MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ public void testDeltaCounters() {
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
MetricName hName = MetricName.named("namespace", "histogram");
MetricName stringSetName = MetricName.named("namespace", "stringset");
MetricName pwhName = MetricName.named("namespace", "perWorkerHistogram");

MetricsContainerImpl prevContainer = new MetricsContainerImpl(null);
prevContainer.getCounter(cName).inc(2L);
Expand All @@ -453,10 +452,6 @@ public void testDeltaCounters() {
prevContainer.getHistogram(hName, bucketType).update(3);
prevContainer.getHistogram(hName, bucketType).update(20);

// Set PerWorkerBucketCounts to [0,1,1,0,0,0,0]
prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(1);
prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(3);

MetricsContainerImpl nextContainer = new MetricsContainerImpl(null);
nextContainer.getCounter(cName).inc(9L);
nextContainer.getGauge(gName).set(8L);
Expand All @@ -475,10 +470,6 @@ public void testDeltaCounters() {
nextContainer.getHistogram(hName, bucketType).update(20);
nextContainer.getHistogram(hName, bucketType).update(20);

// Set PerWorkerBucketCounts to [1,0,0,0,0,0,1]
nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(-1);
nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(20);

MetricsContainerImpl deltaContainer =
MetricsContainerImpl.deltaContainer(prevContainer, nextContainer);
// Expect counter value: 7 = 9 - 2
Expand All @@ -504,20 +495,6 @@ public void testDeltaCounters() {
}
assertEquals(
2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount());

// Expect per worker bucket counts: [1,0,0,0,0,0,1]
assertEquals(
1,
deltaContainer
.getPerWorkerHistogram(pwhName, bucketType)
.getCumulative()
.getBottomBucketCount());
assertEquals(
1,
deltaContainer
.getPerWorkerHistogram(pwhName, bucketType)
.getCumulative()
.getTopBucketCount());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Iterable<MetricResult<BoundedTrieResult>> getBoundedTries() {
}

@Override
public Iterable<MetricResult<HistogramData>> getPerWorkerHistograms() {
public Iterable<MetricResult<HistogramData>> getHistograms() {
return Collections.emptyList();
}
}
Loading

0 comments on commit a80f5fc

Please sign in to comment.