diff --git a/bin/start-shuffle-server.sh b/bin/start-shuffle-server.sh index 4fd6d6ae77..40d18d1f80 100644 --- a/bin/start-shuffle-server.sh +++ b/bin/start-shuffle-server.sh @@ -48,7 +48,9 @@ MAIN_CLASS="org.apache.uniffle.server.ShuffleServer" HADOOP_DEPENDENCY="$("$HADOOP_HOME/bin/hadoop" classpath --glob)" echo "Check process existence" -is_jvm_process_running "$JPS" $MAIN_CLASS +RPC_PORT=`grep '^rss.rpc.server.port' $CONF_FILE |awk '{print $2}'` +is_port_in_use $RPC_PORT + CLASSPATH="" diff --git a/bin/utils.sh b/bin/utils.sh index a7c5d42d16..6c4cadc5a1 100644 --- a/bin/utils.sh +++ b/bin/utils.sh @@ -133,6 +133,14 @@ function is_jvm_process_running { } +function is_port_in_use { + local port=$1 + local tmp=$(lsof -i:$port | grep LISTEN) + if [[ "$tmp" != "" ]]; then + echo "port[$port] is already in use" + exit 1 + fi +} #--- # load_rss_env: Export RSS environment variables #--- diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java new file mode 100644 index 0000000000..a16bdf0943 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY; + +public abstract class AbstractAssignmentStrategy implements AssignmentStrategy { + protected final CoordinatorConf conf; + private final HostAssignmentStrategy assignmentHostStrategy; + + public AbstractAssignmentStrategy(CoordinatorConf conf) { + this.conf = conf; + assignmentHostStrategy = conf.get(COORDINATOR_ASSGINMENT_HOST_STRATEGY); + } + + protected List getCandidateNodes(List allNodes, int expectNum) { + switch (assignmentHostStrategy) { + case MUST_DIFF: return getCandidateNodesWithDiffHost(allNodes, expectNum); + case PREFER_DIFF: return tryGetCandidateNodesWithDiffHost(allNodes, expectNum); + case NONE: return allNodes.subList(0, expectNum); + default: throw new RuntimeException("Unsupported host assignment strategy:" + assignmentHostStrategy); + } + } + + protected List tryGetCandidateNodesWithDiffHost(List allNodes, int expectNum) { + List candidatesNodes = getCandidateNodesWithDiffHost(allNodes, expectNum); + Set candidatesNodeSet = candidatesNodes.stream().collect(Collectors.toSet()); + if (candidatesNodes.size() < expectNum) { + for (ServerNode node : allNodes) { + if (candidatesNodeSet.contains(node)) { + continue; + } + candidatesNodes.add(node); + if (candidatesNodes.size() >= expectNum) { + break; + } + } + } + return candidatesNodes; + } + + protected List getCandidateNodesWithDiffHost(List allNodes, int expectNum) { + List candidatesNodes = new ArrayList<>(); + Set hostIpCandidate = new HashSet<>(); + for (ServerNode node : allNodes) { + if (hostIpCandidate.contains(node.getIp())) { + continue; + } + hostIpCandidate.add(node.getIp()); + candidatesNodes.add(node); + if (candidatesNodes.size() >= expectNum) { + break; + } + } + return candidatesNodes; + } + + + public enum HostAssignmentStrategy { + MUST_DIFF, + PREFER_DIFF, + NONE + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java index 073689caa9..0ae393144c 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java @@ -30,9 +30,9 @@ public AssignmentStrategyFactory(CoordinatorConf conf, ClusterManager clusterMan public AssignmentStrategy getAssignmentStrategy() { StrategyName strategy = conf.get(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY); if (StrategyName.BASIC == strategy) { - return new BasicAssignmentStrategy(clusterManager); + return new BasicAssignmentStrategy(clusterManager, conf); } else if (StrategyName.PARTITION_BALANCE == strategy) { - return new PartitionBalanceAssignmentStrategy(clusterManager); + return new PartitionBalanceAssignmentStrategy(clusterManager, conf); } else { throw new UnsupportedOperationException("Unsupported assignment strategy."); } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java index 54ca2c22d9..9bb4ba87b6 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java @@ -29,13 +29,14 @@ import org.apache.uniffle.common.PartitionRange; -public class BasicAssignmentStrategy implements AssignmentStrategy { +public class BasicAssignmentStrategy extends AbstractAssignmentStrategy { private static final Logger LOG = LoggerFactory.getLogger(BasicAssignmentStrategy.class); private ClusterManager clusterManager; - public BasicAssignmentStrategy(ClusterManager clusterManager) { + public BasicAssignmentStrategy(ClusterManager clusterManager, CoordinatorConf conf) { + super(conf); this.clusterManager = clusterManager; } @@ -81,6 +82,7 @@ private List getRequiredServers(Set requiredTags, int expect LOG.warn("Can't get expected servers [" + expectedNum + "] and found only [" + servers.size() + "]"); return servers; } - return servers.subList(0, expectedNum); + + return getCandidateNodes(servers, expectedNum); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index b4b4126aa3..96053ab1a9 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -153,6 +153,12 @@ public class CoordinatorConf extends RssBaseConf { .intType() .defaultValue(3) .withDescription("The number of times to read and write HDFS files"); + public static final ConfigOption + COORDINATOR_ASSGINMENT_HOST_STRATEGY = + ConfigOptions.key("rss.coordinator.assignment.host.strategy") + .enumType(AbstractAssignmentStrategy.HostAssignmentStrategy.class) + .defaultValue(AbstractAssignmentStrategy.HostAssignmentStrategy.PREFER_DIFF) + .withDescription("Strategy for selecting shuffle servers"); public CoordinatorConf() { } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java index b915f31714..a31f4bb950 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java @@ -50,14 +50,15 @@ * .... **/ -public class PartitionBalanceAssignmentStrategy implements AssignmentStrategy { +public class PartitionBalanceAssignmentStrategy extends AbstractAssignmentStrategy { private static final Logger LOG = LoggerFactory.getLogger(PartitionBalanceAssignmentStrategy.class); private ClusterManager clusterManager; private Map serverToPartitions = Maps.newConcurrentMap(); - public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager) { + public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager, CoordinatorConf conf) { + super(conf); this.clusterManager = clusterManager; } @@ -119,7 +120,7 @@ public int compare(ServerNode o1, ServerNode o2) { expectNum = nodes.size(); } - List candidatesNodes = nodes.subList(0, expectNum); + List candidatesNodes = getCandidateNodes(nodes, expectNum); int idx = 0; List ranges = CoordinatorUtils.generateRanges(totalPartitionNum, 1); for (PartitionRange range : ranges) { diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java index 69e0087408..89e54ac983 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java @@ -51,7 +51,7 @@ public void setUp() throws Exception { CoordinatorConf ssc = new CoordinatorConf(); ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); clusterManager = new SimpleClusterManager(ssc, new Configuration()); - strategy = new BasicAssignmentStrategy(clusterManager); + strategy = new BasicAssignmentStrategy(clusterManager, ssc); } @AfterEach @@ -63,7 +63,7 @@ public void tearDown() throws IOException { @Test public void testAssign() { for (int i = 0; i < 20; ++i) { - clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0, + clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, 20 - i, 0, tags, true)); } @@ -90,7 +90,7 @@ public void testAssign() { @Test public void testRandomAssign() { for (int i = 0; i < 20; ++i) { - clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0, + clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, 0, 0, tags, true)); } PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1); @@ -156,7 +156,7 @@ public void testAssignmentShuffleNodesNum() { Set serverTags = Sets.newHashSet("tag-1"); for (int i = 0; i < 20; ++i) { - clusterManager.add(new ServerNode("t1-" + i, "", 0, 0, 0, + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, serverTags, true)); } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java index 67d54f6885..fad48f7756 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -34,6 +36,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -49,7 +52,7 @@ public void setUp() throws Exception { CoordinatorConf ssc = new CoordinatorConf(); ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); clusterManager = new SimpleClusterManager(ssc, new Configuration()); - strategy = new PartitionBalanceAssignmentStrategy(clusterManager); + strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); } @Test @@ -181,7 +184,7 @@ void updateServerResource(List resources) { for (int i = 0; i < 20; i++) { ServerNode node = new ServerNode( String.valueOf((char)('a' + i)), - "", + "127.0.0." + i, 0, 10L, 5L, @@ -198,7 +201,7 @@ public void testAssignmentShuffleNodesNum() { Set serverTags = Sets.newHashSet("tag-1"); for (int i = 0; i < 20; ++i) { - clusterManager.add(new ServerNode("t1-" + i, "", 0, 0, 0, + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, serverTags, true)); } @@ -268,7 +271,7 @@ public void testAssignmentShuffleNodesNum() { */ serverTags = Sets.newHashSet("tag-2"); for (int i = 0; i < shuffleNodesMax - 1; ++i) { - clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0, + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, serverTags, true)); } pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax); @@ -282,4 +285,120 @@ public void testAssignmentShuffleNodesNum() { .size() ); } + + + @Test + public void testAssignmentWithMustDiff() throws Exception { + CoordinatorConf ssc = new CoordinatorConf(); + ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); + ssc.set(CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY, + AbstractAssignmentStrategy.HostAssignmentStrategy.MUST_DIFF); + SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration()); + AssignmentStrategy strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); + + Set serverTags = Sets.newHashSet("tag-1"); + + for (int i = 0; i < 5; ++i) { + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, + 20 - i, 0, serverTags, true)); + } + for (int i = 0; i < 5; ++i) { + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, + 20 - i, 0, serverTags, true)); + } + PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + Map nodeMap = new HashMap<>(); + nodeList.forEach((node) -> { + ServerNode serverNode = nodeMap.get(node.getIp()); + assertNull(serverNode); + nodeMap.put(node.getIp(), node); + }); + }); + + pra = strategy.assign(100, 1, 6, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + Map nodeMap = new HashMap<>(); + boolean hasSameHost = false; + for (ServerNode node : nodeList) { + ServerNode serverNode = nodeMap.get(node.getIp()); + if (serverNode != null) { + hasSameHost = true; + break; + } + assertNull(serverNode); + nodeMap.put(node.getIp(), node); + } + assertTrue(hasSameHost); + }); + } + + @Test + public void testAssignmentWithPreferDiff() throws Exception { + CoordinatorConf ssc = new CoordinatorConf(); + ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); + ssc.set(CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY, + AbstractAssignmentStrategy.HostAssignmentStrategy.PREFER_DIFF); + SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration()); + AssignmentStrategy strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); + Set serverTags = Sets.newHashSet("tag-1"); + + for (int i = 0; i < 3; ++i) { + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, + 20 - i, 0, serverTags, true)); + } + for (int i = 0; i < 2; ++i) { + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, + 20 - i, 0, serverTags, true)); + } + PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + assertEquals(5, nodeList.size()); + }); + + ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 3); + clusterManager = new SimpleClusterManager(ssc, new Configuration()); + for (int i = 0; i < 3; ++i) { + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, + 20 - i, 0, serverTags, true)); + } + for (int i = 0; i < 2; ++i) { + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, + 20 - i, 0, serverTags, true)); + } + strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); + pra = strategy.assign(100, 1, 3, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + Map nodeMap = new HashMap<>(); + nodeList.forEach((node) -> { + ServerNode serverNode = nodeMap.get(node.getIp()); + assertNull(serverNode); + nodeMap.put(node.getIp(), node); + }); + }); + } + + @Test + public void testAssignmentWithNone() throws Exception { + CoordinatorConf ssc = new CoordinatorConf(); + ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); + ssc.set(CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY, + AbstractAssignmentStrategy.HostAssignmentStrategy.NONE); + SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration()); + AssignmentStrategy strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); + Set serverTags = Sets.newHashSet("tag-1"); + + for (int i = 0; i < 3; ++i) { + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, + 20 - i, 0, serverTags, true)); + } + for (int i = 0; i < 2; ++i) { + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, + 20 - i, 0, serverTags, true)); + } + PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + assertEquals(5, nodeList.size()); + }); + } } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java index 4c23530ca8..4950eaa7b6 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java @@ -39,7 +39,7 @@ public void test() { for (int i = 0; i < 9; i = i + 3) { PartitionRange range = new PartitionRange(i, i + 2); List nodes = Collections.singletonList(new ServerNode( - String.valueOf(i), "", i / 3, 0, 0, 0, 0, Sets.newHashSet("test"), true)); + String.valueOf(i), "127.0.0." + i, i / 3, 0, 0, 0, 0, Sets.newHashSet("test"), true)); sortedMap.put(range, nodes); }