Skip to content

Commit

Permalink
[ISSUE-309][FEATURE] Support ShuffleServer latency metrics. (#327)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
For #309, support ShuffleServer latency metrics.

### Why are the changes needed?
Accurately determine whether the current service load has caused a large delay to the client's read and write.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Co-authored-by: leixianming <leixianming@didiglobal.com>
  • Loading branch information
leixm and leixianming authored Nov 16, 2022
1 parent 87361da commit 4004f44
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;

import org.apache.uniffle.common.util.Constants;

public abstract class GRPCMetrics {
// Grpc server internal executor metrics
Expand All @@ -36,6 +39,8 @@ public abstract class GRPCMetrics {
private boolean isRegister = false;
protected Map<String, Counter> counterMap = Maps.newConcurrentMap();
protected Map<String, Gauge> gaugeMap = Maps.newConcurrentMap();
protected Map<String, Summary> transportTimeSummaryMap = Maps.newConcurrentMap();
protected Map<String, Summary> processTimeSummaryMap = Maps.newConcurrentMap();
protected Gauge gaugeGrpcOpen;
protected Counter counterGrpcTotal;
protected MetricsManager metricsManager;
Expand Down Expand Up @@ -100,6 +105,20 @@ public void decCounter(String methodName) {
}
}

public void recordTransportTime(String methodName, long transportTimeInMillionSecond) {
Summary summary = transportTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(transportTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
}
}

public void recordProcessTime(String methodName, long processTimeInMillionSecond) {
Summary summary = processTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(processTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
}
}

public CollectorRegistry getCollectorRegistry() {
return metricsManager.getCollectorRegistry();
}
Expand All @@ -119,4 +138,12 @@ public Gauge getGaugeGrpcOpen() {
public Counter getCounterGrpcTotal() {
return counterGrpcTotal;
}

public Map<String, Summary> getTransportTimeSummaryMap() {
return transportTimeSummaryMap;
}

public Map<String, Summary> getProcessTimeSummaryMap() {
return processTimeSummaryMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import io.prometheus.client.Summary;

public class MetricsManager {
private CollectorRegistry collectorRegistry;
private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99};
private static final double QUANTILE_ERROR = 0.01;

public MetricsManager() {
this(null);
Expand Down Expand Up @@ -64,4 +67,12 @@ public Histogram addHistogram(String name, double[] buckets, String... labels) {
public Histogram addHistogram(String name, String help, double[] buckets, String[] labels) {
return Histogram.build().name(name).buckets(buckets).labelNames(labels).help(help).register(collectorRegistry);
}

public Summary addSummary(String name) {
Summary.Builder builder = Summary.build().name(name).help("Summary " + name);
for (int i = 0; i < QUANTILES.length; i++) {
builder = builder.quantile(QUANTILES[i], QUANTILE_ERROR);
}
return builder.register(collectorRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ public class Constants {
public static int MR_REDUCE_LIMIT_DEFAULT_VALUE = 0;
public static final String MR_SLOW_START = "mapreduce.job.reduce.slowstart.completedmaps";
public static double MR_SLOW_START_DEFAULT_VALUE = 0.05;

public static final double MILLION_SECONDS_PER_SECOND = 1E3D;
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,14 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
throw new RssException(String.format(
"requirePreAllocation failed! size[%s], host[%s], port[%s]", allocateSize, host, port));
}
long start = System.currentTimeMillis();
SendShuffleDataRequest rpcRequest = SendShuffleDataRequest.newBuilder()
.setAppId(appId)
.setShuffleId(stb.getKey())
.setRequireBufferId(requireId)
.addAllShuffleData(shuffleData)
.setTimestamp(start)
.build();
long start = System.currentTimeMillis();
SendShuffleDataResponse response = getBlockingStub().sendShuffleData(rpcRequest);
LOG.info("Do sendShuffleData to {}:{} rpc cost:" + (System.currentTimeMillis() - start)
+ " ms for " + allocateSize + " bytes with " + finalBlockNum + " blocks", host, port);
Expand Down Expand Up @@ -522,6 +523,7 @@ public RssGetShuffleResultResponse getShuffleResultForMultiPart(RssGetShuffleRes

@Override
public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request) {
long start = System.currentTimeMillis();
GetLocalShuffleDataRequest rpcRequest = GetLocalShuffleDataRequest
.newBuilder()
.setAppId(request.getAppId())
Expand All @@ -531,8 +533,8 @@ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request
.setPartitionNum(request.getPartitionNum())
.setOffset(request.getOffset())
.setLength(request.getLength())
.setTimestamp(start)
.build();
long start = System.currentTimeMillis();
GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest);
String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
+ request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
Expand Down Expand Up @@ -595,16 +597,17 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ
@Override
public RssGetInMemoryShuffleDataResponse getInMemoryShuffleData(
RssGetInMemoryShuffleDataRequest request) {
long start = System.currentTimeMillis();
GetMemoryShuffleDataRequest rpcRequest = GetMemoryShuffleDataRequest
.newBuilder()
.setAppId(request.getAppId())
.setShuffleId(request.getShuffleId())
.setPartitionId(request.getPartitionId())
.setLastBlockId(request.getLastBlockId())
.setReadBufferSize(request.getReadBufferSize())
.setTimestamp(start)
.build();

long start = System.currentTimeMillis();
GetMemoryShuffleDataResponse rpcResponse = getBlockingStub().getMemoryShuffleData(rpcRequest);
String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
+ request.getShuffleId() + "], partitionId[" + request.getPartitionId() + "]";
Expand Down
3 changes: 3 additions & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message GetLocalShuffleDataRequest {
int32 partitionNum = 5;
int64 offset = 6;
int32 length = 7;
int64 timestamp = 8;
}

message GetLocalShuffleDataResponse {
Expand All @@ -90,6 +91,7 @@ message GetMemoryShuffleDataRequest {
int32 partitionId = 3;
int64 lastBlockId = 4;
int32 readBufferSize = 5;
int64 timestamp = 6;
}

message GetMemoryShuffleDataResponse {
Expand Down Expand Up @@ -195,6 +197,7 @@ message SendShuffleDataRequest {
int32 shuffleId = 2;
int64 requireBufferId = 3;
repeated ShuffleData shuffleData = 4;
int64 timestamp = 5;
}

message SendShuffleDataResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
"grpc_get_in_memory_shuffle_data_total";
private static final String GRPC_GET_SHUFFLE_INDEX_TOTAL = "grpc_get_local_shuffle_index_total";

private static final String GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY =
"grpc_send_shuffle_data_transport_latency";
private static final String GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY =
"grpc_get_local_shuffle_data_transport_latency";
private static final String GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY =
"grpc_get_in_memory_shuffle_data_transport_latency";

private static final String GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY = "grpc_send_shuffle_data_process_latency";
private static final String GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY = "grpc_get_local_shuffle_data_process_latency";
private static final String GRPC_GET_IN_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY =
"grpc_get_in_memory_shuffle_data_process_latency";

@Override
public void registerMetrics() {
gaugeGrpcOpen = metricsManager.addGauge(GRPC_OPEN);
Expand Down Expand Up @@ -110,6 +122,20 @@ public void registerMetrics() {
metricsManager.addCounter(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TOTAL));
counterMap.putIfAbsent(GET_SHUFFLE_INDEX_METHOD,
metricsManager.addCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL));

transportTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY));
transportTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_TRANSPORT_LATENCY));
transportTimeSummaryMap.putIfAbsent(GET_IN_MEMORY_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY));

processTimeSummaryMap.putIfAbsent(SEND_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY));
processTimeSummaryMap.putIfAbsent(GET_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_GET_SHUFFLE_DATA_PROCESS_LATENCY));
processTimeSummaryMap.putIfAbsent(GET_IN_MEMORY_SHUFFLE_DATA_METHOD,
metricsManager.addSummary(GRPC_GET_IN_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ public void sendShuffleData(SendShuffleDataRequest req,
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireBufferId();
long timestamp = req.getTimestamp();
if (timestamp > 0) {
/*
* Here we record the transport time, but we don't consider the impact of data size on transport time.
* The amount of data will not cause great fluctuations in latency. For example, 100K costs 1ms,
* and 1M costs 10ms. This seems like a normal fluctuation, but it may rise to 10s when the server load is high.
* In addition, we need to pay attention to that the time of the client machine and the machine
* time of the Shuffle Server should be kept in sync. TransportTime is accurate only if this condition is met.
* */
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer.getGrpcMetrics().recordTransportTime(
ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime);
}
}
int requireSize = shuffleServer
.getShuffleTaskManager().getRequireBufferSize(requireBufferId);

Expand Down Expand Up @@ -240,8 +255,10 @@ public void sendShuffleData(SendShuffleDataRequest req,
}
}
reply = SendShuffleDataResponse.newBuilder().setStatus(valueOf(ret)).setRetMsg(responseMessage).build();
long costTime = System.currentTimeMillis() - start;
shuffleServer.getGrpcMetrics().recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime);
LOG.debug("Cache Shuffle Data for appId[" + appId + "], shuffleId[" + shuffleId
+ "], cost " + (System.currentTimeMillis() - start)
+ "], cost " + costTime
+ " ms with " + shufflePartitionedData.size() + " blocks and " + requireSize + " bytes");
} else {
reply = SendShuffleDataResponse
Expand Down Expand Up @@ -476,6 +493,14 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request,
int partitionNum = request.getPartitionNum();
long offset = request.getOffset();
int length = request.getLength();
long timestamp = request.getTimestamp();
if (timestamp > 0) {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer.getGrpcMetrics().recordTransportTime(
ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, transportTime);
}
}
String storageType = shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE);
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
Expand All @@ -497,6 +522,8 @@ public void getLocalShuffleData(GetLocalShuffleDataRequest request,
ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length);
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getData().length);
shuffleServer.getGrpcMetrics().recordProcessTime(
ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
LOG.info("Successfully getShuffleData cost {} ms for shuffle"
+ " data with {}", readTime, requestInfo);
reply = GetLocalShuffleDataResponse.newBuilder()
Expand Down Expand Up @@ -607,6 +634,14 @@ public void getMemoryShuffleData(GetMemoryShuffleDataRequest request,
int partitionId = request.getPartitionId();
long blockId = request.getLastBlockId();
int readBufferSize = request.getReadBufferSize();
long timestamp = request.getTimestamp();
if (timestamp > 0) {
long transportTime = System.currentTimeMillis() - timestamp;
if (transportTime > 0) {
shuffleServer.getGrpcMetrics().recordTransportTime(
ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, transportTime);
}
}
long start = System.currentTimeMillis();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
Expand All @@ -627,8 +662,11 @@ public void getMemoryShuffleData(GetMemoryShuffleDataRequest request,
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
}
long costTime = System.currentTimeMillis() - start;
shuffleServer.getGrpcMetrics().recordProcessTime(
ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, costTime);
LOG.info("Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle"
+ " data for {}", (System.currentTimeMillis() - start), data.length, requestInfo);
+ " data for {}", costTime, data.length, requestInfo);

reply = GetMemoryShuffleDataResponse.newBuilder()
.setStatus(valueOf(status))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server;

import java.util.Map;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ShuffleServerGrpcMetricsTest {
@Test
public void testLatencyMetrics() {
ShuffleServerGrpcMetrics metrics = new ShuffleServerGrpcMetrics();
metrics.register(new CollectorRegistry(true));
metrics.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 1000);
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 500);
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, 200);
metrics.recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 1000);
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 500);
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD, 200);
Map<String, Summary> sendTimeSummaryTime = metrics.getTransportTimeSummaryMap();
Map<String, Summary> processTimeSummaryTime = metrics.getProcessTimeSummaryMap();
assertEquals(3, sendTimeSummaryTime.size());
assertEquals(3, processTimeSummaryTime.size());

assertEquals(1D, sendTimeSummaryTime.get(
ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(0.5D, sendTimeSummaryTime.get(
ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(0.2D, sendTimeSummaryTime.get(
ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD).get().sum);

assertEquals(1D, processTimeSummaryTime.get(
ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(0.5D, processTimeSummaryTime.get(
ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(0.2D, processTimeSummaryTime.get(
ShuffleServerGrpcMetrics.GET_IN_MEMORY_SHUFFLE_DATA_METHOD).get().sum);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void testGrpcMetrics() throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
assertEquals(27, actualObj.get("metrics").size());
assertEquals(69, actualObj.get("metrics").size());
}

@Test
Expand Down

0 comments on commit 4004f44

Please sign in to comment.