From f1cb43fd3928daf845f8d5a6495e51f40a98ac10 Mon Sep 17 00:00:00 2001 From: xianjingfeng <583872483@qq.com> Date: Tue, 27 Sep 2022 14:06:53 +0800 Subject: [PATCH] Support deploy multiple shuffle servers in a single node (#166) ### What changes were proposed in this pull request? 1.Sufflle server with same ip will not be assigned to same partition 2.Check whether port is in use in start script of shuffle server ### Why are the changes needed? If we have a lot of memory(more than 1T) per host, so we need deploy multiple shuffle servers in a single node. #77 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? already added --- bin/start-shuffle-server.sh | 4 +- bin/utils.sh | 8 ++ .../AbstractAssignmentStrategy.java | 85 ++++++++++++ .../AssignmentStrategyFactory.java | 4 +- .../coordinator/BasicAssignmentStrategy.java | 8 +- .../uniffle/coordinator/CoordinatorConf.java | 6 + .../PartitionBalanceAssignmentStrategy.java | 7 +- .../BasicAssignmentStrategyTest.java | 8 +- ...artitionBalanceAssignmentStrategyTest.java | 127 +++++++++++++++++- .../PartitionRangeAssignmentTest.java | 2 +- 10 files changed, 241 insertions(+), 18 deletions(-) create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java diff --git a/bin/start-shuffle-server.sh b/bin/start-shuffle-server.sh index 4fd6d6ae77..40d18d1f80 100644 --- a/bin/start-shuffle-server.sh +++ b/bin/start-shuffle-server.sh @@ -48,7 +48,9 @@ MAIN_CLASS="org.apache.uniffle.server.ShuffleServer" HADOOP_DEPENDENCY="$("$HADOOP_HOME/bin/hadoop" classpath --glob)" echo "Check process existence" -is_jvm_process_running "$JPS" $MAIN_CLASS +RPC_PORT=`grep '^rss.rpc.server.port' $CONF_FILE |awk '{print $2}'` +is_port_in_use $RPC_PORT + CLASSPATH="" diff --git a/bin/utils.sh b/bin/utils.sh index a7c5d42d16..6c4cadc5a1 100644 --- a/bin/utils.sh +++ b/bin/utils.sh @@ -133,6 +133,14 @@ function is_jvm_process_running { } +function is_port_in_use { + local port=$1 + local tmp=$(lsof -i:$port | grep LISTEN) + if [[ "$tmp" != "" ]]; then + echo "port[$port] is already in use" + exit 1 + fi +} #--- # load_rss_env: Export RSS environment variables #--- diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java new file mode 100644 index 0000000000..a16bdf0943 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY; + +public abstract class AbstractAssignmentStrategy implements AssignmentStrategy { + protected final CoordinatorConf conf; + private final HostAssignmentStrategy assignmentHostStrategy; + + public AbstractAssignmentStrategy(CoordinatorConf conf) { + this.conf = conf; + assignmentHostStrategy = conf.get(COORDINATOR_ASSGINMENT_HOST_STRATEGY); + } + + protected List getCandidateNodes(List allNodes, int expectNum) { + switch (assignmentHostStrategy) { + case MUST_DIFF: return getCandidateNodesWithDiffHost(allNodes, expectNum); + case PREFER_DIFF: return tryGetCandidateNodesWithDiffHost(allNodes, expectNum); + case NONE: return allNodes.subList(0, expectNum); + default: throw new RuntimeException("Unsupported host assignment strategy:" + assignmentHostStrategy); + } + } + + protected List tryGetCandidateNodesWithDiffHost(List allNodes, int expectNum) { + List candidatesNodes = getCandidateNodesWithDiffHost(allNodes, expectNum); + Set candidatesNodeSet = candidatesNodes.stream().collect(Collectors.toSet()); + if (candidatesNodes.size() < expectNum) { + for (ServerNode node : allNodes) { + if (candidatesNodeSet.contains(node)) { + continue; + } + candidatesNodes.add(node); + if (candidatesNodes.size() >= expectNum) { + break; + } + } + } + return candidatesNodes; + } + + protected List getCandidateNodesWithDiffHost(List allNodes, int expectNum) { + List candidatesNodes = new ArrayList<>(); + Set hostIpCandidate = new HashSet<>(); + for (ServerNode node : allNodes) { + if (hostIpCandidate.contains(node.getIp())) { + continue; + } + hostIpCandidate.add(node.getIp()); + candidatesNodes.add(node); + if (candidatesNodes.size() >= expectNum) { + break; + } + } + return candidatesNodes; + } + + + public enum HostAssignmentStrategy { + MUST_DIFF, + PREFER_DIFF, + NONE + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java index 073689caa9..0ae393144c 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java @@ -30,9 +30,9 @@ public AssignmentStrategyFactory(CoordinatorConf conf, ClusterManager clusterMan public AssignmentStrategy getAssignmentStrategy() { StrategyName strategy = conf.get(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY); if (StrategyName.BASIC == strategy) { - return new BasicAssignmentStrategy(clusterManager); + return new BasicAssignmentStrategy(clusterManager, conf); } else if (StrategyName.PARTITION_BALANCE == strategy) { - return new PartitionBalanceAssignmentStrategy(clusterManager); + return new PartitionBalanceAssignmentStrategy(clusterManager, conf); } else { throw new UnsupportedOperationException("Unsupported assignment strategy."); } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java index 54ca2c22d9..9bb4ba87b6 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java @@ -29,13 +29,14 @@ import org.apache.uniffle.common.PartitionRange; -public class BasicAssignmentStrategy implements AssignmentStrategy { +public class BasicAssignmentStrategy extends AbstractAssignmentStrategy { private static final Logger LOG = LoggerFactory.getLogger(BasicAssignmentStrategy.class); private ClusterManager clusterManager; - public BasicAssignmentStrategy(ClusterManager clusterManager) { + public BasicAssignmentStrategy(ClusterManager clusterManager, CoordinatorConf conf) { + super(conf); this.clusterManager = clusterManager; } @@ -81,6 +82,7 @@ private List getRequiredServers(Set requiredTags, int expect LOG.warn("Can't get expected servers [" + expectedNum + "] and found only [" + servers.size() + "]"); return servers; } - return servers.subList(0, expectedNum); + + return getCandidateNodes(servers, expectedNum); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index b4b4126aa3..96053ab1a9 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -153,6 +153,12 @@ public class CoordinatorConf extends RssBaseConf { .intType() .defaultValue(3) .withDescription("The number of times to read and write HDFS files"); + public static final ConfigOption + COORDINATOR_ASSGINMENT_HOST_STRATEGY = + ConfigOptions.key("rss.coordinator.assignment.host.strategy") + .enumType(AbstractAssignmentStrategy.HostAssignmentStrategy.class) + .defaultValue(AbstractAssignmentStrategy.HostAssignmentStrategy.PREFER_DIFF) + .withDescription("Strategy for selecting shuffle servers"); public CoordinatorConf() { } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java index b915f31714..a31f4bb950 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java @@ -50,14 +50,15 @@ * .... **/ -public class PartitionBalanceAssignmentStrategy implements AssignmentStrategy { +public class PartitionBalanceAssignmentStrategy extends AbstractAssignmentStrategy { private static final Logger LOG = LoggerFactory.getLogger(PartitionBalanceAssignmentStrategy.class); private ClusterManager clusterManager; private Map serverToPartitions = Maps.newConcurrentMap(); - public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager) { + public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager, CoordinatorConf conf) { + super(conf); this.clusterManager = clusterManager; } @@ -119,7 +120,7 @@ public int compare(ServerNode o1, ServerNode o2) { expectNum = nodes.size(); } - List candidatesNodes = nodes.subList(0, expectNum); + List candidatesNodes = getCandidateNodes(nodes, expectNum); int idx = 0; List ranges = CoordinatorUtils.generateRanges(totalPartitionNum, 1); for (PartitionRange range : ranges) { diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java index 69e0087408..89e54ac983 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java @@ -51,7 +51,7 @@ public void setUp() throws Exception { CoordinatorConf ssc = new CoordinatorConf(); ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); clusterManager = new SimpleClusterManager(ssc, new Configuration()); - strategy = new BasicAssignmentStrategy(clusterManager); + strategy = new BasicAssignmentStrategy(clusterManager, ssc); } @AfterEach @@ -63,7 +63,7 @@ public void tearDown() throws IOException { @Test public void testAssign() { for (int i = 0; i < 20; ++i) { - clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0, + clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, 20 - i, 0, tags, true)); } @@ -90,7 +90,7 @@ public void testAssign() { @Test public void testRandomAssign() { for (int i = 0; i < 20; ++i) { - clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0, + clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, 0, 0, tags, true)); } PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1); @@ -156,7 +156,7 @@ public void testAssignmentShuffleNodesNum() { Set serverTags = Sets.newHashSet("tag-1"); for (int i = 0; i < 20; ++i) { - clusterManager.add(new ServerNode("t1-" + i, "", 0, 0, 0, + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, serverTags, true)); } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java index 67d54f6885..fad48f7756 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -34,6 +36,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -49,7 +52,7 @@ public void setUp() throws Exception { CoordinatorConf ssc = new CoordinatorConf(); ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); clusterManager = new SimpleClusterManager(ssc, new Configuration()); - strategy = new PartitionBalanceAssignmentStrategy(clusterManager); + strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); } @Test @@ -181,7 +184,7 @@ void updateServerResource(List resources) { for (int i = 0; i < 20; i++) { ServerNode node = new ServerNode( String.valueOf((char)('a' + i)), - "", + "127.0.0." + i, 0, 10L, 5L, @@ -198,7 +201,7 @@ public void testAssignmentShuffleNodesNum() { Set serverTags = Sets.newHashSet("tag-1"); for (int i = 0; i < 20; ++i) { - clusterManager.add(new ServerNode("t1-" + i, "", 0, 0, 0, + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, serverTags, true)); } @@ -268,7 +271,7 @@ public void testAssignmentShuffleNodesNum() { */ serverTags = Sets.newHashSet("tag-2"); for (int i = 0; i < shuffleNodesMax - 1; ++i) { - clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0, + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, serverTags, true)); } pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax); @@ -282,4 +285,120 @@ public void testAssignmentShuffleNodesNum() { .size() ); } + + + @Test + public void testAssignmentWithMustDiff() throws Exception { + CoordinatorConf ssc = new CoordinatorConf(); + ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); + ssc.set(CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY, + AbstractAssignmentStrategy.HostAssignmentStrategy.MUST_DIFF); + SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration()); + AssignmentStrategy strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); + + Set serverTags = Sets.newHashSet("tag-1"); + + for (int i = 0; i < 5; ++i) { + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, + 20 - i, 0, serverTags, true)); + } + for (int i = 0; i < 5; ++i) { + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, + 20 - i, 0, serverTags, true)); + } + PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + Map nodeMap = new HashMap<>(); + nodeList.forEach((node) -> { + ServerNode serverNode = nodeMap.get(node.getIp()); + assertNull(serverNode); + nodeMap.put(node.getIp(), node); + }); + }); + + pra = strategy.assign(100, 1, 6, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + Map nodeMap = new HashMap<>(); + boolean hasSameHost = false; + for (ServerNode node : nodeList) { + ServerNode serverNode = nodeMap.get(node.getIp()); + if (serverNode != null) { + hasSameHost = true; + break; + } + assertNull(serverNode); + nodeMap.put(node.getIp(), node); + } + assertTrue(hasSameHost); + }); + } + + @Test + public void testAssignmentWithPreferDiff() throws Exception { + CoordinatorConf ssc = new CoordinatorConf(); + ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); + ssc.set(CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY, + AbstractAssignmentStrategy.HostAssignmentStrategy.PREFER_DIFF); + SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration()); + AssignmentStrategy strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); + Set serverTags = Sets.newHashSet("tag-1"); + + for (int i = 0; i < 3; ++i) { + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, + 20 - i, 0, serverTags, true)); + } + for (int i = 0; i < 2; ++i) { + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, + 20 - i, 0, serverTags, true)); + } + PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + assertEquals(5, nodeList.size()); + }); + + ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 3); + clusterManager = new SimpleClusterManager(ssc, new Configuration()); + for (int i = 0; i < 3; ++i) { + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, + 20 - i, 0, serverTags, true)); + } + for (int i = 0; i < 2; ++i) { + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, + 20 - i, 0, serverTags, true)); + } + strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); + pra = strategy.assign(100, 1, 3, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + Map nodeMap = new HashMap<>(); + nodeList.forEach((node) -> { + ServerNode serverNode = nodeMap.get(node.getIp()); + assertNull(serverNode); + nodeMap.put(node.getIp(), node); + }); + }); + } + + @Test + public void testAssignmentWithNone() throws Exception { + CoordinatorConf ssc = new CoordinatorConf(); + ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax); + ssc.set(CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY, + AbstractAssignmentStrategy.HostAssignmentStrategy.NONE); + SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration()); + AssignmentStrategy strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); + Set serverTags = Sets.newHashSet("tag-1"); + + for (int i = 0; i < 3; ++i) { + clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, + 20 - i, 0, serverTags, true)); + } + for (int i = 0; i < 2; ++i) { + clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, + 20 - i, 0, serverTags, true)); + } + PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1); + pra.getAssignments().values().forEach((nodeList) -> { + assertEquals(5, nodeList.size()); + }); + } } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java index 4c23530ca8..4950eaa7b6 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java @@ -39,7 +39,7 @@ public void test() { for (int i = 0; i < 9; i = i + 3) { PartitionRange range = new PartitionRange(i, i + 2); List nodes = Collections.singletonList(new ServerNode( - String.valueOf(i), "", i / 3, 0, 0, 0, 0, Sets.newHashSet("test"), true)); + String.valueOf(i), "127.0.0." + i, i / 3, 0, 0, 0, 0, Sets.newHashSet("test"), true)); sortedMap.put(range, nodes); }