From ddf83843a9fbce8155d159815c351b16eb5a9704 Mon Sep 17 00:00:00 2001 From: advancedxy Date: Mon, 12 Dec 2022 00:01:05 +0800 Subject: [PATCH] [Improvement] Small refactor for code quality (#394) ### What changes were proposed in this pull request? Tweaks some wording, fixes some typos, and removes some dead code. ### Why are the changes needed? Better code quality ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UTs. --- .../reader/RssShuffleDataIterator.java | 6 ------ .../spark/shuffle/writer/WriterBuffer.java | 2 +- .../shuffle/DelegationRssShuffleManager.java | 3 +-- .../shuffle/writer/RssShuffleWriter.java | 9 ++++++--- .../client/factory/ShuffleClientFactory.java | 4 ++-- .../client/impl/ShuffleReadClientImpl.java | 4 +--- .../client/impl/ShuffleWriteClientImpl.java | 4 ++-- .../uniffle/client/util/ClientUtils.java | 4 ++-- .../filesystem/HadoopFilesystemProvider.java | 2 +- .../uniffle/common/metrics/GRPCMetrics.java | 20 +++++++++---------- .../rpc/MonitoringServerTransportFilter.java | 6 +++--- .../uniffle/coordinator/CoordinatorConf.java | 2 +- .../test/CoordinatorGrpcServerTest.java | 8 ++++---- .../uniffle/test/CoordinatorGrpcTest.java | 4 ++-- .../factory/CoordinatorClientFactory.java | 2 +- .../apache/uniffle/server/HealthCheck.java | 4 ++-- .../apache/uniffle/server/ShuffleServer.java | 6 +++--- .../uniffle/server/ShuffleTaskManager.java | 2 +- .../uniffle/server/buffer/ShuffleBuffer.java | 2 +- .../server/buffer/ShuffleBufferManager.java | 6 +++--- .../server/storage/HdfsStorageManager.java | 2 +- .../server/storage/LocalStorageManager.java | 2 +- .../server/storage/MultiStorageManager.java | 4 ++-- .../server/storage/StorageManager.java | 2 +- .../server/ShuffleFlushManagerTest.java | 2 +- .../server/ShuffleTaskManagerTest.java | 2 +- .../storage/LocalStorageManagerTest.java | 2 +- .../storage/common/LocalStorageMeta.java | 4 ++-- .../storage/util/ShuffleStorageUtils.java | 10 +++++----- 29 files changed, 62 insertions(+), 68 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java index 7dec34e87f..dd01d517cb 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import com.esotericsoftware.kryo.io.Input; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; @@ -54,7 +53,6 @@ public class RssShuffleDataIterator extends AbstractIterator> records) throws IOException { private void writeImpl(Iterator<Product2<K, V>> records) { List<ShuffleBlockInfo> shuffleBlockInfos = null; Set<Long> blockIds = Sets.newConcurrentHashSet(); + boolean isCombine = shuffleDependency.mapSideCombine(); + Function1<V, C> createCombiner = null; + if (isCombine) { + createCombiner = shuffleDependency.aggregator().get().createCombiner(); + } while (records.hasNext()) { // Task should fast fail when sending data failed checkIfBlocksFailed(); @@ -175,9 +180,7 @@ private void writeImpl(Iterator<Product2<K, V>> records) { Product2<K, V> record = records.next(); K key = record._1(); int partition = getPartition(key); - boolean isCombine = shuffleDependency.mapSideCombine(); if (isCombine) { - Function1<V, C> createCombiner =
shuffleDependency.aggregator().get().createCombiner(); Object c = createCombiner.apply(record._2()); shuffleBlockInfos = bufferManager.addRecord(partition, record._1(), c); } else { @@ -216,7 +219,7 @@ private void processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) { shuffleBlockInfoList.forEach(sbi -> { long blockId = sbi.getBlockId(); - // add blockId to set, check if it is send later + // add blockId to set, check if it is sent later blockIds.add(blockId); // update [partition, blockIds], it will be sent to shuffle server int partitionId = sbi.getPartitionId(); diff --git a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java index 547e6af09d..caaa14c59e 100644 --- a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java +++ b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java @@ -35,7 +35,7 @@ public static ShuffleClientFactory getInstance() { } /** - * Only for MR engine, which wont used to unregister to remote shuffle-servers + * Only for the MR engine, which won't be used to unregister from remote shuffle-servers */ public ShuffleWriteClient createShuffleWriteClient( String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum, @@ -49,7 +49,7 @@ public ShuffleWriteClient createShuffleWriteClient( String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum, int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled, int dataTransferPoolSize, int dataCommitPoolSize, int unregisterThreadPoolSize, int unregisterRequestTimeoutSec) { - // If replica > replicaWrite, blocks maybe will be sended for 2 rounds. + // If replica > replicaWrite, blocks may be sent for 2 rounds. // We need retry less times in this case for let the first round fail fast.
if (replicaSkipEnabled && replica > replicaWrite) { retryMax = retryMax / 2; diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java index eacefe789c..333fe527e6 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java @@ -159,7 +159,7 @@ public CompressedShuffleBlock readShuffleBlockData() { return null; } - // if need request new data from shuffle server + // if the client needs to request new data from the shuffle server if (bufferSegmentQueue.isEmpty()) { if (read() <= 0) { return null; } } @@ -186,8 +186,6 @@ public CompressedShuffleBlock readShuffleBlockData() { long actualCrc = -1; try { long start = System.currentTimeMillis(); - copyTime.addAndGet(System.currentTimeMillis() - start); - start = System.currentTimeMillis(); expectedCrc = bs.getCrc(); actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(), bs.getLength()); crcCheckTime.addAndGet(System.currentTimeMillis() - start); diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index 1d76a41a30..60a775f040 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -115,7 +115,7 @@ public ShuffleWriteClientImpl( int replicaWrite, int replicaRead, boolean replicaSkipEnabled, - int dataTranferPoolSize, + int dataTransferPoolSize, int dataCommitPoolSize, int unregisterThreadPoolSize, int unregisterRequestTimeSec) { @@ -129,7 +129,7 @@ public ShuffleWriteClientImpl( this.replicaWrite = replicaWrite; this.replicaRead = replicaRead; this.replicaSkipEnabled = replicaSkipEnabled; - this.dataTransferPool = Executors.newFixedThreadPool(dataTranferPoolSize); + this.dataTransferPool = Executors.newFixedThreadPool(dataTransferPoolSize); this.dataCommitPoolSize = dataCommitPoolSize; this.unregisterThreadPoolSize = unregisterThreadPoolSize; this.unregisterRequestTimeSec = unregisterRequestTimeSec; diff --git a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java index 803d65bc83..0bdf7cf00b 100644 --- a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java +++ b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java @@ -30,7 +30,7 @@ public class ClientUtils { - // BlockId is long and composed by partitionId, executorId and AtomicInteger + // BlockId is long and composed of partitionId, taskAttemptId and AtomicInteger.
// AtomicInteger is first 19 bit, max value is 2^19 - 1 // partitionId is next 24 bit, max value is 2^24 - 1 // taskAttemptId is rest of 20 bit, max value is 2^20 - 1 @@ -118,7 +118,7 @@ public static boolean waitUntilDoneOrFail(List> futures public static void validateTestModeConf(boolean testMode, String storageType) { if (!testMode && (StorageType.LOCALFILE.name().equals(storageType) || (StorageType.HDFS.name()).equals(storageType))) { - throw new IllegalArgumentException("RSS storage type about LOCALFILE and HDFS should be used in test mode, " + throw new IllegalArgumentException("LOCALFILE or HDFS storage type should be used in test mode only, " + "because of the poor performance of these two types."); } } diff --git a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java index ada4d06d98..96affc3b11 100644 --- a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java +++ b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java @@ -32,7 +32,7 @@ /** * This HadoopFilesystemProvider will provide the only entrypoint to get the hadoop filesystem whether - * the dfs cluster is kerberized or not. + * the dfs cluster is Kerberos-enabled or not. */ public class HadoopFilesystemProvider { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFilesystemProvider.class); diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java index 7d8b484666..c693604d16 100644 --- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java +++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java @@ -33,10 +33,10 @@ public abstract class GRPCMetrics { private static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS = "grpc_server_executor_active_threads"; public static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY = "grpcServerExecutorBlockingQueueSize"; private static final String GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE = "grpc_server_executor_blocking_queue_size"; - public static final String GRCP_SERVER_CONNECTION_NUMBER_KEY = "grpcServerConnectionNumber"; - private static final String GRCP_SERVER_CONNECTION_NUMBER = "grpc_server_connection_number"; + public static final String GRPC_SERVER_CONNECTION_NUMBER_KEY = "grpcServerConnectionNumber"; + private static final String GRPC_SERVER_CONNECTION_NUMBER = "grpc_server_connection_number"; - private boolean isRegister = false; + private boolean isRegistered = false; protected Map<String, Counter> counterMap = Maps.newConcurrentMap(); protected Map<String, Gauge> gaugeMap = Maps.newConcurrentMap(); protected Map<String, Summary> transportTimeSummaryMap = Maps.newConcurrentMap(); @@ -48,11 +48,11 @@ public abstract class GRPCMetrics { public abstract void registerMetrics(); public void register(CollectorRegistry collectorRegistry) { - if (!isRegister) { + if (!isRegistered) { metricsManager = new MetricsManager(collectorRegistry); registerGeneralMetrics(); registerMetrics(); - isRegister = true; + isRegistered = true; } } @@ -66,13 +66,13 @@ private void registerGeneralMetrics() { metricsManager.addGauge(GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE) ); gaugeMap.putIfAbsent( - GRCP_SERVER_CONNECTION_NUMBER_KEY, - metricsManager.addGauge(GRCP_SERVER_CONNECTION_NUMBER) + GRPC_SERVER_CONNECTION_NUMBER_KEY, + metricsManager.addGauge(GRPC_SERVER_CONNECTION_NUMBER) ); } public void
setGauge(String tag, double value) { - if (isRegister) { + if (isRegistered) { Gauge gauge = gaugeMap.get(tag); if (gauge != null) { gauge.set(value); @@ -81,7 +81,7 @@ public void setGauge(String tag, double value) { } public void incCounter(String methodName) { - if (isRegister) { + if (isRegistered) { Gauge gauge = gaugeMap.get(methodName); if (gauge != null) { gauge.inc(); @@ -96,7 +96,7 @@ public void incCounter(String methodName) { } public void decCounter(String methodName) { - if (isRegister) { + if (isRegistered) { Gauge gauge = gaugeMap.get(methodName); if (gauge != null) { gauge.dec(); diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java index 62ad537adf..085b13fd34 100644 --- a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java +++ b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java @@ -24,7 +24,7 @@ import org.apache.uniffle.common.metrics.GRPCMetrics; -import static org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY; +import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY; public class MonitoringServerTransportFilter extends ServerTransportFilter { private final AtomicLong connectionSize = new AtomicLong(0); @@ -35,12 +35,12 @@ public MonitoringServerTransportFilter(GRPCMetrics grpcMetrics) { } public Attributes transportReady(Attributes transportAttrs) { - grpcMetrics.setGauge(GRCP_SERVER_CONNECTION_NUMBER_KEY, connectionSize.incrementAndGet()); + grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, connectionSize.incrementAndGet()); return super.transportReady(transportAttrs); } public void transportTerminated(Attributes transportAttrs) { - grpcMetrics.setGauge(GRCP_SERVER_CONNECTION_NUMBER_KEY, connectionSize.decrementAndGet()); + grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, connectionSize.decrementAndGet()); super.transportTerminated(transportAttrs); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index bea5301825..6f2b94137e 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -33,7 +33,7 @@ /** * Configuration for Coordinator Service and rss-cluster, including service port, - * heartbeat interval and etc. + * heartbeat interval, etc. 
*/ public class CoordinatorConf extends RssBaseConf { diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java index 4b5016df45..99abcee406 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java @@ -30,7 +30,7 @@ import org.apache.uniffle.proto.CoordinatorServerGrpc; import org.apache.uniffle.proto.RssProtos; -import static org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY; +import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -65,14 +65,14 @@ public void testGrpcConnectionSize() throws Exception { grpcServer.start(); // case1: test the single one connection metric - double connSize = grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get(); + double connSize = grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get(); assertEquals(0, connSize); CoordinatorGrpcClient coordinatorGrpcClient = new CoordinatorGrpcClient("localhost", 20001); coordinatorGrpcClient.registerApplicationInfo( new RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user")); - connSize = grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get(); + connSize = grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get(); assertEquals(1, connSize); // case2: test the multiple connections @@ -81,7 +81,7 @@ public void testGrpcConnectionSize() throws Exception { client1.registerApplicationInfo(new RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user")); client2.registerApplicationInfo(new RssApplicationInfoRequest("testGrpcConnectionSize", 10000, "user")); - connSize = grpcMetrics.getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get(); + connSize = grpcMetrics.getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get(); assertEquals(3, connSize); grpcServer.stop(); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java index 7573c61677..4b20b40c6e 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java @@ -48,7 +48,7 @@ import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.ShuffleServerConf; -import static org.apache.uniffle.common.metrics.GRPCMetrics.GRCP_SERVER_CONNECTION_NUMBER_KEY; +import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -265,7 +265,7 @@ public void rpcMetricsTest() throws Exception { assertEquals(oldValue + 1, newValue, 0.5); double connectionSize = coordinators.get(0) - .getGrpcMetrics().getGaugeMap().get(GRCP_SERVER_CONNECTION_NUMBER_KEY).get(); + .getGrpcMetrics().getGaugeMap().get(GRPC_SERVER_CONNECTION_NUMBER_KEY).get(); assertTrue(connectionSize > 0); } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java index b9f23058f9..fab7416007 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java @@ -50,7 +50,7 @@ public List<CoordinatorClient> createCoordinatorClient(String coordinators) { LOG.info("Start to create coordinator clients from {}", coordinators); List<CoordinatorClient> coordinatorClients = Lists.newLinkedList(); String[] coordinatorList = coordinators.trim().split(","); - if (coordinatorList.length <= 0) { + if (coordinatorList.length == 0) { String msg = "Invalid " + coordinators; LOG.error(msg); throw new RuntimeException(msg); diff --git a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java index ac21024b7e..ecfa28b1d4 100644 --- a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java +++ b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java @@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory; /** - * HealthCheck will check every server whether has the ability to process shuffle data. Currently, we only support disk - * checker. If enough disks don't have enough disk space, server will become unhealthy, and only enough disks + * HealthCheck will check whether every server has the ability to process shuffle data. Currently, we only support + * the disk checker. If enough disks don't have enough disk space, the server will become unhealthy, and only when enough disks * have enough disk space, server will become healthy again. **/ public class HealthCheck { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index 10d59f8acd..56e1a68a55 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -179,9 +179,9 @@ private void initialization() throws Exception { boolean healthCheckEnable = shuffleServerConf.getBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE); if (healthCheckEnable) { - List<Checker> buildInCheckers = Lists.newArrayList(); - buildInCheckers.add(storageManager.getStorageChecker()); - healthCheck = new HealthCheck(isHealthy, shuffleServerConf, buildInCheckers); + List<Checker> builtInCheckers = Lists.newArrayList(); + builtInCheckers.add(storageManager.getStorageChecker()); + healthCheck = new HealthCheck(isHealthy, shuffleServerConf, builtInCheckers); healthCheck.start(); } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 71371ae86f..c197b525fe 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -465,7 +465,7 @@ public void checkLeakShuffleData() { LOG.info("Start check leak shuffle data"); try { Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet()); - storageManager.checkAndClearLeakShuffleData(appIds); + storageManager.checkAndClearLeakedShuffleData(appIds); LOG.info("Finish check leak shuffle data"); } catch (Exception e) { LOG.warn("Error happened in checkLeakShuffleData", e); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java index 9b0a50a2d3..187d6d6545 100644 ---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java @@ -255,7 +255,7 @@ private void updateShuffleData(List<ShufflePartitionedBlock> readBlocks, byte[] try { System.arraycopy(block.getData(), 0, data, offset, block.getLength()); } catch (Exception e) { - LOG.error("Unexpect exception for System.arraycopy, length[" + LOG.error("Unexpected exception for System.arraycopy, length[" + block.getLength() + "], offset[" + offset + "], dataLength[" + data.length + "]", e); throw e; diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index ddad74b3b9..a5aa997312 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -103,7 +103,7 @@ public StatusCode registerBuffer(String appId, int shuffleId, int startPartition public StatusCode cacheShuffleData(String appId, int shuffleId, boolean isPreAllocated, ShufflePartitionedData spd) { if (!isPreAllocated && isFull()) { - LOG.warn("Got unexpect data, can't cache it because the space is full"); + LOG.warn("Got unexpected data, can't cache it because the space is full"); return StatusCode.NO_BUFFER; } @@ -182,7 +182,7 @@ public ShuffleDataResult getShuffleData( void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId, int shuffleId, int startPartition, int endPartition) { - // When we use multistorage and trigger single buffer flush, the buffer size should be bigger + // When we use multi-storage and trigger a single buffer flush, the buffer size should be bigger // than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless.
if (this.bufferFlushEnabled && buffer.getSize() > this.bufferFlushThreshold) { flushBuffer(buffer, appId, shuffleId, startPartition, endPartition); @@ -428,7 +428,7 @@ private Map<String, Set<Integer>> pickFlushedShuffle() { Map<String, Set<Integer>> pickedShuffle = Maps.newHashMap(); // The algorithm here is to flush data size > highWaterMark - lowWaterMark - // the remain data in buffer maybe more than lowWaterMark + // the remaining data in buffer may be more than lowWaterMark // because shuffle server is still receiving data, but it should be ok long expectedFlushSize = highWaterMark - lowWaterMark; long pickedFlushSize = 0L; diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java index 09c7dd03c9..516391cd5f 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java @@ -138,7 +138,7 @@ public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageInfo } @Override - public void checkAndClearLeakShuffleData(Collection<String> appIds) { + public void checkAndClearLeakedShuffleData(Collection<String> appIds) { } public HdfsStorage getStorageByAppId(String appId) { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 4e47a34f52..ee09993901 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -223,7 +223,7 @@ public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageInfo } @Override - public void checkAndClearLeakShuffleData(Collection<String> appIds) { + public void checkAndClearLeakedShuffleData(Collection<String> appIds) { Set<String> appIdsOnStorages = new HashSet<>(); for (LocalStorage localStorage : localStorages) { if (!localStorage.isCorrupted()) { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java index ee53ff0330..53039279a2 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java @@ -140,8 +140,8 @@ public boolean canWrite(ShuffleDataFlushEvent event) { } @Override - public void checkAndClearLeakShuffleData(Collection<String> appIds) { - warmStorageManager.checkAndClearLeakShuffleData(appIds); + public void checkAndClearLeakedShuffleData(Collection<String> appIds) { + warmStorageManager.checkAndClearLeakedShuffleData(appIds); } public void removeResources(PurgeEvent event) { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java index 2ba7b4c50d..2a487535d5 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java @@ -53,5 +53,5 @@ public interface StorageManager { // todo: add an interface that check storage isHealthy - void checkAndClearLeakShuffleData(Collection<String> appIds); + void checkAndClearLeakedShuffleData(Collection<String> appIds); } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java index
8d27f0f81e..3e8afaa9c0 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java @@ -258,7 +258,7 @@ public void clearTest() throws Exception { size = storage.getHandlerSize(); assertEquals(0, size); // fs create a remoteStorage for appId2 before remove resources, - // but thecache from appIdToStorages has removed, so we need to delete this path in hdfs + // but the cache from appIdToStorages has been removed, so we need to delete this path in hdfs Path path = new Path(remoteStorage.getPath() + "/" + appId2 + "/"); assertTrue(fs.mkdirs(path)); storageManager.removeResources( diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index 129806c3b2..31e48903d2 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -68,7 +68,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase { - private static AtomicInteger ATOMIC_INT = new AtomicInteger(0); + private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0); @AfterAll public static void tearDown() { diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java index da6e5ae5aa..da536099ec 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java @@ -162,7 +162,7 @@ public void testInitializeLocalStorage() throws IOException { localStorageManager = new LocalStorageManager(conf); assertEquals(2, localStorageManager.getStorages().size()); - // case4: only have 1 candidates, but exceed the number of rss.server.localstorage.initialize.max.fail.number + // case4: only 1 candidate, but the failure count exceeds rss.server.localstorage.initialize.max.fail.number conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/a/rss-data", "/tmp/rss-data-1")); conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER, 0L); try { diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java index 0751398177..cdfea45807 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java +++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java @@ -34,7 +34,7 @@ /** * Metadata has three dimensions from top to down including disk, shuffle, partition. - * And each dimensions contains two aspects, status data and indicator data. + * And each dimension contains two aspects, status data and indicator data. * Disk status data contains writable flag, Shuffle status data contains stable, uploading, deleting flag. * Disk indicator data contains size, fileNum, shuffleNum, Shuffle indicator contains size, partition list, * uploaded partition list and uploaded size.
@@ -48,7 +48,7 @@ public class LocalStorageMeta { // todo: add ut public List<String> getSortedShuffleKeys(boolean checkRead, int hint) { // Filter the unread shuffle is checkRead is true - // Filter the remain size is 0 + // Filter out shuffles whose remaining size is 0 List> shuffleMetaList = shuffleMetaMap .entrySet() .stream() diff --git a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java index 7deda234ff..71cca83cd6 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java +++ b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java @@ -78,7 +78,7 @@ public static List<DataFileSegment> mergeSegments( } else { Collections.sort(segments); long start = -1; - long lastestPosition = -1; + long latestPosition = -1; long skipThreshold = readBufferSize / 2; long lastPosition = Long.MAX_VALUE; List<BufferSegment> bufferSegments = Lists.newArrayList(); @@ -94,16 +94,16 @@ bufferSegments = Lists.newArrayList(); start = segment.getOffset(); } - lastestPosition = segment.getOffset() + segment.getLength(); + latestPosition = segment.getOffset() + segment.getLength(); bufferSegments.add(new BufferSegment(segment.getBlockId(), segment.getOffset() - start, segment.getLength(), segment.getUncompressLength(), segment.getCrc(), segment.getTaskAttemptId())); - if (lastestPosition - start >= readBufferSize) { + if (latestPosition - start >= readBufferSize) { dataFileSegments.add(new DataFileSegment( - path, start, (int) (lastestPosition - start), bufferSegments)); + path, start, (int) (latestPosition - start), bufferSegments)); start = -1; } - lastPosition = lastestPosition; + lastPosition = latestPosition; } if (start > -1) { dataFileSegments.add(new DataFileSegment(path, start, (int) (lastPosition - start), bufferSegments));
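For reference, the BlockId comment in the ClientUtils hunk above describes a layout of 19 bits of sequence number (the AtomicInteger), 24 bits of partitionId and 20 bits of taskAttemptId packed into one long. Below is a minimal, self-contained sketch of such a packing, assuming the sequence number occupies the highest bits; the class and helper names are illustrative only, not Uniffle's actual API.

public final class BlockIdLayoutSketch {
    // Bit widths taken from the comment: 19 (sequence) + 24 (partition) + 20 (task attempt) = 63 bits.
    private static final int TASK_ATTEMPT_BITS = 20;
    private static final int PARTITION_BITS = 24;
    private static final long MAX_SEQUENCE = (1L << 19) - 1;
    private static final long MAX_PARTITION = (1L << PARTITION_BITS) - 1;
    private static final long MAX_TASK_ATTEMPT = (1L << TASK_ATTEMPT_BITS) - 1;

    static long pack(long sequence, long partitionId, long taskAttemptId) {
        if (sequence > MAX_SEQUENCE || partitionId > MAX_PARTITION || taskAttemptId > MAX_TASK_ATTEMPT) {
            throw new IllegalArgumentException("field exceeds its bit width");
        }
        return (sequence << (PARTITION_BITS + TASK_ATTEMPT_BITS))
            | (partitionId << TASK_ATTEMPT_BITS)
            | taskAttemptId;
    }

    public static void main(String[] args) {
        long blockId = pack(1, 42, 7);
        // Unpack again to verify the round trip.
        long taskAttemptId = blockId & MAX_TASK_ATTEMPT;
        long partitionId = (blockId >>> TASK_ATTEMPT_BITS) & MAX_PARTITION;
        long sequence = blockId >>> (PARTITION_BITS + TASK_ATTEMPT_BITS);
        System.out.printf("seq=%d partition=%d taskAttempt=%d%n", sequence, partitionId, taskAttemptId);
    }
}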
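The watermark comment fixed in the ShuffleBufferManager hunk describes the flush policy: once buffered data crosses the high watermark, pick buffers totalling at least highWaterMark - lowWaterMark bytes, accepting that what remains may still exceed lowWaterMark while the server keeps receiving data. Here is a minimal sketch of that selection rule, assuming a plain map from buffer key to size and largest-first ordering; both the key shape and the method name are assumptions for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class FlushPickSketch {
    static List<String> pickForFlush(Map<String, Long> bufferSizes, long highWaterMark, long lowWaterMark) {
        long expectedFlushSize = highWaterMark - lowWaterMark; // same target as in the patch
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(bufferSizes.entrySet());
        sorted.sort((a, b) -> Long.compare(b.getValue(), a.getValue())); // biggest buffers first
        List<String> picked = new ArrayList<>();
        long pickedFlushSize = 0L;
        for (Map.Entry<String, Long> entry : sorted) {
            if (pickedFlushSize >= expectedFlushSize) {
                break; // enough data picked; the rest stays in memory
            }
            picked.add(entry.getKey());
            pickedFlushSize += entry.getValue();
        }
        return picked;
    }

    public static void main(String[] args) {
        // highWaterMark = 800, lowWaterMark = 300, so at least 500 bytes must be picked.
        Map<String, Long> sizes = Map.of("app1-shuffle0", 400L, "app1-shuffle1", 250L, "app2-shuffle0", 100L);
        System.out.println(pickForFlush(sizes, 800L, 300L)); // [app1-shuffle0, app1-shuffle1]
    }
}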
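Finally, the ShuffleStorageUtils hunk shows the core of mergeSegments: walk the offset-sorted segments, open a group at the first offset, and cut a read unit once latestPosition - start reaches readBufferSize, flushing any tail group at the end. The following simplified sketch mirrors that grouping loop; the Segment and ReadUnit types are stand-ins, and the real method's skipThreshold handling and per-block metadata are omitted.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public final class MergeSegmentsSketch {
    record Segment(long offset, int length) {}
    record ReadUnit(long start, int length, List<Segment> members) {}

    static List<ReadUnit> merge(List<Segment> segments, int readBufferSize) {
        List<Segment> sorted = new ArrayList<>(segments);
        sorted.sort(Comparator.comparingLong(Segment::offset));
        List<ReadUnit> units = new ArrayList<>();
        List<Segment> members = new ArrayList<>();
        long start = -1;
        long latestPosition = -1;
        for (Segment segment : sorted) {
            if (start < 0) {
                start = segment.offset(); // open a new read unit
            }
            latestPosition = segment.offset() + segment.length();
            members.add(segment);
            if (latestPosition - start >= readBufferSize) {
                units.add(new ReadUnit(start, (int) (latestPosition - start), members));
                members = new ArrayList<>();
                start = -1;
            }
        }
        if (start > -1) { // tail group that never reached readBufferSize
            units.add(new ReadUnit(start, (int) (latestPosition - start), members));
        }
        return units;
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(new Segment(0, 40), new Segment(40, 30), new Segment(70, 50));
        // With readBufferSize = 64 this yields units covering [0, 70) and [70, 120).
        merge(segments, 64).forEach(u -> System.out.println(u.start() + " +" + u.length()));
    }
}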