[Improvement] Small refactor for code quality (#394)
### What changes were proposed in this pull request?
Tweaks some wording, fixes some typos, and removes some dead code.

### Why are the changes needed?
Better code quality

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs.
advancedxy authored Dec 11, 2022
1 parent 55191c4 commit ddf8384
Showing 29 changed files with 62 additions and 68 deletions.
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import com.esotericsoftware.kryo.io.Input;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
@@ -54,7 +53,6 @@ public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C
private long readTime = 0;
private long serializeTime = 0;
private long decompressTime = 0;
private Input deserializationInput = null;
private DeserializationStream deserializationStream = null;
private ByteBufInputStream byteBufInputStream = null;
private long compressedBytesLength = 0;
@@ -95,13 +93,9 @@ private void clearDeserializationStream() {
LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
}
}
if (deserializationInput != null) {
deserializationInput.close();
}
if (deserializationStream != null) {
deserializationStream.close();
}
deserializationInput = null;
deserializationStream = null;
byteBufInputStream = null;
}
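With the unused Kryo `Input` field removed, the cleanup helper reduces to closing the two remaining resources. A minimal sketch of the resulting method, reassembled from the hunk above (the `byteBufInputStream` close/try block above the warning and the surrounding class are assumed):

```java
// Sketch of the post-change method; fields and LOG come from the assumed
// surrounding RssShuffleDataIterator class shown in this diff.
private void clearDeserializationStream() {
  if (byteBufInputStream != null) {
    try {
      byteBufInputStream.close();
    } catch (IOException e) {
      LOG.warn("Can't close ByteBufInputStream, memory may be leaked.");
    }
  }
  if (deserializationStream != null) {
    deserializationStream.close();
  }
  deserializationStream = null;
  byteBufInputStream = null;
}
```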
@@ -53,7 +53,7 @@ public void addRecord(byte[] recordBuffer, int length) {
try {
System.arraycopy(recordBuffer, 0, buffer, nextOffset, length);
} catch (Exception e) {
LOG.error("Unexpect exception for System.arraycopy, length[" + length + "], nextOffset["
LOG.error("Unexpected exception for System.arraycopy, length[" + length + "], nextOffset["
+ nextOffset + "], bufferSize[" + bufferSize + "]");
throw e;
}
@@ -107,8 +107,7 @@ private ShuffleManager createShuffleManagerInDriver() throws RssException {
}

private boolean tryAccessCluster() {
String accessId = sparkConf.get(
RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
if (StringUtils.isEmpty(accessId)) {
LOG.warn("Access id key is empty");
return false;
@@ -168,16 +168,19 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
private void writeImpl(Iterator<Product2<K,V>> records) {
List<ShuffleBlockInfo> shuffleBlockInfos = null;
Set<Long> blockIds = Sets.newConcurrentHashSet();
boolean isCombine = shuffleDependency.mapSideCombine();
Function1 createCombiner = null;
if (isCombine) {
createCombiner = shuffleDependency.aggregator().get().createCombiner();
}
while (records.hasNext()) {
// Task should fail fast when sending data fails
checkIfBlocksFailed();

Product2<K, V> record = records.next();
K key = record._1();
int partition = getPartition(key);
boolean isCombine = shuffleDependency.mapSideCombine();
if (isCombine) {
Function1 createCombiner = shuffleDependency.aggregator().get().createCombiner();
Object c = createCombiner.apply(record._2());
shuffleBlockInfos = bufferManager.addRecord(partition, record._1(), c);
} else {
@@ -216,7 +219,7 @@ private void processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoLis
if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
shuffleBlockInfoList.forEach(sbi -> {
long blockId = sbi.getBlockId();
// add blockId to set, check if it is send later
// add blockId to set, check if it is sent later
blockIds.add(blockId);
// update [partition, blockIds], it will be sent to shuffle server
int partitionId = sbi.getPartitionId();
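The `writeImpl` hunk above hoists the `mapSideCombine` check and the `createCombiner` lookup out of the per-record loop, since neither depends on the current record. A self-contained sketch of the same pattern using plain `java.util.function.Function`; the names here are illustrative, not the Spark/Uniffle API:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class HoistInvariantExample {
  // Illustrative stand-in for shuffleDependency.aggregator().get().createCombiner().
  static Function<Integer, List<Integer>> lookupCombiner() {
    return value -> List.of(value);
  }

  static void writeAll(Iterator<Integer> records, boolean isCombine) {
    // Loop-invariant work is done once, before the loop, instead of per record.
    Function<Integer, List<Integer>> createCombiner = isCombine ? lookupCombiner() : null;
    while (records.hasNext()) {
      Integer record = records.next();
      Object value = isCombine ? createCombiner.apply(record) : record;
      System.out.println("buffering " + value);
    }
  }

  public static void main(String[] args) {
    writeAll(List.of(1, 2, 3).iterator(), true);
  }
}
```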
@@ -35,7 +35,7 @@ public static ShuffleClientFactory getInstance() {
}

/**
* Only for MR engine, which wont used to unregister to remote shuffle-servers
* Only for MR engine, which won't be used to unregister from remote shuffle-servers
*/
public ShuffleWriteClient createShuffleWriteClient(
String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
@@ -49,7 +49,7 @@ public ShuffleWriteClient createShuffleWriteClient(
String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled, int dataTransferPoolSize,
int dataCommitPoolSize, int unregisterThreadPoolSize, int unregisterRequestTimeoutSec) {
// If replica > replicaWrite, blocks maybe will be sended for 2 rounds.
// If replica > replicaWrite, blocks may be sent in 2 rounds.
// We need fewer retries in this case to let the first round fail fast.
if (replicaSkipEnabled && replica > replicaWrite) {
retryMax = retryMax / 2;
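The reworded comment above is the rationale for halving `retryMax`: when `replicaSkipEnabled` is on and `replica > replicaWrite`, a block may go through a second send round, so the budget is split to let the first round fail fast. A tiny arithmetic sketch with made-up values:

```java
public class RetryBudgetExample {
  public static void main(String[] args) {
    // Hypothetical values, for illustration only.
    int replica = 3;            // servers assigned per partition
    int replicaWrite = 2;       // successful writes required for a block
    int retryMax = 10;          // configured retry budget
    boolean replicaSkipEnabled = true;

    if (replicaSkipEnabled && replica > replicaWrite) {
      // Two send rounds are possible, so give each round half the retries.
      retryMax = retryMax / 2;  // 10 -> 5
    }
    System.out.println("retries per round: " + retryMax);
  }
}
```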
@@ -159,7 +159,7 @@ public CompressedShuffleBlock readShuffleBlockData() {
return null;
}

// if need request new data from shuffle server
// if client needs to request new data from shuffle server
if (bufferSegmentQueue.isEmpty()) {
if (read() <= 0) {
return null;
@@ -186,8 +186,6 @@ public CompressedShuffleBlock readShuffleBlockData() {
long actualCrc = -1;
try {
long start = System.currentTimeMillis();
copyTime.addAndGet(System.currentTimeMillis() - start);
start = System.currentTimeMillis();
expectedCrc = bs.getCrc();
actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength());
crcCheckTime.addAndGet(System.currentTimeMillis() - start);
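The deleted lines above were a no-op measurement: `copyTime` was incremented immediately after `start` was taken, and `start` was then reset, so only the CRC timing remains. For readers unfamiliar with the check itself, `ChecksumUtils.getCrc32(readBuffer, offset, length)` presumably computes a CRC32 over that region of the read buffer; a JDK-only illustration of such a helper:

```java
import java.util.zip.CRC32;

public class CrcSliceExample {
  // Roughly what a getCrc32(buffer, offset, length) helper is expected to do;
  // this is an illustration, not the project's ChecksumUtils implementation.
  static long crc32(byte[] buffer, int offset, int length) {
    CRC32 crc = new CRC32();
    crc.update(buffer, offset, length);
    return crc.getValue();
  }

  public static void main(String[] args) {
    byte[] data = "shuffle-block".getBytes();
    System.out.println(crc32(data, 0, data.length));
  }
}
```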
@@ -115,7 +115,7 @@ public ShuffleWriteClientImpl(
int replicaWrite,
int replicaRead,
boolean replicaSkipEnabled,
int dataTranferPoolSize,
int dataTransferPoolSize,
int dataCommitPoolSize,
int unregisterThreadPoolSize,
int unregisterRequestTimeSec) {
@@ -129,7 +129,7 @@ public ShuffleWriteClientImpl(
this.replicaWrite = replicaWrite;
this.replicaRead = replicaRead;
this.replicaSkipEnabled = replicaSkipEnabled;
this.dataTransferPool = Executors.newFixedThreadPool(dataTranferPoolSize);
this.dataTransferPool = Executors.newFixedThreadPool(dataTransferPoolSize);
this.dataCommitPoolSize = dataCommitPoolSize;
this.unregisterThreadPoolSize = unregisterThreadPoolSize;
this.unregisterRequestTimeSec = unregisterRequestTimeSec;
@@ -30,7 +30,7 @@

public class ClientUtils {

// BlockId is long and composed by partitionId, executorId and AtomicInteger
// BlockId is long and composed of partitionId, taskAttemptId and AtomicInteger.
// AtomicInteger is first 19 bit, max value is 2^19 - 1
// partitionId is next 24 bit, max value is 2^24 - 1
// taskAttemptId is rest of 20 bit, max value is 2^20 - 1
@@ -118,7 +118,7 @@ public static boolean waitUntilDoneOrFail(List<CompletableFuture<Boolean>> futur
public static void validateTestModeConf(boolean testMode, String storageType) {
if (!testMode && (StorageType.LOCALFILE.name().equals(storageType)
|| (StorageType.HDFS.name()).equals(storageType))) {
throw new IllegalArgumentException("RSS storage type about LOCALFILE and HDFS should be used in test mode, "
throw new IllegalArgumentException("LOCALFILE or HDFS storage type should be used in test mode only, "
+ "because of the poor performance of these two types.");
}
}
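The blockId comment in this file describes a 63-bit packing: a 19-bit sequence number (the AtomicInteger) in the highest bits, then a 24-bit partitionId, then a 20-bit taskAttemptId in the lowest bits. A hedged sketch of how such an id could be packed and unpacked, following that description; the constant and method names are invented for illustration:

```java
public class BlockIdLayoutExample {
  // Bit widths as described in the comment: 19 + 24 + 20 = 63 bits of a long.
  static final int SEQ_BITS = 19;
  static final int PARTITION_BITS = 24;
  static final int TASK_ATTEMPT_BITS = 20;

  static long pack(long seq, long partitionId, long taskAttemptId) {
    return (seq << (PARTITION_BITS + TASK_ATTEMPT_BITS))
        | (partitionId << TASK_ATTEMPT_BITS)
        | taskAttemptId;
  }

  static long partitionOf(long blockId) {
    return (blockId >> TASK_ATTEMPT_BITS) & ((1L << PARTITION_BITS) - 1);
  }

  public static void main(String[] args) {
    long blockId = pack(1, 42, 7);
    System.out.println(partitionOf(blockId)); // prints 42
  }
}
```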
@@ -32,7 +32,7 @@

/**
* This HadoopFilesystemProvider will provide the only entrypoint to get the hadoop filesystem whether
* the dfs cluster is kerberized or not.
* the dfs cluster is Kerberos-enabled or not.
*/
public class HadoopFilesystemProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFilesystemProvider.class);
@@ -33,10 +33,10 @@ public abstract class GRPCMetrics {
private static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS = "grpc_server_executor_active_threads";
public static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY = "grpcServerExecutorBlockingQueueSize";
private static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE = "grpc_server_executor_blocking_queue_size";
public static final String GRCP_SERVER_CONNECTION_NUMBER_KEY = "grpcServerConnectionNumber";
private static final String GRCP_SERVER_CONNECTION_NUMBER = "grpc_server_connection_number";
public static final String GRPC_SERVER_CONNECTION_NUMBER_KEY = "grpcServerConnectionNumber";
private static final String GRPC_SERVER_CONNECTION_NUMBER = "grpc_server_connection_number";

private boolean isRegister = false;
private boolean isRegistered = false;
protected Map<String, Counter> counterMap = Maps.newConcurrentMap();
protected Map<String, Gauge> gaugeMap = Maps.newConcurrentMap();
protected Map<String, Summary> transportTimeSummaryMap = Maps.newConcurrentMap();
@@ -48,11 +48,11 @@ public abstract class GRPCMetrics {
public abstract void registerMetrics();

public void register(CollectorRegistry collectorRegistry) {
if (!isRegister) {
if (!isRegistered) {
metricsManager = new MetricsManager(collectorRegistry);
registerGeneralMetrics();
registerMetrics();
isRegister = true;
isRegistered = true;
}
}

@@ -66,13 +66,13 @@ private void registerGeneralMetrics() {
metricsManager.addGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE)
);
gaugeMap.putIfAbsent(
GRCP_SERVER_CONNECTION_NUMBER_KEY,
metricsManager.addGauge(GRCP_SERVER_CONNECTION_NUMBER)
GRPC_SERVER_CONNECTION_NUMBER_KEY,
metricsManager.addGauge(GRPC_SERVER_CONNECTION_NUMBER)
);
}

public void setGauge(String tag, double value) {
if (isRegister) {
if (isRegistered) {
Gauge gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.set(value);
@@ -81,7 +81,7 @@ public void setGauge(String tag, double value) {
}

public void incCounter(String methodName) {
if (isRegister) {
if (isRegistered) {
Gauge gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.inc();
@@ -96,7 +96,7 @@ public void incCounter(String methodName) {
}

public void decCounter(String methodName) {
if (isRegister) {
if (isRegistered) {
Gauge gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.dec();
@@ -24,7 +24,7 @@

import org.apache.uniffle.common.metrics.GRPCMetrics;

import static org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;

public class MonitoringServerTransportFilter extends ServerTransportFilter {
private final AtomicLong connectionSize = new AtomicLong(0);
@@ -35,12 +35,12 @@ public MonitoringServerTransportFilter(GRPCMetrics grpcMetrics) {
}

public Attributes transportReady(Attributes transportAttrs) {
grpcMetrics.setGauge(GRCP_SERVER_CONNECTION_NUMBER_KEY, connectionSize.incrementAndGet());
grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, connectionSize.incrementAndGet());
return super.transportReady(transportAttrs);
}

public void transportTerminated(Attributes transportAttrs) {
grpcMetrics.setGauge(GRCP_SERVER_CONNECTION_NUMBER_KEY, connectionSize.decrementAndGet());
grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, connectionSize.decrementAndGet());
super.transportTerminated(transportAttrs);
}
}
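For context on where this filter plugs in: grpc-java allows a `ServerTransportFilter` to be registered on the server builder, so every established or terminated transport updates the connection gauge. A wiring sketch under that assumption; the service and metrics instances are placeholders, not the project's actual bootstrap code:

```java
import java.io.IOException;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import org.apache.uniffle.common.metrics.GRPCMetrics;

public class TransportFilterWiringExample {
  // Placeholder wiring; in the real servers the service implementation and the
  // GRPCMetrics instance come from the coordinator / shuffle-server bootstrap,
  // and MonitoringServerTransportFilter is assumed to be on the classpath.
  static Server start(BindableService service, GRPCMetrics metrics) throws IOException {
    return ServerBuilder.forPort(20001)
        .addService(service)
        .addTransportFilter(new MonitoringServerTransportFilter(metrics))
        .build()
        .start();
  }
}
```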
@@ -33,7 +33,7 @@

/**
* Configuration for Coordinator Service and rss-cluster, including service port,
* heartbeat interval and etc.
* heartbeat interval, etc.
*/
public class CoordinatorConf extends RssBaseConf {

@@ -30,7 +30,7 @@
import org.apache.uniffle.proto.CoordinatorServerGrpc;
import org.apache.uniffle.proto.RssProtos;

import static org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
@@ -65,14 +65,14 @@ public void testGrpcConnectionSize() throws Exception {
grpcServer.start();

// case1: test the single one connection metric
double connSize = grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
double connSize = grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
assertEquals(0, connSize);

CoordinatorGrpcClient coordinatorGrpcClient = new CoordinatorGrpcClient("localhost", 20001);
coordinatorGrpcClient.registerApplicationInfo(
new RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));

connSize = grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
connSize = grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
assertEquals(1, connSize);

// case2: test the multiple connections
@@ -81,7 +81,7 @@ public void testGrpcConnectionSize() throws Exception {
client1.registerApplicationInfo(new RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));
client2.registerApplicationInfo(new RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user"));

connSize = grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
connSize = grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
assertEquals(3, connSize);

grpcServer.stop();
@@ -48,7 +48,7 @@
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;

import static org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY;
import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -265,7 +265,7 @@ public void rpcMetricsTest() throws Exception {
assertEquals(oldValue + 1, newValue, 0.5);

double connectionSize = coordinators.get(0)
.getGrpcMetrics().getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get();
.getGrpcMetrics().getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get();
assertTrue(connectionSize > 0);
}

@@ -50,7 +50,7 @@ public List<CoordinatorClient> createCoordinatorClient(String coordinators) {
LOG.info("Start to create coordinator clients from {}", coordinators);
List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
String[] coordinatorList = coordinators.trim().split(",");
if (coordinatorList.length <= 0) {
if (coordinatorList.length == 0) {
String msg = "Invalid " + coordinators;
LOG.error(msg);
throw new RuntimeException(msg);
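The `<= 0` comparison was dead weight here because `String.split` can never return a negative-length array; a zero-length result is still possible when the configured value is only delimiters, which is what the `== 0` check guards. A quick standalone illustration with made-up inputs:

```java
public class SplitCheckExample {
  public static void main(String[] args) {
    // Typical value: two coordinator addresses.
    System.out.println("host1:19999,host2:19999".trim().split(",").length); // 2
    // An empty string still yields one (empty) element, not zero.
    System.out.println("".trim().split(",").length);                        // 1
    // Only delimiters: trailing empty strings are dropped, so the length is 0.
    System.out.println(",".trim().split(",").length);                       // 0
  }
}
```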
@@ -30,8 +30,8 @@
import org.slf4j.LoggerFactory;

/**
* HealthCheck will check every server whether has the ability to process shuffle data. Currently, we only support disk
* checker. If enough disks don't have enough disk space, server will become unhealthy, and only enough disks
* HealthCheck will check every server whether it has the ability to process shuffle data. Currently, we only support
* disk checker. If not enough disks have sufficient space, the server becomes unhealthy, and it becomes
* healthy again only when enough disks have sufficient space.
**/
public class HealthCheck {
@@ -179,9 +179,9 @@ private void initialization() throws Exception {

boolean healthCheckEnable = shuffleServerConf.getBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE);
if (healthCheckEnable) {
List<Checker> buildInCheckers = Lists.newArrayList();
buildInCheckers.add(storageManager.getStorageChecker());
healthCheck = new HealthCheck(isHealthy, shuffleServerConf, buildInCheckers);
List<Checker> builtInCheckers = Lists.newArrayList();
builtInCheckers.add(storageManager.getStorageChecker());
healthCheck = new HealthCheck(isHealthy, shuffleServerConf, builtInCheckers);
healthCheck.start();
}

@@ -465,7 +465,7 @@ public void checkLeakShuffleData() {
LOG.info("Start check leak shuffle data");
try {
Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet());
storageManager.checkAndClearLeakShuffleData(appIds);
storageManager.checkAndClearLeakedShuffleData(appIds);
LOG.info("Finish check leak shuffle data");
} catch (Exception e) {
LOG.warn("Error happened in checkLeakShuffleData", e);
@@ -255,7 +255,7 @@ private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, byte[]
try {
System.arraycopy(block.getData(), 0, data, offset, block.getLength());
} catch (Exception e) {
LOG.error("Unexpect exception for System.arraycopy, length["
LOG.error("Unexpected exception for System.arraycopy, length["
+ block.getLength() + "], offset["
+ offset + "], dataLength[" + data.length + "]", e);
throw e;
@@ -103,7 +103,7 @@ public StatusCode registerBuffer(String appId, int shuffleId, int startPartition
public StatusCode cacheShuffleData(String appId, int shuffleId,
boolean isPreAllocated, ShufflePartitionedData spd) {
if (!isPreAllocated && isFull()) {
LOG.warn("Got unexpect data, can't cache it because the space is full");
LOG.warn("Got unexpected data, can't cache it because the space is full");
return StatusCode.NO_BUFFER;
}

@@ -182,7 +182,7 @@ public ShuffleDataResult getShuffleData(

void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId,
int shuffleId, int startPartition, int endPartition) {
// When we use multistorage and trigger single buffer flush, the buffer size should be bigger
// When we use multi storage and trigger single buffer flush, the buffer size should be bigger
// than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless.
if (this.bufferFlushEnabled && buffer.getSize() > this.bufferFlushThreshold) {
flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
@@ -428,7 +428,7 @@ private Map<String, Set<Integer>> pickFlushedShuffle() {

Map<String, Set<Integer>> pickedShuffle = Maps.newHashMap();
// The algorithm here is to flush data size > highWaterMark - lowWaterMark
// the remain data in buffer maybe more than lowWaterMark
// the remaining data in buffer may be more than lowWaterMark
// because shuffle server is still receiving data, but it should be ok
long expectedFlushSize = highWaterMark - lowWaterMark;
long pickedFlushSize = 0L;
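The comment above states the flush-picking goal: select roughly `highWaterMark - lowWaterMark` bytes to flush, accepting that concurrent writes may leave somewhat more than `lowWaterMark` in the buffers. A compact, self-contained sketch of that greedy selection; the sizes and key names are illustrative, not the server's actual data structures:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlushPickingExample {
  public static void main(String[] args) {
    long highWaterMark = 100L;
    long lowWaterMark = 60L;
    long expectedFlushSize = highWaterMark - lowWaterMark; // 40
    long pickedFlushSize = 0L;

    // Hypothetical per-shuffle buffer sizes, keyed by "appId/shuffleId".
    Map<String, Long> bufferSizes = new LinkedHashMap<>();
    bufferSizes.put("app1/0", 25L);
    bufferSizes.put("app1/1", 10L);
    bufferSizes.put("app2/0", 30L);

    for (Map.Entry<String, Long> entry : bufferSizes.entrySet()) {
      if (pickedFlushSize >= expectedFlushSize) {
        break; // enough picked; what remains may still exceed lowWaterMark
      }
      pickedFlushSize += entry.getValue();
      System.out.println("flush " + entry.getKey() + " (" + entry.getValue() + " bytes)");
    }
  }
}
```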
@@ -138,7 +138,7 @@ public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageI
}

@Override
public void checkAndClearLeakShuffleData(Collection<String> appIds) {
public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
}

public HdfsStorage getStorageByAppId(String appId) {