Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【remember to add Hu and Peng to co-arthor and delete this bracket before merge】IoTConsensus and IoTConsensusV2 no longer stores the peer list locally on the DataNode #14814

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Objects;

public class Peer {
public class Peer implements Comparable<Peer> {

private final Logger logger = LoggerFactory.getLogger(Peer.class);
private final ConsensusGroupId groupId;
private final TEndPoint endpoint;
private final int nodeId;
private final TEndPoint endpoint;

public Peer(ConsensusGroupId groupId, int nodeId, TEndPoint endpoint) {
this.groupId = groupId;
Expand Down Expand Up @@ -105,6 +106,14 @@ public String toString() {
return "Peer{" + "groupId=" + groupId + ", endpoint=" + endpoint + ", nodeId=" + nodeId + '}';
}

@Override
public int compareTo(Peer peer) {
return Comparator.comparing(Peer::getGroupId)
.thenComparingInt(Peer::getNodeId)
.thenComparing(Peer::getEndpoint)
.compare(this, peer);
}

public static Peer valueOf(
TConsensusGroupId consensusGroupId, TDataNodeLocation dataNodeLocation) {
if (consensusGroupId.getType() == TConsensusGroupType.SchemaRegion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -174,7 +175,7 @@ private void initAndRecover() throws IOException {
new IoTConsensusServerImpl(
path.toString(),
new Peer(consensusGroupId, thisNodeId, thisNode),
new ArrayList<>(),
new TreeSet<>(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use treeset?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, duplicates were removed during configuration persistence. Now that the persistence code is gone, a set is used to keep this logic.

TreeSet is used to return a List with a consistent order (for an interface), which I believe is better than the unordered HashSet.

registry.apply(consensusGroupId),
backgroundTaskService,
clientManager,
Expand Down Expand Up @@ -281,7 +282,7 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
new IoTConsensusServerImpl(
path,
new Peer(groupId, thisNodeId, thisNode),
peers,
new TreeSet<>(peers),
registry.apply(groupId),
backgroundTaskService,
clientManager,
Expand Down Expand Up @@ -497,7 +498,7 @@ public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
Peer localPeer = new Peer(groupId, thisNodeId, thisNode);
if (!correctPeers.contains(localPeer)) {
logger.info(
"[RESET PEER LIST] Local peer is not in the correct configuration, delete local peer {}",
"[RESET PEER LIST] {} Local peer is not in the correct configuration, delete it.",
groupId);
deleteLocalPeer(groupId);
return;
Expand All @@ -510,29 +511,32 @@ public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
for (Peer peer : currentMembers) {
if (!correctPeers.contains(peer)) {
if (!impl.removeSyncLogChannel(peer)) {
logger.error("[RESET PEER LIST] Failed to remove sync channel with: {}", peer);
logger.error(
"[RESET PEER LIST] {} Failed to remove sync channel with: {}", groupId, peer);
} else {
logger.info("[RESET PEER LIST] Remove sync channel with: {}", peer);
logger.info("[RESET PEER LIST] {} Remove sync channel with: {}", groupId, peer);
}
}
}
// add correct peer
for (Peer peer : correctPeers) {
if (!impl.getConfiguration().contains(peer)) {
impl.buildSyncLogChannel(peer);
logger.info("[RESET PEER LIST] Build sync channel with: {}", peer);
logger.info("[RESET PEER LIST] {} Build sync channel with: {}", groupId, peer);
}
}
// show result
String newPeerListStr = impl.getConfiguration().toString();
if (!previousPeerListStr.equals(newPeerListStr)) {
logger.info(
"[RESET PEER LIST] Local peer list has been reset: {} -> {}",
"[RESET PEER LIST] {} Local peer list has been reset: {} -> {}",
groupId,
previousPeerListStr,
newPeerListStr);
} else {
logger.info(
"[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}",
"[RESET PEER LIST] {} The current peer list is correct, nothing need to be reset: {}",
groupId,
previousPeerListStr);
}
}
Expand Down
Loading
Loading