diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java index e60efc6565f5..bc6ec923aaf9 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java @@ -35,14 +35,15 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Comparator; import java.util.Objects; -public class Peer { +public class Peer implements Comparable { private final Logger logger = LoggerFactory.getLogger(Peer.class); private final ConsensusGroupId groupId; - private final TEndPoint endpoint; private final int nodeId; + private final TEndPoint endpoint; public Peer(ConsensusGroupId groupId, int nodeId, TEndPoint endpoint) { this.groupId = groupId; @@ -105,6 +106,14 @@ public String toString() { return "Peer{" + "groupId=" + groupId + ", endpoint=" + endpoint + ", nodeId=" + nodeId + '}'; } + @Override + public int compareTo(Peer peer) { + return Comparator.comparing(Peer::getGroupId) + .thenComparingInt(Peer::getNodeId) + .thenComparing(Peer::getEndpoint) + .compare(this, peer); + } + public static Peer valueOf( TConsensusGroupId consensusGroupId, TDataNodeLocation dataNodeLocation) { if (consensusGroupId.getType() == TConsensusGroupType.SchemaRegion) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 75d1ca399182..164de58aee99 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -75,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -174,7 +175,7 @@ private void initAndRecover() throws IOException { new IoTConsensusServerImpl( path.toString(), new Peer(consensusGroupId, thisNodeId, thisNode), - new ArrayList<>(), + new TreeSet<>(), registry.apply(consensusGroupId), backgroundTaskService, clientManager, @@ -188,7 +189,7 @@ private void initAndRecover() throws IOException { BiConsumer> resetPeerListWithoutThrow = (consensusGroupId, peers) -> { try { - resetPeerList(consensusGroupId, peers); + resetPeerListImpl(consensusGroupId, peers, false); } catch (ConsensusGroupNotExistException ignore) { } catch (Exception e) { @@ -281,7 +282,7 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) new IoTConsensusServerImpl( path, new Peer(groupId, thisNodeId, thisNode), - peers, + new TreeSet<>(peers), registry.apply(groupId), backgroundTaskService, clientManager, @@ -490,6 +491,12 @@ public void recordCorrectPeerListBeforeStarting( @Override public void resetPeerList(ConsensusGroupId groupId, List correctPeers) throws ConsensusException { + resetPeerListImpl(groupId, correctPeers, true); + } + + private void resetPeerListImpl( + ConsensusGroupId groupId, List correctPeers, boolean startNow) + throws ConsensusException { IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); @@ -497,7 +504,7 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) Peer localPeer = new Peer(groupId, thisNodeId, thisNode); if (!correctPeers.contains(localPeer)) { logger.info( - "[RESET PEER LIST] Local peer is not in the correct configuration, delete local peer {}", + "[RESET PEER LIST] {} Local peer is not in the correct configuration, delete it.", groupId); deleteLocalPeer(groupId); return; @@ -510,29 +517,32 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) for (Peer peer : currentMembers) { if (!correctPeers.contains(peer)) { if (!impl.removeSyncLogChannel(peer)) { - logger.error("[RESET PEER LIST] Failed to remove sync channel with: {}", peer); + logger.error( + "[RESET PEER LIST] {} Failed to remove sync channel with: {}", groupId, peer); } else { - logger.info("[RESET PEER LIST] Remove sync channel with: {}", peer); + logger.info("[RESET PEER LIST] {} Remove sync channel with: {}", groupId, peer); } } } // add correct peer for (Peer peer : correctPeers) { if (!impl.getConfiguration().contains(peer)) { - impl.buildSyncLogChannel(peer); - logger.info("[RESET PEER LIST] Build sync channel with: {}", peer); + impl.buildSyncLogChannel(peer, startNow); + logger.info("[RESET PEER LIST] {} Build sync channel with: {}", groupId, peer); } } // show result String newPeerListStr = impl.getConfiguration().toString(); if (!previousPeerListStr.equals(newPeerListStr)) { logger.info( - "[RESET PEER LIST] Local peer list has been reset: {} -> {}", + "[RESET PEER LIST] {} Local peer list has been reset: {} -> {}", + groupId, previousPeerListStr, newPeerListStr); } else { logger.info( - "[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}", + "[RESET PEER LIST] {} The current peer list is correct, nothing need to be reset: {}", + groupId, previousPeerListStr); } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index ee049ee4d9c2..a5df6ab3dbb2 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -73,11 +73,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; @@ -85,12 +83,11 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.PriorityQueue; -import java.util.Set; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; @@ -100,15 +97,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.iotdb.commons.utils.FileUtils.humanReadableByteCountSI; public class IoTConsensusServerImpl { - private static final String CONFIGURATION_FILE_NAME = "configuration.dat"; - private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp"; public static final String SNAPSHOT_DIR_NAME = "snapshot"; private static final Pattern SNAPSHOT_INDEX_PATTEN = Pattern.compile(".*[^\\d](?=(\\d+))"); private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = @@ -120,7 +113,7 @@ public class IoTConsensusServerImpl { private final Lock stateMachineLock = new ReentrantLock(); private final Condition stateMachineCondition = stateMachineLock.newCondition(); private final String storageDir; - private final List configuration; + private final TreeSet configuration; private final AtomicLong searchIndex; private final LogDispatcher logDispatcher; private IoTConsensusConfig config; @@ -137,7 +130,7 @@ public class IoTConsensusServerImpl { public IoTConsensusServerImpl( String storageDir, Peer thisNode, - List configuration, + TreeSet configuration, IStateMachine stateMachine, ScheduledExecutorService backgroundTaskService, IClientManager clientManager, @@ -150,11 +143,6 @@ public IoTConsensusServerImpl( this.cacheQueueMap = new ConcurrentHashMap<>(); this.syncClientManager = syncClientManager; this.configuration = configuration; - if (configuration.isEmpty()) { - recoverConfiguration(); - } else { - persistConfiguration(); - } this.backgroundTaskService = backgroundTaskService; this.config = config; this.consensusGroupId = thisNode.getGroupId().toString(); @@ -498,7 +486,7 @@ public void notifyPeersToBuildSyncLogChannel(Peer targetPeer) if (peer.equals(thisNode)) { // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the // snapshot produced by thisNode - buildSyncLogChannel(targetPeer); + buildSyncLogChannel(targetPeer, true); } else { // use RPC to tell other peers to build sync log channel to target peer try (SyncIoTConsensusServiceClient client = @@ -646,23 +634,22 @@ private boolean isSuccess(TSStatus status) { } /** build SyncLog channel with safeIndex as the default initial sync index. */ - public void buildSyncLogChannel(Peer targetPeer) { - buildSyncLogChannel(targetPeer, getMinSyncIndex()); + public void buildSyncLogChannel(Peer targetPeer, boolean startNow) { + buildSyncLogChannel(targetPeer, getMinSyncIndex(), startNow); } - public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) { + public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex, boolean startNow) { KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE); - // step 1, build sync channel in LogDispatcher + configuration.add(targetPeer); + if (Objects.equals(targetPeer, thisNode)) { + return; + } + logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex, startNow); logger.info( - "[IoTConsensus] build sync log channel to {} with initialSyncIndex {}", + "[IoTConsensus] Successfully build sync log channel to {} with initialSyncIndex {}. {}", targetPeer, - initialSyncIndex); - logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex); - // step 2, update configuration - configuration.add(targetPeer); - // step 3, persist configuration - persistConfiguration(); - logger.info("[IoTConsensus Configuration] persist new configuration: {}", configuration); + initialSyncIndex, + startNow ? "Sync log channel has started." : "Sync log channel maybe start later."); } /** @@ -689,8 +676,6 @@ public boolean removeSyncLogChannel(Peer targetPeer) { // step 2, update configuration configuration.remove(targetPeer); checkAndUpdateSafeDeletedSearchIndex(); - // step 3, persist configuration - persistConfiguration(); logger.info( "[IoTConsensus Configuration] Configuration updated to {}. {}", this.configuration, @@ -698,103 +683,10 @@ public boolean removeSyncLogChannel(Peer targetPeer) { return !exceptionHappened; } - public void persistConfiguration() { - try { - removeDuplicateConfiguration(); - renameTmpConfigurationFileToRemoveSuffix(); - serializeConfigurationAndFsyncToDisk(); - deleteConfiguration(); - renameTmpConfigurationFileToRemoveSuffix(); - } catch (IOException e) { - // TODO: (xingtanzjr) need to handle the IOException because the IoTConsensus won't - // work expectedly - // if the exception occurs - logger.error("Unexpected error occurs when persisting configuration", e); - } - } - - public void recoverConfiguration() { - try { - Path tmpConfigurationPath = - Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath()); - Path configurationPath = - Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()); - // If the tmpConfigurationPath exists, it means the `persistConfigurationUpdate` is - // interrupted - // unexpectedly, we need substitute configuration with tmpConfiguration file - if (Files.exists(tmpConfigurationPath)) { - Files.deleteIfExists(configurationPath); - Files.move(tmpConfigurationPath, configurationPath); - logger.info( - "[IoTConsensus Configuration] recover configuration from tmpConfigurationFile, {}", - tmpConfigurationPath); - } - if (Files.exists(configurationPath)) { - recoverFromOldConfigurationFile(configurationPath); - logger.info( - "[IoTConsensus Configuration] recover configuration from oldConfigurationFile, {}", - configurationPath); - } else { - // recover from split configuration file - logger.info( - "[IoTConsensus Configuration] recover configuration from old split configuration file"); - Path dirPath = Paths.get(storageDir); - List tmpPeerList = getConfiguration(dirPath, CONFIGURATION_TMP_FILE_NAME); - configuration.addAll(tmpPeerList); - logger.info( - "[IoTConsensus Configuration] recover configuration from tmpPeerList, {}", - configuration); - List peerList = getConfiguration(dirPath, CONFIGURATION_FILE_NAME); - for (Peer peer : peerList) { - if (!configuration.contains(peer)) { - configuration.add(peer); - } - } - logger.info( - "[IoTConsensus Configuration] recover configuration from peerList, {}", configuration); - persistConfiguration(); - } - logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration); - } catch (IOException e) { - logger.error("Unexpected error occurs when recovering configuration", e); - } - } - - // @Compatibility - private void recoverFromOldConfigurationFile(Path oldConfigurationPath) throws IOException { - // recover from old configuration file - ByteBuffer buffer = ByteBuffer.wrap(Files.readAllBytes(oldConfigurationPath)); - int size = buffer.getInt(); - for (int i = 0; i < size; i++) { - configuration.add(Peer.deserialize(buffer)); - } - persistConfiguration(); - } - public static String generateConfigurationDatFileName(int nodeId, String suffix) { return nodeId + "_" + suffix; } - private List getConfiguration(Path dirPath, String configurationFileName) - throws IOException { - ByteBuffer buffer; - List tmpConfiguration = new ArrayList<>(); - Path[] files = - Files.walk(dirPath) - .filter(Files::isRegularFile) - .filter(filePath -> filePath.getFileName().toString().contains(configurationFileName)) - .toArray(Path[]::new); - logger.info( - "[IoTConsensus Configuration] getConfiguration: fileName, {}, fileList: {}", - configurationFileName, - files); - for (Path file : files) { - buffer = ByteBuffer.wrap(Files.readAllBytes(file)); - tmpConfiguration.add(Peer.deserialize(buffer)); - } - return tmpConfiguration; - } - public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest( IConsensusRequest request) { if (request instanceof ComparableConsensusRequest) { @@ -832,7 +724,7 @@ public Peer getThisNode() { } public List getConfiguration() { - return configuration; + return new ArrayList<>(configuration); } public long getSearchIndex() { @@ -984,90 +876,6 @@ public String getConsensusGroupId() { return consensusGroupId; } - private void serializeConfigurationAndFsyncToDisk() throws IOException { - for (Peer peer : configuration) { - String peerConfigurationFileName = - generateConfigurationDatFileName(peer.getNodeId(), CONFIGURATION_TMP_FILE_NAME); - FileOutputStream fileOutputStream = - new FileOutputStream(new File(storageDir, peerConfigurationFileName)); - try (DataOutputStream outputStream = new DataOutputStream(fileOutputStream)) { - peer.serialize(outputStream); - } finally { - try { - fileOutputStream.flush(); - fileOutputStream.getFD().sync(); - } catch (IOException ignore) { - // ignore sync exception - } - } - logger.info("[IoTConsensus Configuration] serializeConfiguration: {}", peer); - } - } - - private void renameTmpConfigurationFileToRemoveSuffix() throws IOException { - try (Stream stream = Files.list(Paths.get(storageDir))) { - List paths = - stream - .filter(Files::isRegularFile) - .filter( - filePath -> - filePath.getFileName().toString().endsWith(CONFIGURATION_TMP_FILE_NAME)) - .collect(Collectors.toList()); - for (Path filePath : paths) { - String targetPath = - filePath.toString().replace(CONFIGURATION_TMP_FILE_NAME, CONFIGURATION_FILE_NAME); - File targetFile = new File(targetPath); - if (targetFile.exists()) { - try { - Files.delete(targetFile.toPath()); - } catch (IOException e) { - logger.error("Unexpected error occurs when delete file: {}", targetPath, e); - } - } - if (!filePath.toFile().renameTo(targetFile)) { - logger.error("Unexpected error occurs when rename file: {} -> {}", filePath, targetPath); - } - logger.info("[IoTConsensus Configuration] renameTmpConfigurationFile: {}", targetPath); - } - } catch (UncheckedIOException e) { - throw e.getCause(); - } - } - - private void deleteConfiguration() throws IOException { - try (Stream stream = Files.list(Paths.get(storageDir))) { - stream - .filter(Files::isRegularFile) - .filter(filePath -> filePath.getFileName().toString().endsWith(CONFIGURATION_FILE_NAME)) - .forEach( - filePath -> { - try { - Files.delete(filePath); - } catch (IOException e) { - logger.error( - "Unexpected error occurs when deleting old configuration file {}", - filePath, - e); - } - logger.info("[IoTConsensus Configuration] deleteConfiguration: {}", filePath); - }); - } catch (UncheckedIOException e) { - throw e.getCause(); - } - } - - public void removeDuplicateConfiguration() { - Set seen = new HashSet<>(); - Iterator it = configuration.iterator(); - - while (it.hasNext()) { - Peer peer = it.next(); - if (!seen.add(peer)) { - it.remove(); - } - } - } - /** This method is used for hot reload of IoTConsensusConfig. */ public void reloadConsensusConfig(IoTConsensusConfig config) { this.config = config; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 6b33fcc5dfc3..6f67bc70c860 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -98,7 +98,7 @@ private void initLogSyncThreadPool() { public synchronized void start() { if (!threads.isEmpty()) { - threads.forEach(executorService::submit); + threads.forEach(logDispatcherThread -> executorService.submit(logDispatcherThread)); } } @@ -120,7 +120,8 @@ public synchronized void stop() { stopped = true; } - public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex) { + public synchronized void addLogDispatcherThread( + Peer peer, long initialSyncIndex, boolean startNow) { if (stopped) { return; } @@ -131,7 +132,9 @@ public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex if (this.executorService == null) { initLogSyncThreadPool(); } - executorService.submit(thread); + if (startNow) { + executorService.submit(thread); + } } public synchronized void removeLogDispatcherThread(Peer peer) throws IOException { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index f6a67c3e8c60..2bac66738fdd 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -189,7 +189,7 @@ public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req) return new TBuildSyncLogChannelRes(status); } TSStatus responseStatus; - impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint)); + impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint), true); responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); return new TBuildSyncLogChannelRes(responseStatus); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index 291f3871fece..8cae765ffff2 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -162,47 +162,41 @@ private Future initAndRecover() throws IOException { return CompletableFuture.completedFuture(null); } else { // asynchronously recover, retry logic is implemented at PipeConsensusImpl - CompletableFuture future = - CompletableFuture.runAsync( - () -> { - try (DirectoryStream stream = - Files.newDirectoryStream(storageDir.toPath())) { - for (Path path : stream) { - ConsensusGroupId consensusGroupId = - parsePeerFileName(path.getFileName().toString()); - try { - PipeConsensusServerImpl consensus = - new PipeConsensusServerImpl( - new Peer(consensusGroupId, thisNodeId, thisNode), - registry.apply(consensusGroupId), - path.toString(), - new ArrayList<>(), - config, - consensusPipeManager, - syncClientManager); - stateMachineMap.put(consensusGroupId, consensus); - checkPeerListAndStartIfEligible(consensusGroupId, consensus); - } catch (Exception e) { - LOGGER.error( - "Failed to recover consensus from {} for {}, ignore it and continue recover other group, async backend checker thread will automatically deregister related pipe side effects for this failed consensus group.", - storageDir, - consensusGroupId, - e); - } - } - } catch (IOException e) { + return CompletableFuture.runAsync( + () -> { + try (DirectoryStream stream = Files.newDirectoryStream(storageDir.toPath())) { + for (Path path : stream) { + ConsensusGroupId consensusGroupId = + parsePeerFileName(path.getFileName().toString()); + try { + PipeConsensusServerImpl consensus = + new PipeConsensusServerImpl( + new Peer(consensusGroupId, thisNodeId, thisNode), + registry.apply(consensusGroupId), + new ArrayList<>(), + config, + consensusPipeManager, + syncClientManager); + stateMachineMap.put(consensusGroupId, consensus); + checkPeerListAndStartIfEligible(consensusGroupId, consensus); + } catch (Exception e) { LOGGER.error( - "Failed to recover consensus from {} because read dir failed", + "Failed to recover consensus from {} for {}, ignore it and continue recover other group, async backend checker thread will automatically deregister related pipe side effects for this failed consensus group.", storageDir, + consensusGroupId, e); } - }) - .exceptionally( - e -> { - LOGGER.error("Failed to recover consensus from {}", storageDir, e); - return null; - }); - return future; + } + } catch (IOException e) { + LOGGER.error( + "Failed to recover consensus from {} because read dir failed", storageDir, e); + } + }) + .exceptionally( + e -> { + LOGGER.error("Failed to recover consensus from {}", storageDir, e); + return null; + }); } } @@ -347,7 +341,6 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) new PipeConsensusServerImpl( new Peer(groupId, thisNodeId, thisNode), registry.apply(groupId), - path, peers, config, consensusPipeManager, @@ -382,9 +375,6 @@ public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId))); KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE); - } catch (IOException e) { - LOGGER.warn("Cannot delete local peer for group {}", groupId, e); - throw new ConsensusException(e); } finally { stateMachineMapLock.readLock().unlock(); lock.unlock(); @@ -487,7 +477,7 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) if (!correctPeers.contains(new Peer(groupId, thisNodeId, thisNode))) { LOGGER.warn( - "[RESET PEER LIST] Local peer is not in the correct configuration, delete local peer {}", + "[RESET PEER LIST] {} Local peer is not in the correct configuration, delete it.", groupId); deleteLocalPeer(groupId); return; @@ -500,9 +490,10 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) if (!correctPeers.contains(peer)) { try { impl.dropConsensusPipeToTargetPeer(peer); - LOGGER.info("[RESET PEER LIST] Remove sync channel with: {}", peer); + LOGGER.info("[RESET PEER LIST] {} Remove sync channel with: {}", groupId, peer); } catch (ConsensusGroupModifyPeerException e) { - LOGGER.error("[RESET PEER LIST] Failed to remove sync channel with: {}", peer, e); + LOGGER.error( + "[RESET PEER LIST] {} Failed to remove sync channel with: {}", groupId, peer, e); } } } @@ -511,9 +502,10 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) { try { impl.createConsensusPipeToTargetPeer(peer, false); - LOGGER.info("[RESET PEER LIST] Build sync channel with: {}", peer); + LOGGER.info("[RESET PEER LIST] {} Build sync channel with: {}", groupId, peer); } catch (ConsensusGroupModifyPeerException e) { - LOGGER.warn("[RESET PEER LIST] Failed to build sync channel with: {}", peer, e); + LOGGER.warn( + "[RESET PEER LIST] {} Failed to build sync channel with: {}", groupId, peer, e); } } } @@ -521,12 +513,14 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) String currentPeerListStr = impl.getPeers().toString(); if (!previousPeerListStr.equals(currentPeerListStr)) { LOGGER.info( - "[RESET PEER LIST] Local peer list has been reset: {} -> {}", + "[RESET PEER LIST] {} Local peer list has been reset: {} -> {}", + groupId, previousPeerListStr, impl.getPeers()); } else { LOGGER.info( - "[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}", + "[RESET PEER LIST] {} The current peer list is correct, nothing need to be reset: {}", + groupId, previousPeerListStr); } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusPeerManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusPeerManager.java index 6376097d15f7..5ff44029ee6e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusPeerManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusPeerManager.java @@ -19,37 +19,24 @@ package org.apache.iotdb.consensus.pipe; -import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.consensus.common.Peer; import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Stream; public class PipeConsensusPeerManager { - private static final String CONFIGURATION_FILE_NAME = "configuration.dat"; private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusPeerManager.class); - private final String storageDir; private final Set peers; - public PipeConsensusPeerManager(String storageDir, List peers) { - this.storageDir = storageDir; + public PipeConsensusPeerManager(List peers) { this.peers = Collections.newSetFromMap(new ConcurrentHashMap<>()); this.peers.addAll(peers); @@ -58,63 +45,15 @@ public PipeConsensusPeerManager(String storageDir, List peers) { } } - public void recover() throws IOException { - try (Stream pathStream = Files.walk(Paths.get(storageDir), 1)) { - Path[] configurationPaths = - pathStream - .filter(Files::isRegularFile) - .filter(path -> path.getFileName().toString().endsWith(CONFIGURATION_FILE_NAME)) - .toArray(Path[]::new); - ByteBuffer readBuffer; - for (Path path : configurationPaths) { - readBuffer = ByteBuffer.wrap(Files.readAllBytes(path)); - peers.add(Peer.deserialize(readBuffer)); - } - } - } - - private void persist(Peer peer) throws IOException { - File configurationFile = new File(storageDir, generateConfigurationFileName(peer)); - if (configurationFile.exists()) { - LOGGER.warn("Configuration file {} already exists, delete it.", configurationFile); - FileUtils.deleteFileOrDirectory(configurationFile); - } - - try (FileOutputStream fileOutputStream = new FileOutputStream(configurationFile)) { - try (DataOutputStream dataOutputStream = new DataOutputStream(fileOutputStream)) { - peer.serialize(dataOutputStream); - } finally { - try { - fileOutputStream.flush(); - fileOutputStream.getFD().sync(); - } catch (IOException ignore) { - // ignore sync exception - } - } - } - } - - private String generateConfigurationFileName(Peer peer) { - return peer.getNodeId() + "_" + CONFIGURATION_FILE_NAME; - } - - public void persistAll() throws IOException { - for (Peer peer : peers) { - persist(peer); - } - } - public boolean contains(Peer peer) { return peers.contains(peer); } - public void addAndPersist(Peer peer) throws IOException { + public void addPeer(Peer peer) { peers.add(peer); - persist(peer); } - public void removeAndPersist(Peer peer) throws IOException { - Files.deleteIfExists(Paths.get(storageDir, generateConfigurationFileName(peer))); + public void removePeer(Peer peer) { peers.remove(peer); } @@ -128,27 +67,7 @@ public List getPeers() { return ImmutableList.copyOf(peers); } - public void deleteAllFiles() throws IOException { - IOException exception = null; - for (Peer peer : peers) { - try { - Files.deleteIfExists(Paths.get(storageDir, generateConfigurationFileName(peer))); - } catch (IOException e) { - LOGGER.error("Failed to delete configuration file for peer {}", peer, e); - if (exception == null) { - exception = e; - } else { - exception.addSuppressed(e); - } - } - } - if (exception != null) { - throw exception; - } - } - - public void clear() throws IOException { - deleteAllFiles(); + public void clear() { peers.clear(); } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java index b085abbd64b2..25a86ca51782 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java @@ -99,7 +99,6 @@ public class PipeConsensusServerImpl { public PipeConsensusServerImpl( Peer thisNode, IStateMachine stateMachine, - String storageDir, List peers, PipeConsensusConfig config, ConsensusPipeManager consensusPipeManager, @@ -107,7 +106,7 @@ public PipeConsensusServerImpl( throws IOException { this.thisNode = thisNode; this.stateMachine = stateMachine; - this.peerManager = new PipeConsensusPeerManager(storageDir, peers); + this.peerManager = new PipeConsensusPeerManager(peers); this.active = new AtomicBoolean(true); this.isStarted = new AtomicBoolean(false); this.consensusGroupId = thisNode.getGroupId().toString(); @@ -117,9 +116,8 @@ public PipeConsensusServerImpl( this.pipeConsensusServerMetrics = new PipeConsensusServerMetrics(this); this.replicateMode = config.getReplicateMode(); - if (peers.isEmpty()) { - peerManager.recover(); - } else { + // if peers is empty, the `resetPeerList` will automatically fetch correct peers' info from CN. + if (!peers.isEmpty()) { // create consensus pipes Set deepCopyPeersWithoutSelf = peers.stream().filter(peer -> !peer.equals(thisNode)).collect(Collectors.toSet()); @@ -129,17 +127,6 @@ public PipeConsensusServerImpl( updateConsensusPipesStatus(successfulPipes, PipeStatus.DROPPED); throw new IOException(String.format("%s cannot create all consensus pipes", thisNode)); } - - // persist peers' info - try { - peerManager.persistAll(); - } catch (Exception e) { - // roll back - LOGGER.warn("{} cannot persist all peers", thisNode, e); - peerManager.deleteAllFiles(); - updateConsensusPipesStatus(successfulPipes, PipeStatus.DROPPED); - throw e; - } } } @@ -193,7 +180,7 @@ public synchronized void stop() { isStarted.set(false); } - public synchronized void clear() throws IOException { + public synchronized void clear() { final List otherPeers = peerManager.getOtherPeers(thisNode); final List failedPipes = updateConsensusPipesStatus(new ArrayList<>(otherPeers), PipeStatus.DROPPED); @@ -448,11 +435,7 @@ public synchronized void createConsensusPipeToTargetPeer( try { KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE); consensusPipeManager.createConsensusPipe(thisNode, targetPeer, needManuallyStart); - peerManager.addAndPersist(targetPeer); - } catch (IOException e) { - LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e); - throw new ConsensusGroupModifyPeerException( - String.format("%s cannot persist peer %s", thisNode, targetPeer), e); + peerManager.addPeer(targetPeer); } catch (Exception e) { LOGGER.warn("{} cannot create consensus pipe to {}", thisNode, targetPeer, e); throw new ConsensusGroupModifyPeerException( @@ -504,11 +487,7 @@ public synchronized void dropConsensusPipeToTargetPeer(Peer targetPeer) throws ConsensusGroupModifyPeerException { try { consensusPipeManager.dropConsensusPipe(thisNode, targetPeer); - peerManager.removeAndPersist(targetPeer); - } catch (IOException e) { - LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e); - throw new ConsensusGroupModifyPeerException( - String.format("%s cannot persist peer %s", thisNode, targetPeer), e); + peerManager.removePeer(targetPeer); } catch (Exception e) { LOGGER.warn("{} cannot drop consensus pipe to {}", thisNode, targetPeer, e); throw new ConsensusGroupModifyPeerException( diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index 3ec7769f2ac5..f22d3fdadcd9 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -46,8 +46,8 @@ import java.net.ServerSocket; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class ReplicateTest { @@ -57,12 +57,6 @@ public class ReplicateTest { private final ConsensusGroupId gid = new DataRegionId(1); - private static final long timeout = TimeUnit.SECONDS.toMillis(300); - - private static final String CONFIGURATION_FILE_NAME = "configuration.dat"; - - private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp"; - private int basePort = 9000; private final List peers = @@ -73,9 +67,9 @@ public class ReplicateTest { private final List peersStorage = Arrays.asList( - new File("target" + java.io.File.separator + "1"), - new File("target" + java.io.File.separator + "2"), - new File("target" + java.io.File.separator + "3")); + new File("target" + File.separator + "1"), + new File("target" + File.separator + "2"), + new File("target" + File.separator + "3")); private final ConsensusGroup group = new ConsensusGroup(gid, peers); private final List servers = new ArrayList<>(); @@ -120,6 +114,9 @@ private void initServer() throws IOException { String.format( ConsensusFactory.CONSTRUCT_FAILED_MSG, ConsensusFactory.IOT_CONSENSUS)))); + servers.get(i).recordCorrectPeerListBeforeStarting(Collections.singletonMap(gid, peers)); + } + for (int i = 0; i < peers.size(); i++) { servers.get(i).start(); } } catch (IOException e) { @@ -187,23 +184,9 @@ public void replicateUsingQueueTest() stopServer(); initServer(); - Assert.assertEquals( - peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()), - servers.get(0).getImpl(gid).getConfiguration().stream() - .map(Peer::getNodeId) - .collect(Collectors.toSet())); - - Assert.assertEquals( - peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()), - servers.get(1).getImpl(gid).getConfiguration().stream() - .map(Peer::getNodeId) - .collect(Collectors.toSet())); - - Assert.assertEquals( - peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()), - servers.get(2).getImpl(gid).getConfiguration().stream() - .map(Peer::getNodeId) - .collect(Collectors.toSet())); + checkPeerList(servers.get(0).getImpl(gid)); + checkPeerList(servers.get(1).getImpl(gid)); + checkPeerList(servers.get(2).getImpl(gid)); Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex()); Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex()); @@ -264,23 +247,9 @@ public void replicateUsingWALTest() throws IOException, InterruptedException, Co initServer(); servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers()); - Assert.assertEquals( - peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()), - servers.get(0).getImpl(gid).getConfiguration().stream() - .map(Peer::getNodeId) - .collect(Collectors.toSet())); - - Assert.assertEquals( - peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()), - servers.get(1).getImpl(gid).getConfiguration().stream() - .map(Peer::getNodeId) - .collect(Collectors.toSet())); - - Assert.assertEquals( - peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()), - servers.get(2).getImpl(gid).getConfiguration().stream() - .map(Peer::getNodeId) - .collect(Collectors.toSet())); + checkPeerList(servers.get(0).getImpl(gid)); + checkPeerList(servers.get(1).getImpl(gid)); + checkPeerList(servers.get(2).getImpl(gid)); Assert.assertEquals(CHECK_POINT_GAP, servers.get(0).getImpl(gid).getSearchIndex()); Assert.assertEquals(CHECK_POINT_GAP, servers.get(1).getImpl(gid).getSearchIndex()); @@ -345,4 +314,10 @@ private boolean checkPortAvailable() { } return true; } + + private void checkPeerList(IoTConsensusServerImpl iotServerImpl) { + Assert.assertEquals( + peers.stream().map(Peer::getNodeId).collect(Collectors.toSet()), + iotServerImpl.getConfiguration().stream().map(Peer::getNodeId).collect(Collectors.toSet())); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java index 6a771007b3ac..f3fe432ae7ef 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java @@ -25,7 +25,7 @@ import java.util.Objects; /** We abstract this class to hide word `ConsensusGroup` for IoTDB StorageEngine/SchemaEngine. */ -public abstract class ConsensusGroupId { +public abstract class ConsensusGroupId implements Comparable { protected int id; @@ -126,4 +126,9 @@ public static ConsensusGroupId createFromTConsensusGroupId( return create(tConsensusGroupId.getType().getValue(), tConsensusGroupId.getId()); } } + + @Override + public int compareTo(ConsensusGroupId o) { + return Integer.compare(id, o.id); + } }