Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename and deprecate public methods that contains 'master' in the name in 'server' directory #3647

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Deprecate and rename all master methods in DiscoveryNode class
Signed-off-by: Tianli Feng <ftianli@amazon.com>
  • Loading branch information
Tianli Feng committed Jun 22, 2022

Verified

This commit was signed with the committer’s verified signature.
minrk Min RK
commit 6b207805eba19baf2581f8568fff24865bece436
Original file line number Diff line number Diff line change
@@ -123,7 +123,7 @@ public void testReindexFromRemote() throws Exception {
*/
NodeInfo clusterManagerNode = null;
for (NodeInfo candidate : client.admin().cluster().prepareNodesInfo().get().getNodes()) {
if (candidate.getNode().isMasterNode()) {
if (candidate.getNode().isClusterManagerNode()) {
clusterManagerNode = candidate;
}
}
Original file line number Diff line number Diff line change
@@ -132,7 +132,7 @@ Set<VotingConfigExclusion> resolveVotingConfigExclusions(ClusterState currentSta
if (nodeDescriptions.length >= 1) {
newVotingConfigExclusions = Arrays.stream(allNodes.resolveNodes(nodeDescriptions))
.map(allNodes::get)
.filter(DiscoveryNode::isMasterNode)
.filter(DiscoveryNode::isClusterManagerNode)
.map(VotingConfigExclusion::new)
.collect(Collectors.toSet());

@@ -147,7 +147,7 @@ Set<VotingConfigExclusion> resolveVotingConfigExclusions(ClusterState currentSta
for (String nodeId : nodeIds) {
if (allNodes.nodeExists(nodeId)) {
DiscoveryNode discoveryNode = allNodes.get(nodeId);
if (discoveryNode.isMasterNode()) {
if (discoveryNode.isClusterManagerNode()) {
newVotingConfigExclusions.add(new VotingConfigExclusion(discoveryNode));
}
} else {
@@ -162,7 +162,7 @@ Set<VotingConfigExclusion> resolveVotingConfigExclusions(ClusterState currentSta
for (String nodeName : nodeNames) {
if (existingNodes.containsKey(nodeName)) {
DiscoveryNode discoveryNode = existingNodes.get(nodeName);
if (discoveryNode.isMasterNode()) {
if (discoveryNode.isClusterManagerNode()) {
newVotingConfigExclusions.add(new VotingConfigExclusion(discoveryNode));
}
} else {
Original file line number Diff line number Diff line change
@@ -126,14 +126,14 @@ public TransportCleanupRepositoryAction(
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
if (DiscoveryNode.isClusterManagerNode(clusterService.getSettings())) {
addClusterStateApplier(clusterService);
}
}

private static void addClusterStateApplier(ClusterService clusterService) {
clusterService.addStateApplier(event -> {
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedClusterManager() == false) {
if (event.localNodeClusterManager() && event.previousState().nodes().isLocalNodeElectedClusterManager() == false) {
final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state()
.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
if (repositoryCleanupInProgress.hasCleanupInProgress() == false) {
Original file line number Diff line number Diff line change
@@ -169,7 +169,7 @@ protected void dispatchedShardOperationOnPrimary(
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
assert update != null;
assert shardId != null;
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), update, mappingListener);
mappingUpdatedAction.updateMappingOnClusterManager(shardId.getIndex(), update, mappingListener);
}, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
Original file line number Diff line number Diff line change
@@ -213,10 +213,20 @@ public boolean blocksChanged() {
/**
* Returns <code>true</code> iff the local node is the mater node of the cluster.
*/
public boolean localNodeMaster() {
public boolean localNodeClusterManager() {
return state.nodes().isLocalNodeElectedClusterManager();
}

/**
* Returns <code>true</code> iff the local node is the mater node of the cluster.
*
* @deprecated As of 2.1, because supporting inclusive language, replaced by {@link #localNodeClusterManager()}
*/
@Deprecated
public boolean localNodeMaster() {
return localNodeClusterManager();
}

/**
* Returns the {@link org.opensearch.cluster.node.DiscoveryNodes.Delta} between
* the previous cluster state and the new cluster state.
Original file line number Diff line number Diff line change
@@ -151,14 +151,14 @@ void setUpdateFrequency(TimeValue updateFrequency) {

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster() && refreshAndRescheduleRunnable.get() == null) {
if (event.localNodeClusterManager() && refreshAndRescheduleRunnable.get() == null) {
logger.trace("elected as cluster-manager, scheduling cluster info update tasks");
executeRefresh(event.state(), "became cluster-manager");

final RefreshAndRescheduleRunnable newRunnable = new RefreshAndRescheduleRunnable();
refreshAndRescheduleRunnable.set(newRunnable);
threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, newRunnable);
} else if (event.localNodeMaster() == false) {
} else if (event.localNodeClusterManager() == false) {
refreshAndRescheduleRunnable.set(null);
return;
}
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ public interface LocalNodeMasterListener extends ClusterStateListener {
@Override
default void clusterChanged(ClusterChangedEvent event) {
final boolean wasClusterManager = event.previousState().nodes().isLocalNodeElectedClusterManager();
final boolean isClusterManager = event.localNodeMaster();
final boolean isClusterManager = event.localNodeClusterManager();
if (wasClusterManager == false && isClusterManager) {
onClusterManager();
} else if (wasClusterManager && isClusterManager == false) {
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ public void setClient(Client client) {
* {@code timeout} is the cluster-manager node timeout ({@link ClusterManagerNodeRequest#clusterManagerNodeTimeout()}),
* potentially waiting for a cluster-manager node to be available.
*/
public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
public void updateMappingOnClusterManager(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {

final RunOnce release = new RunOnce(() -> semaphore.release());
try {
@@ -132,6 +132,19 @@ public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionList
}
}

/**
* Update mappings on the cluster-manager node, waiting for the change to be committed,
* but not for the mapping update to be applied on all nodes. The timeout specified by
* {@code timeout} is the cluster-manager node timeout ({@link ClusterManagerNodeRequest#clusterManagerNodeTimeout()}),
* potentially waiting for a cluster-manager node to be available.
*
* @deprecated As of 2.1, because supporting inclusive language, replaced by {@link #updateMappingOnClusterManager(Index, Mapping, ActionListener)}
*/
@Deprecated
public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
updateMappingOnClusterManager(index, mappingUpdate, listener);
}

// used by tests
int blockedThreads() {
return semaphore.getQueueLength();
Original file line number Diff line number Diff line change
@@ -134,7 +134,7 @@ public ClusterBootstrapService(
+ "]"
);
}
if (DiscoveryNode.isMasterNode(settings) == false) {
if (DiscoveryNode.isClusterManagerNode(settings) == false) {
throw new IllegalArgumentException(
"node with ["
+ DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()
@@ -176,7 +176,7 @@ public static boolean discoveryIsConfigured(Settings settings) {
void onFoundPeersUpdated() {
final Set<DiscoveryNode> nodes = getDiscoveredNodes();
if (bootstrappingPermitted.get()
&& transportService.getLocalNode().isMasterNode()
&& transportService.getLocalNode().isClusterManagerNode()
&& bootstrapRequirements.isEmpty() == false
&& isBootstrappedSupplier.getAsBoolean() == false
&& nodes.stream().noneMatch(Coordinator::isZen1Node)) {
@@ -219,7 +219,7 @@ void scheduleUnconfiguredBootstrap() {
return;
}

if (transportService.getLocalNode().isMasterNode() == false) {
if (transportService.getLocalNode().isClusterManagerNode() == false) {
return;
}

@@ -257,7 +257,7 @@ private Set<DiscoveryNode> getDiscoveredNodes() {
}

private void startBootstrap(Set<DiscoveryNode> discoveryNodes, List<String> unsatisfiedRequirements) {
assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode) : discoveryNodes;
assert discoveryNodes.stream().allMatch(DiscoveryNode::isClusterManagerNode) : discoveryNodes;
assert discoveryNodes.stream().noneMatch(Coordinator::isZen1Node) : discoveryNodes;
assert unsatisfiedRequirements.size() < discoveryNodes.size() : discoveryNodes + " smaller than " + unsatisfiedRequirements;
if (bootstrappingPermitted.compareAndSet(true, false)) {
@@ -277,7 +277,7 @@ public static boolean isBootstrapPlaceholder(String nodeId) {
}

private void doBootstrap(VotingConfiguration votingConfiguration) {
assert transportService.getLocalNode().isMasterNode();
assert transportService.getLocalNode().isClusterManagerNode();

try {
votingConfigurationConsumer.accept(votingConfiguration);
Original file line number Diff line number Diff line change
@@ -207,7 +207,7 @@ String getDescription() {
discoveryWillContinueDescription
);

if (clusterState.nodes().getLocalNode().isMasterNode() == false) {
if (clusterState.nodes().getLocalNode().isClusterManagerNode() == false) {
return String.format(Locale.ROOT, "cluster-manager not discovered yet: %s", discoveryStateIgnoringQuorum);
}

Original file line number Diff line number Diff line change
@@ -652,7 +652,7 @@ public static class VoteCollection {
private final Set<Join> joins;

public boolean addVote(DiscoveryNode sourceNode) {
return sourceNode.isMasterNode() && nodes.put(sourceNode.getId(), sourceNode) == null;
return sourceNode.isClusterManagerNode() && nodes.put(sourceNode.getId(), sourceNode) == null;
}

public boolean addJoinVote(Join join) {
Original file line number Diff line number Diff line change
@@ -515,7 +515,8 @@ private void startElection() {
private void abdicateTo(DiscoveryNode newClusterManager) {
assert Thread.holdsLock(mutex);
assert mode == Mode.LEADER : "expected to be leader on abdication but was " + mode;
assert newClusterManager.isMasterNode() : "should only abdicate to cluster-manager-eligible node but was " + newClusterManager;
assert newClusterManager.isClusterManagerNode() : "should only abdicate to cluster-manager-eligible node but was "
+ newClusterManager;
final StartJoinRequest startJoinRequest = new StartJoinRequest(newClusterManager, Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.info("abdicating to {} with term {}", newClusterManager, startJoinRequest.getTerm());
getLastAcceptedState().nodes().clusterManagersFirstStream().forEach(node -> {
@@ -568,7 +569,7 @@ private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {

private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
assert Thread.holdsLock(mutex) == false;
assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not cluster-manager-eligible";
assert getLocalNode().isClusterManagerNode() : getLocalNode() + " received a join but is not cluster-manager-eligible";
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest);

if (singleNodeDiscovery && joinRequest.getSourceNode().equals(getLocalNode()) == false) {
@@ -688,7 +689,7 @@ void becomeCandidate(String method) {
void becomeLeader(String method) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
assert getLocalNode().isMasterNode() : getLocalNode() + " became a leader but is not cluster-manager-eligible";
assert getLocalNode().isClusterManagerNode() : getLocalNode() + " became a leader but is not cluster-manager-eligible";

logger.debug(
"{}: coordinator becoming LEADER in term {} (was {}, lastKnownLeader was [{}])",
@@ -714,7 +715,7 @@ void becomeLeader(String method) {

void becomeFollower(String method, DiscoveryNode leaderNode) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not cluster-manager-eligible";
assert leaderNode.isClusterManagerNode() : leaderNode + " became a leader but is not cluster-manager-eligible";
assert mode != Mode.LEADER : "do not switch to follower from leader (should be candidate first)";

if (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) {
@@ -994,7 +995,7 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
return false;
}

if (getLocalNode().isMasterNode() == false) {
if (getLocalNode().isClusterManagerNode() == false) {
logger.debug("skip setting initial configuration as local node is not a cluster-manager-eligible node");
throw new CoordinationStateRejectedException(
"this node is not cluster-manager-eligible, but cluster bootstrapping can only happen on a cluster-manager-eligible node"
@@ -1061,14 +1062,14 @@ ClusterState improveConfiguration(ClusterState clusterState) {
// the logging much harder to follow.
final Stream<String> clusterManagerIneligibleNodeIdsInVotingConfig = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(
n -> n.isMasterNode() == false
n -> n.isClusterManagerNode() == false
&& (clusterState.getLastAcceptedConfiguration().getNodeIds().contains(n.getId())
|| clusterState.getLastCommittedConfiguration().getNodeIds().contains(n.getId()))
)
.map(DiscoveryNode::getId);

final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(DiscoveryNode::isMasterNode)
.filter(DiscoveryNode::isClusterManagerNode)
.filter(coordinationState.get()::containsJoinVoteFor)
.filter(discoveryNode -> isZen1Node(discoveryNode) == false)
.collect(Collectors.toSet());
@@ -1109,7 +1110,8 @@ static boolean validVotingConfigExclusionState(ClusterState clusterState) {
.map(VotingConfigExclusion::getNodeId)
.collect(Collectors.toSet());
for (DiscoveryNode node : clusterState.getNodes()) {
if (node.isMasterNode() && (nodeIdsWithAbsentName.contains(node.getId()) || nodeNamesWithAbsentId.contains(node.getName()))) {
if (node.isClusterManagerNode()
&& (nodeIdsWithAbsentName.contains(node.getId()) || nodeNamesWithAbsentId.contains(node.getName()))) {
return false;
}
}
@@ -1147,7 +1149,7 @@ public void onFailure(String source, Exception e) {

// exposed for tests
boolean missingJoinVoteFrom(DiscoveryNode node) {
return node.isMasterNode() && coordinationState.get().containsJoinVoteFor(node) == false;
return node.isClusterManagerNode() && coordinationState.get().containsJoinVoteFor(node) == false;
}

private void handleJoin(Join join) {
@@ -1426,7 +1428,7 @@ protected void onFoundPeersUpdated() {
private void startElectionScheduler() {
assert electionScheduler == null : electionScheduler;

if (getLocalNode().isMasterNode() == false) {
if (getLocalNode().isClusterManagerNode() == false) {
return;
}

@@ -1641,7 +1643,7 @@ public void onSuccess(String source) {
final ClusterState state = getLastAcceptedState(); // committed state
if (localNodeMayWinElection(state) == false) {
final List<DiscoveryNode> clusterManagerCandidates = completedNodes().stream()
.filter(DiscoveryNode::isMasterNode)
.filter(DiscoveryNode::isClusterManagerNode)
.filter(node -> nodeMayWinElection(state, node))
.filter(node -> {
// check if cluster_manager candidate would be able to get an election quorum if we were
Original file line number Diff line number Diff line change
@@ -157,7 +157,7 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
+ "), there is a newer cluster-manager"
);
} else if (currentState.nodes().getClusterManagerNodeId() == null
&& joiningTasks.stream().anyMatch(Task::isBecomeMasterTask)) {
&& joiningTasks.stream().anyMatch(Task::isBecomeClusterManagerTask)) {
assert currentState.term() < term
: "there should be at most one become cluster-manager task per election (= by term)";
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata())
@@ -316,7 +316,7 @@ void logLastFailedJoinAttempt() {
}

public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin, Runnable onCompletion) {
assert destination.isMasterNode() : "trying to join cluster-manager-ineligible " + destination;
assert destination.isClusterManagerNode() : "trying to join cluster-manager-ineligible " + destination;
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("dropping join request to [{}]: [{}]", destination, statusInfo.getInfo());
@@ -367,7 +367,7 @@ public String executor() {
}

public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
assert startJoinRequest.getSourceNode().isMasterNode() : "sending start-join request for cluster-manager-ineligible "
assert startJoinRequest.getSourceNode().isClusterManagerNode() : "sending start-join request for cluster-manager-ineligible "
+ startJoinRequest.getSourceNode();
transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, new TransportResponseHandler<Empty>() {
@Override
Loading