Skip to content

Commit

Permalink
Support deploying multiple shuffle servers on a single node (#166)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. Shuffle servers with the same IP will not be assigned to the same partition.
2. Check whether the port is in use in the start script of the shuffle server.

### Why are the changes needed?
If a host has a lot of memory (more than 1 TB), we need to deploy multiple shuffle servers on a single node. #77

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
already added
  • Loading branch information
xianjingfeng authored Sep 27, 2022
1 parent 0645ebe commit f1cb43f
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 18 deletions.
4 changes: 3 additions & 1 deletion bin/start-shuffle-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ MAIN_CLASS="org.apache.uniffle.server.ShuffleServer"
HADOOP_DEPENDENCY="$("$HADOOP_HOME/bin/hadoop" classpath --glob)"

echo "Check process existence"
is_jvm_process_running "$JPS" $MAIN_CLASS
RPC_PORT=`grep '^rss.rpc.server.port' $CONF_FILE |awk '{print $2}'`
is_port_in_use $RPC_PORT


CLASSPATH=""

Expand Down
8 changes: 8 additions & 0 deletions bin/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ function is_jvm_process_running {

}

#---
# is_port_in_use: Exit the script if a process is already listening on a TCP port
# args:
#   $1: port number to check
# behavior:
#   - prints "port[<port>] is already in use" and exits with status 1 when lsof
#     reports a TCP listener on the port
#   - returns 0 when no listener is reported
#   - returns 0 with a warning on stderr when no port is given, because there is
#     nothing meaningful to check
#---
function is_port_in_use {
  local port="${1:-}"
  if [[ -z "$port" ]]; then
    # An empty port would make lsof fail with an unrelated error and the check
    # would pass silently, so say explicitly that the check is skipped.
    echo "port is not specified, skip checking whether it is in use" >&2
    return 0
  fi

  # -sTCP:LISTEN restricts the result to listening sockets (no grep needed);
  # -n and -P disable host and port-name lookups so the check stays fast.
  local listeners
  listeners=$(lsof -nP -iTCP:"$port" -sTCP:LISTEN)
  if [[ -n "$listeners" ]]; then
    echo "port[$port] is already in use"
    exit 1
  fi
}
#---
# load_rss_env: Export RSS environment variables
#---
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY;

/**
 * Base class for assignment strategies that need to pick candidate shuffle servers while taking
 * the server's host IP into account, so that several shuffle servers running on the same host are
 * handled according to the configured {@link HostAssignmentStrategy}.
 */
public abstract class AbstractAssignmentStrategy implements AssignmentStrategy {
  protected final CoordinatorConf conf;
  private final HostAssignmentStrategy assignmentHostStrategy;

  /**
   * @param conf coordinator configuration; the host assignment strategy is read from it once
   */
  public AbstractAssignmentStrategy(CoordinatorConf conf) {
    this.conf = conf;
    assignmentHostStrategy = conf.get(COORDINATOR_ASSGINMENT_HOST_STRATEGY);
  }

  /**
   * Selects up to {@code expectNum} nodes from {@code allNodes} according to the configured
   * {@link HostAssignmentStrategy}. The iteration order of {@code allNodes} decides priority.
   *
   * @param allNodes all available nodes, in priority order
   * @param expectNum the number of nodes wanted; values {@code <= 0} yield an empty list
   * @return the selected nodes; may contain fewer than {@code expectNum} entries
   * @throws RuntimeException if the configured strategy is not supported
   */
  protected List<ServerNode> getCandidateNodes(List<ServerNode> allNodes, int expectNum) {
    switch (assignmentHostStrategy) {
      case MUST_DIFF:
        return getCandidateNodesWithDiffHost(allNodes, expectNum);
      case PREFER_DIFF:
        return tryGetCandidateNodesWithDiffHost(allNodes, expectNum);
      case NONE:
        // Clamp the bound so this branch degrades the same way as the other strategies
        // instead of throwing when fewer nodes are available than requested.
        return allNodes.subList(0, Math.max(0, Math.min(expectNum, allNodes.size())));
      default:
        throw new RuntimeException("Unsupported host assignment strategy:" + assignmentHostStrategy);
    }
  }

  /**
   * Selects nodes with distinct IPs first; if that yields fewer than {@code expectNum} nodes,
   * fills the remainder with the not-yet-selected nodes in the order of {@code allNodes}.
   *
   * @param allNodes all available nodes, in priority order
   * @param expectNum the number of nodes wanted; values {@code <= 0} yield an empty list
   * @return a new list with at most {@code expectNum} nodes
   */
  protected List<ServerNode> tryGetCandidateNodesWithDiffHost(List<ServerNode> allNodes, int expectNum) {
    List<ServerNode> candidatesNodes = getCandidateNodesWithDiffHost(allNodes, expectNum);
    if (candidatesNodes.size() >= expectNum) {
      return candidatesNodes;
    }
    // Snapshot of the nodes chosen in the distinct-host pass; used to skip them while filling up.
    Set<ServerNode> alreadyChosen = candidatesNodes.stream().collect(Collectors.toSet());
    for (ServerNode node : allNodes) {
      if (alreadyChosen.contains(node)) {
        continue;
      }
      candidatesNodes.add(node);
      if (candidatesNodes.size() >= expectNum) {
        break;
      }
    }
    return candidatesNodes;
  }

  /**
   * Selects at most one node per IP, in the order of {@code allNodes}, until {@code expectNum}
   * nodes have been collected or the input is exhausted.
   *
   * @param allNodes all available nodes, in priority order
   * @param expectNum the number of nodes wanted; values {@code <= 0} yield an empty list
   * @return a new list with at most {@code expectNum} nodes, all with different IPs
   */
  protected List<ServerNode> getCandidateNodesWithDiffHost(List<ServerNode> allNodes, int expectNum) {
    List<ServerNode> candidatesNodes = new ArrayList<>();
    Set<String> hostIpCandidate = new HashSet<>();
    for (ServerNode node : allNodes) {
      // Check the limit before adding so that a non-positive expectNum selects nothing.
      if (candidatesNodes.size() >= expectNum) {
        break;
      }
      // Set.add returns false when the IP was already taken by an earlier node.
      if (hostIpCandidate.add(node.getIp())) {
        candidatesNodes.add(node);
      }
    }
    return candidatesNodes;
  }

  /** How the host (IP) of a shuffle server influences candidate selection. */
  public enum HostAssignmentStrategy {
    /** Only nodes with distinct IPs are selected; the result may be smaller than requested. */
    MUST_DIFF,
    /** Nodes with distinct IPs are selected first, then remaining nodes fill up the result. */
    PREFER_DIFF,
    /** The IP is ignored; the leading nodes of the input list are selected. */
    NONE
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public AssignmentStrategyFactory(CoordinatorConf conf, ClusterManager clusterMan
public AssignmentStrategy getAssignmentStrategy() {
StrategyName strategy = conf.get(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY);
if (StrategyName.BASIC == strategy) {
return new BasicAssignmentStrategy(clusterManager);
return new BasicAssignmentStrategy(clusterManager, conf);
} else if (StrategyName.PARTITION_BALANCE == strategy) {
return new PartitionBalanceAssignmentStrategy(clusterManager);
return new PartitionBalanceAssignmentStrategy(clusterManager, conf);
} else {
throw new UnsupportedOperationException("Unsupported assignment strategy.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@

import org.apache.uniffle.common.PartitionRange;

public class BasicAssignmentStrategy implements AssignmentStrategy {
public class BasicAssignmentStrategy extends AbstractAssignmentStrategy {

private static final Logger LOG = LoggerFactory.getLogger(BasicAssignmentStrategy.class);

private ClusterManager clusterManager;

public BasicAssignmentStrategy(ClusterManager clusterManager) {
public BasicAssignmentStrategy(ClusterManager clusterManager, CoordinatorConf conf) {
super(conf);
this.clusterManager = clusterManager;
}

Expand Down Expand Up @@ -81,6 +82,7 @@ private List<ServerNode> getRequiredServers(Set<String> requiredTags, int expect
LOG.warn("Can't get expected servers [" + expectedNum + "] and found only [" + servers.size() + "]");
return servers;
}
return servers.subList(0, expectedNum);

return getCandidateNodes(servers, expectedNum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ public class CoordinatorConf extends RssBaseConf {
.intType()
.defaultValue(3)
.withDescription("The number of times to read and write HDFS files");
/**
 * Config option "rss.coordinator.assignment.host.strategy": selects the
 * AbstractAssignmentStrategy.HostAssignmentStrategy used when picking shuffle servers.
 * Defaults to PREFER_DIFF.
 * NOTE: the constant name is spelled "ASSGINMENT"; AbstractAssignmentStrategy statically imports it
 * under this exact name, so the spelling must not be changed here alone.
 */
public static final ConfigOption<AbstractAssignmentStrategy.HostAssignmentStrategy>
COORDINATOR_ASSGINMENT_HOST_STRATEGY =
ConfigOptions.key("rss.coordinator.assignment.host.strategy")
.enumType(AbstractAssignmentStrategy.HostAssignmentStrategy.class)
.defaultValue(AbstractAssignmentStrategy.HostAssignmentStrategy.PREFER_DIFF)
.withDescription("Strategy for selecting shuffle servers");

public CoordinatorConf() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@
* ....
**/

public class PartitionBalanceAssignmentStrategy implements AssignmentStrategy {
public class PartitionBalanceAssignmentStrategy extends AbstractAssignmentStrategy {

private static final Logger LOG = LoggerFactory.getLogger(PartitionBalanceAssignmentStrategy.class);

private ClusterManager clusterManager;
private Map<ServerNode, PartitionAssignmentInfo> serverToPartitions = Maps.newConcurrentMap();

public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager) {
public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager, CoordinatorConf conf) {
super(conf);
this.clusterManager = clusterManager;
}

Expand Down Expand Up @@ -119,7 +120,7 @@ public int compare(ServerNode o1, ServerNode o2) {
expectNum = nodes.size();
}

List<ServerNode> candidatesNodes = nodes.subList(0, expectNum);
List<ServerNode> candidatesNodes = getCandidateNodes(nodes, expectNum);
int idx = 0;
List<PartitionRange> ranges = CoordinatorUtils.generateRanges(totalPartitionNum, 1);
for (PartitionRange range : ranges) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void setUp() throws Exception {
CoordinatorConf ssc = new CoordinatorConf();
ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax);
clusterManager = new SimpleClusterManager(ssc, new Configuration());
strategy = new BasicAssignmentStrategy(clusterManager);
strategy = new BasicAssignmentStrategy(clusterManager, ssc);
}

@AfterEach
Expand All @@ -63,7 +63,7 @@ public void tearDown() throws IOException {
@Test
public void testAssign() {
for (int i = 0; i < 20; ++i) {
clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0,
clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0,
20 - i, 0, tags, true));
}

Expand All @@ -90,7 +90,7 @@ public void testAssign() {
@Test
public void testRandomAssign() {
for (int i = 0; i < 20; ++i) {
clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0,
clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0,
0, 0, tags, true));
}
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1);
Expand Down Expand Up @@ -156,7 +156,7 @@ public void testAssignmentShuffleNodesNum() {
Set<String> serverTags = Sets.newHashSet("tag-1");

for (int i = 0; i < 20; ++i) {
clusterManager.add(new ServerNode("t1-" + i, "", 0, 0, 0,
clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
20 - i, 0, serverTags, true));
}

Expand Down
Loading

0 comments on commit f1cb43f

Please sign in to comment.