From 4004f44dd89a73dfb91306740a5dbb664e86ce7d Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Wed, 16 Nov 2022 18:02:30 +0800 Subject: [PATCH] [ISSUE-309][FEATURE] Support ShuffleServer latency metrics. (#327) ### What changes were proposed in this pull request? For https://github.com/apache/incubator-uniffle/issues/309, support ShuffleServer latency metrics. ### Why are the changes needed? Accurately determine whether the current service load has caused a large delay to the client's read and write. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Co-authored-by: leixianming --- .../uniffle/common/metrics/GRPCMetrics.java | 27 +++++++++ .../common/metrics/MetricsManager.java | 11 ++++ .../apache/uniffle/common/util/Constants.java | 2 + .../impl/grpc/ShuffleServerGrpcClient.java | 9 ++- proto/src/main/proto/Rss.proto | 3 + .../server/ShuffleServerGrpcMetrics.java | 26 ++++++++ .../server/ShuffleServerGrpcService.java | 42 ++++++++++++- .../server/ShuffleServerGrpcMetricsTest.java | 59 +++++++++++++++++++ .../server/ShuffleServerMetricsTest.java | 2 +- 9 files changed, 175 insertions(+), 6 deletions(-) create mode 100644 server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java index 0e3fc7fda0..7d8b484666 100644 --- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java +++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java @@ -23,6 +23,9 @@ import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; +import io.prometheus.client.Summary; + +import org.apache.uniffle.common.util.Constants; public abstract class GRPCMetrics { // Grpc server internal executor metrics @@ -36,6 +39,8 @@ public abstract class GRPCMetrics { private boolean isRegister = false; protected Map counterMap = Maps.newConcurrentMap(); protected Map gaugeMap = Maps.newConcurrentMap(); + protected Map transportTimeSummaryMap = Maps.newConcurrentMap(); + protected Map processTimeSummaryMap = Maps.newConcurrentMap(); protected Gauge gaugeGrpcOpen; protected Counter counterGrpcTotal; protected MetricsManager metricsManager; @@ -100,6 +105,20 @@ public void decCounter(String methodName) { } } + public void recordTransportTime(String methodName, long transportTimeInMillionSecond) { + Summary summary = transportTimeSummaryMap.get(methodName); + if (summary != null) { + summary.observe(transportTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND); + } + } + + public void recordProcessTime(String methodName, long processTimeInMillionSecond) { + Summary summary = processTimeSummaryMap.get(methodName); + if (summary != null) { + summary.observe(processTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND); + } + } + public CollectorRegistry getCollectorRegistry() { return metricsManager.getCollectorRegistry(); } @@ -119,4 +138,12 @@ public Gauge getGaugeGrpcOpen() { public Counter getCounterGrpcTotal() { return counterGrpcTotal; } + + public Map getTransportTimeSummaryMap() { + return transportTimeSummaryMap; + } + + public Map getProcessTimeSummaryMap() { + return processTimeSummaryMap; + } } diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java index b09c7ccf14..2981ca22f3 100644 --- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java +++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java @@ -21,9 +21,12 @@ import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; +import io.prometheus.client.Summary; public class MetricsManager { private CollectorRegistry collectorRegistry; + private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99}; + private static final double QUANTILE_ERROR = 0.01; public MetricsManager() { this(null); @@ -64,4 +67,12 @@ public Histogram addHistogram(String name, double[] buckets, String... labels) { public Histogram addHistogram(String name, String help, double[] buckets, String[] labels) { return Histogram.build().name(name).buckets(buckets).labelNames(labels).help(help).register(collectorRegistry); } + + public Summary addSummary(String name) { + Summary.Builder builder = Summary.build().name(name).help("Summary " + name); + for (int i = 0; i < QUANTILES.length; i++) { + builder = builder.quantile(QUANTILES[i], QUANTILE_ERROR); + } + return builder.register(collectorRegistry); + } } diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java index 1921c31242..f21597ab81 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java +++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java @@ -65,4 +65,6 @@ public class Constants { public static int MR_REDUCE_LIMIT_DEFAULT_VALUE = 0; public static final String MR_SLOW_START = "mapreduce.job.reduce.slowstart.completedmaps"; public static double MR_SLOW_START_DEFAULT_VALUE = 0.05; + + public static final double MILLION_SECONDS_PER_SECOND = 1E3D; } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 3d54e50ae1..4adf822911 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -307,13 +307,14 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ throw new RssException(String.format( "requirePreAllocation failed! size[%s], host[%s], port[%s]", allocateSize, host, port)); } + long start = System.currentTimeMillis(); SendShuffleDataRequest rpcRequest = SendShuffleDataRequest.newBuilder() .setAppId(appId) .setShuffleId(stb.getKey()) .setRequireBufferId(requireId) .addAllShuffleData(shuffleData) + .setTimestamp(start) .build(); - long start = System.currentTimeMillis(); SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest); LOG.info("Do sendShuffleData to {}:{} rpc cost:" + (System.currentTimeMillis() - start) + " ms for " + allocateSize + " bytes with " + finalBlockNum + " blocks", host, port); @@ -522,6 +523,7 @@ public RssGetShuffleResultResponse getShuffleResultForMultiPart(RssGetShuffleRes @Override public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request) { + long start = System.currentTimeMillis(); GetLocalShuffleDataRequest rpcRequest = GetLocalShuffleDataRequest .newBuilder() .setAppId(request.getAppId()) @@ -531,8 +533,8 @@ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request .setPartitionNum(request.getPartitionNum()) .setOffset(request.getOffset()) .setLength(request.getLength()) + .setTimestamp(start) .build(); - long start = System.currentTimeMillis(); GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest); String requestInfo = "appId[" + request.getAppId() + "], shuffleId[" + request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]"; @@ -595,6 +597,7 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ @Override public RssGetInMemoryShuffleDataResponse getInMemoryShuffleData( RssGetInMemoryShuffleDataRequest request) { + long start = System.currentTimeMillis(); GetMemoryShuffleDataRequest rpcRequest = GetMemoryShuffleDataRequest .newBuilder() .setAppId(request.getAppId()) @@ -602,9 +605,9 @@ public RssGetInMemoryShuffleDataResponse getInMemoryShuffleData( .setPartitionId(request.getPartitionId()) .setLastBlockId(request.getLastBlockId()) .setReadBufferSize(request.getReadBufferSize()) + .setTimestamp(start) .build(); - long start = System.currentTimeMillis(); GetMemoryShuffleDataResponse rpcResponse = getBlockingStub().getMemoryShuffleData(rpcRequest); String requestInfo = "appId[" + request.getAppId() + "], shuffleId[" + request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]"; diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 68b5c02c06..5c7e80b854 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -76,6 +76,7 @@ message GetLocalShuffleDataRequest { int32 partitionNum = 5; int64 offset = 6; int32 length = 7; + int64 timestamp = 8; } message GetLocalShuffleDataResponse { @@ -90,6 +91,7 @@ message GetMemoryShuffleDataRequest { int32 partitionId = 3; int64 lastBlockId = 4; int32 readBufferSize = 5; + int64 timestamp = 6; } message GetMemoryShuffleDataResponse { @@ -195,6 +197,7 @@ message SendShuffleDataRequest { int32 shuffleId = 2; int64 requireBufferId = 3; repeated ShuffleData shuffleData = 4; + int64 timestamp = 5; } message SendShuffleDataResponse { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java index 9ca3c75751..e83d7b85e6 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java @@ -60,6 +60,18 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics { "grpc_get_in_memory_shuffle_data_total"; private static final String GRPC_GET_SHUFFLE_INDEX_TOTAL = "grpc_get_local_shuffle_index_total"; + private static final String GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY = + "grpc_send_shuffle_data_transport_latency"; + private static final String GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY = + "grpc_get_local_shuffle_data_transport_latency"; + private static final String GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY = + "grpc_get_in_memory_shuffle_data_transport_latency"; + + private static final String GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY = "grpc_send_shuffle_data_process_latency"; + private static final String GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY = "grpc_get_local_shuffle_data_process_latency"; + private static final String GRPC_GET_IN_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY = + "grpc_get_in_memory_shuffle_data_process_latency"; + @Override public void registerMetrics() { gaugeGrpcOpen = metricsManager.addGauge(GRPC_OPEN); @@ -110,6 +122,20 @@ public void registerMetrics() { metricsManager.addCounter(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TOTAL)); counterMap.putIfAbsent(GET_SHUFFLE_INDEX_METHOD, metricsManager.addCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL)); + + transportTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD, + metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY)); + transportTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD, + metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY)); + transportTimeSummaryMap.putIfAbsent(GET_IN_MEMORY_SHUFFLE_DATA_METHOD, + metricsManager.addSummary(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY)); + + processTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD, + metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY)); + processTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD, + metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY)); + processTimeSummaryMap.putIfAbsent(GET_IN_MEMORY_SHUFFLE_DATA_METHOD, + metricsManager.addSummary(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY)); } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 6af12b7659..2d857264f2 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -186,6 +186,21 @@ public void sendShuffleData(SendShuffleDataRequest req, String appId = req.getAppId(); int shuffleId = req.getShuffleId(); long requireBufferId = req.getRequireBufferId(); + long timestamp = req.getTimestamp(); + if (timestamp > 0) { + /* + * Here we record the transport time, but we don't consider the impact of data size on transport time. + * The amount of data will not cause great fluctuations in latency. For example, 100K costs 1ms, + * and 1M costs 10ms. This seems like a normal fluctuation, but it may rise to 10s when the server load is high. + * In addition, we need to pay attention to that the time of the client machine and the machine + * time of the Shuffle Server should be kept in sync. TransportTime is accurate only if this condition is met. + * */ + long transportTime = System.currentTimeMillis() - timestamp; + if (transportTime > 0) { + shuffleServer.getGrpcMetrics().recordTransportTime( + ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime); + } + } int requireSize = shuffleServer .getShuffleTaskManager().getRequireBufferSize(requireBufferId); @@ -240,8 +255,10 @@ public void sendShuffleData(SendShuffleDataRequest req, } } reply = SendShuffleDataResponse.newBuilder().setStatus(valueOf(ret)).setRetMsg(responseMessage).build(); + long costTime = System.currentTimeMillis() - start; + shuffleServer.getGrpcMetrics().recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime); LOG.debug("Cache Shuffle Data for appId[" + appId + "], shuffleId[" + shuffleId - + "], cost " + (System.currentTimeMillis() - start) + + "], cost " + costTime + " ms with " + shufflePartitionedData.size() + " blocks and " + requireSize + " bytes"); } else { reply = SendShuffleDataResponse @@ -476,6 +493,14 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request, int partitionNum = request.getPartitionNum(); long offset = request.getOffset(); int length = request.getLength(); + long timestamp = request.getTimestamp(); + if (timestamp > 0) { + long transportTime = System.currentTimeMillis() - timestamp; + if (transportTime > 0) { + shuffleServer.getGrpcMetrics().recordTransportTime( + ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, transportTime); + } + } String storageType = shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE); StatusCode status = StatusCode.SUCCESS; String msg = "OK"; @@ -497,6 +522,8 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request, ShuffleServerMetrics.counterTotalReadTime.inc(readTime); ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length); ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getData().length); + shuffleServer.getGrpcMetrics().recordProcessTime( + ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime); LOG.info("Successfully getShuffleData cost {} ms for shuffle" + " data with {}", readTime, requestInfo); reply = GetLocalShuffleDataResponse.newBuilder() @@ -607,6 +634,14 @@ public void getMemoryShuffleData(GetMemoryShuffleDataRequest request, int partitionId = request.getPartitionId(); long blockId = request.getLastBlockId(); int readBufferSize = request.getReadBufferSize(); + long timestamp = request.getTimestamp(); + if (timestamp > 0) { + long transportTime = System.currentTimeMillis() - timestamp; + if (transportTime > 0) { + shuffleServer.getGrpcMetrics().recordTransportTime( + ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, transportTime); + } + } long start = System.currentTimeMillis(); StatusCode status = StatusCode.SUCCESS; String msg = "OK"; @@ -627,8 +662,11 @@ public void getMemoryShuffleData(GetMemoryShuffleDataRequest request, ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length); ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length); } + long costTime = System.currentTimeMillis() - start; + shuffleServer.getGrpcMetrics().recordProcessTime( + ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, costTime); LOG.info("Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle" - + " data for {}", (System.currentTimeMillis() - start), data.length, requestInfo); + + " data for {}", costTime, data.length, requestInfo); reply = GetMemoryShuffleDataResponse.newBuilder() .setStatus(valueOf(status)) diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java new file mode 100644 index 0000000000..33e2a9d573 --- /dev/null +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.server; + +import java.util.Map; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Summary; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ShuffleServerGrpcMetricsTest { + @Test + public void testLatencyMetrics() { + ShuffleServerGrpcMetrics metrics = new ShuffleServerGrpcMetrics(); + metrics.register(new CollectorRegistry(true)); + metrics.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 1000); + metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 500); + metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, 200); + metrics.recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 1000); + metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 500); + metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, 200); + Map sendTimeSummaryTime = metrics.getTransportTimeSummaryMap(); + Map processTimeSummaryTime = metrics.getProcessTimeSummaryMap(); + assertEquals(3, sendTimeSummaryTime.size()); + assertEquals(3, processTimeSummaryTime.size()); + + assertEquals(1D, sendTimeSummaryTime.get( + ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum); + assertEquals(0.5D, sendTimeSummaryTime.get( + ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get().sum); + assertEquals(0.2D, sendTimeSummaryTime.get( + ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD).get().sum); + + assertEquals(1D, processTimeSummaryTime.get( + ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum); + assertEquals(0.5D, processTimeSummaryTime.get( + ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get().sum); + assertEquals(0.2D, processTimeSummaryTime.get( + ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD).get().sum); + } + +} diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java index 965a567b05..6276dc23a8 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java @@ -150,7 +150,7 @@ public void testGrpcMetrics() throws Exception { ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); - assertEquals(27, actualObj.get("metrics").size()); + assertEquals(69, actualObj.get("metrics").size()); } @Test