From 1c7429be8d366135e1f052e76bdfcc2a489890ba Mon Sep 17 00:00:00 2001 From: lrhkobe <34571087+lrhkobe@users.noreply.github.com> Date: Thu, 16 Dec 2021 17:14:38 +0800 Subject: [PATCH] [ISSUE #604]Improve the rebalance algorithm (#605) * modify:optimize flow control in downstreaming msg * modify:optimize stategy of selecting session in downstream msg * modify:optimize msg downstream,msg store in session * modify:fix bug:not a @Sharable handler * modify:downstream broadcast msg asynchronously * modify:remove unneccessary interface in eventmesh-connector-api * modify:fix conflict * modify:add license in EventMeshAction * modify:fix ack problem * modify:fix exception handle when exception occured in EventMeshTcpMessageDispatcher * modify:fix log print * modify: fix issue#496,ClassCastException * modify: improve rebalance algorithm close #604 --- .../rebalance/EventmeshRebalanceImpl.java | 133 ++++++++++++------ .../recommend/EventMeshRecommendImpl.java | 11 +- .../recommend/EventMeshRecommendStrategy.java | 2 +- .../runtime/util/ValueComparator.java | 6 +- 4 files changed, 100 insertions(+), 52 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java index 204f9ed411..2929705542 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.java @@ -97,59 +97,108 @@ private Map queryLocalEventMeshMap(String cluster){ } private void doRebalanceByGroup(String cluster, String group, String purpose, Map eventMeshMap) throws Exception{ + logger.info("doRebalanceByGroup start, cluster:{}, group:{}, purpose:{}", cluster, group, purpose); + //query distribute data of loacl idc Map clientDistributionMap = queryLocalEventMeshDistributeData(cluster, group, purpose, eventMeshMap); if(clientDistributionMap == null || clientDistributionMap.size() == 0){ return; } + doRebalanceRedirect(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName, group, purpose, eventMeshMap, clientDistributionMap); + logger.info("doRebalanceByGroup end, cluster:{}, group:{}, purpose:{}", cluster, group, purpose); + + } + + private void doRebalanceRedirect(String currEventMeshName, String group, String purpose, Map eventMeshMap, Map clientDistributionMap)throws Exception{ + if(clientDistributionMap == null || clientDistributionMap.size() == 0){ + return; + } + + //caculate client num need to redirect in currEventMesh + int judge = caculateRedirectNum(currEventMeshName, group, purpose, clientDistributionMap); + + if(judge > 0) { + + //select redirect target eventmesh lisg + List eventMeshRecommendResult = selectRedirectEventMesh(group, eventMeshMap, clientDistributionMap, judge, currEventMeshName); + if(eventMeshRecommendResult == null || eventMeshRecommendResult.size() != judge){ + logger.warn("doRebalance failed,recommendEventMeshNum is not consistent,recommendResult:{},judge:{}", eventMeshRecommendResult, judge); + return; + } + + //do redirect + doRedirect(group, purpose, judge, eventMeshRecommendResult); + }else{ + logger.info("rebalance condition not satisfy,group:{}, purpose:{},judge:{}", group, purpose, judge); + } + } + + private void doRedirect(String group, String purpose, int judge, List eventMeshRecommendResult) throws Exception{ + logger.info("doRebalance redirect start---------------------group:{},judge:{}", group, judge); + Set sessionSet = null; + if(EventMeshConstants.PURPOSE_SUB.equals(purpose)) { + sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupConsumerSessions(); + }else if(EventMeshConstants.PURPOSE_PUB.equals(purpose)){ + sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupProducerSessions(); + }else{ + logger.warn("doRebalance failed,param is illegal, group:{}, purpose:{}",group, purpose); + return; + } + List sessionList = new ArrayList<>(sessionSet); + Collections.shuffle(new ArrayList<>(sessionList)); + + for(int i= 0; i selectRedirectEventMesh(String group, Map eventMeshMap, Map clientDistributionMap, int judge, String evenMeshName)throws Exception{ + EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer); + return eventMeshRecommendStrategy.calculateRedirectRecommendEventMesh(eventMeshMap, clientDistributionMap, group, judge, evenMeshName); + } + + public int caculateRedirectNum(String eventMeshName, String group, String purpose, Map clientDistributionMap) throws Exception{ int sum = 0; for(Integer item : clientDistributionMap.values()){ sum += item.intValue(); } int currentNum = 0; - if(clientDistributionMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName) != null){ - currentNum = clientDistributionMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName); + if(clientDistributionMap.get(eventMeshName) != null){ + currentNum = clientDistributionMap.get(eventMeshName); } int avgNum = sum / clientDistributionMap.size(); - int judge = avgNum >= 2 ? avgNum/2 : 1; - - if(currentNum - avgNum > judge) { - Set sessionSet = null; - if(EventMeshConstants.PURPOSE_PUB.equals(purpose)){ - sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupProducerSessions(); - }else if(EventMeshConstants.PURPOSE_SUB.equals(purpose)){ - sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupConsumerSessions(); - }else{ - logger.warn("doRebalance failed,purpose is not support,purpose:{}", purpose); - return; + int modNum = sum % clientDistributionMap.size(); + + List eventMeshList = new ArrayList<>(clientDistributionMap.keySet()); + Collections.sort(eventMeshList); + int index = -1; + for(int i=0; i < Math.min(modNum, eventMeshList.size()); i++){ + if(StringUtils.equals(eventMeshName, eventMeshList.get(i))){ + index = i; + break; } - - List sessionList = new ArrayList<>(sessionSet); - Collections.shuffle(new ArrayList<>(sessionList)); - EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer); - List eventMeshRecommendResult = eventMeshRecommendStrategy.calculateRedirectRecommendEventMesh(eventMeshMap, clientDistributionMap, group, judge); - if(eventMeshRecommendResult == null || eventMeshRecommendResult.size() != judge){ - logger.warn("doRebalance failed,recommendProxyNum is not consistent,recommendResult:{},judge:{}", eventMeshRecommendResult, judge); - return; - } - logger.info("doRebalance redirect start---------------------group:{},purpose:{},judge:{}", group, purpose, judge); - for(int i= 0; i= 0) ? avgNum + 1 : avgNum; + } + logger.info("rebalance caculateRedirectNum,group:{}, purpose:{},sum:{},avgNum:{}," + + "modNum:{}, index:{}, currentNum:{}, rebalanceResult:{}", group, purpose, sum, + avgNum, modNum, index, currentNum, rebalanceResult); + return currentNum - rebalanceResult; } private Map queryLocalEventMeshDistributeData(String cluster, String group, String purpose, Map eventMeshMap){ @@ -197,12 +246,4 @@ private Map queryLocalEventMeshDistributeData(String cluster, S return localEventMeshDistributeData; } - - - private class ValueComparator implements Comparator> { - @Override - public int compare(Map.Entry x, Map.Entry y) { - return x.getValue().intValue() - y.getValue().intValue(); - } - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/EventMeshRecommendImpl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/EventMeshRecommendImpl.java index 6c8b4a349c..4b5091f413 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/EventMeshRecommendImpl.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/EventMeshRecommendImpl.java @@ -90,8 +90,11 @@ public String calculateRecommendEventMesh(String group, String purpose) throws E } @Override - public List calculateRedirectRecommendEventMesh(Map eventMeshMap, Map clientDistributeMap, String group, int recommendProxyNum) throws Exception { - logger.info("eventMeshMap:{},clientDistributionMap:{},group:{},recommendNum:{}", eventMeshMap,clientDistributeMap,group,recommendProxyNum); + public List calculateRedirectRecommendEventMesh(Map eventMeshMap, Map clientDistributeMap, String group, int recommendProxyNum, String eventMeshName) throws Exception { + if(recommendProxyNum < 1){ + return null; + } + logger.info("eventMeshMap:{},clientDistributionMap:{},group:{},recommendNum:{},currEventMeshName:{}", eventMeshMap,clientDistributeMap,group,recommendProxyNum, eventMeshName); List recommendProxyList = null; //find eventmesh with least client @@ -106,10 +109,10 @@ public List calculateRedirectRecommendEventMesh(Map even recommendProxyList = new ArrayList<>(recommendProxyNum); while(recommendProxyList.size() < recommendProxyNum){ Map.Entry minProxyItem = list.get(0); - int currProxyNum = clientDistributeMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName); + int currProxyNum = clientDistributeMap.get(eventMeshName); recommendProxyList.add(eventMeshMap.get(minProxyItem.getKey())); clientDistributeMap.put(minProxyItem.getKey(),minProxyItem.getValue() + 1); - clientDistributeMap.put(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName,currProxyNum - 1); + clientDistributeMap.put(eventMeshName,currProxyNum - 1); Collections.sort(list, vc); logger.info("clientDistributionMap after sort:{}", list); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/EventMeshRecommendStrategy.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/EventMeshRecommendStrategy.java index 3cc5044f1e..f17a513fe2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/EventMeshRecommendStrategy.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/recommend/EventMeshRecommendStrategy.java @@ -22,5 +22,5 @@ public interface EventMeshRecommendStrategy { String calculateRecommendEventMesh(String group, String purpose) throws Exception; - List calculateRedirectRecommendEventMesh(Map eventMeshMap, Map clientDistributeMap, String group, int recommendNum) throws Exception; + List calculateRedirectRecommendEventMesh(Map eventMeshMap, Map clientDistributeMap, String group, int recommendNum, String eventMeshName) throws Exception; } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/ValueComparator.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/ValueComparator.java index 2eb24e57fd..510db76109 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/ValueComparator.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/ValueComparator.java @@ -23,6 +23,10 @@ public class ValueComparator implements Comparator> { @Override public int compare(Map.Entry x, Map.Entry y) { - return x.getValue().intValue() - y.getValue().intValue(); + if(x.getValue().intValue() != y.getValue().intValue()){ + return x.getValue().intValue() - y.getValue().intValue(); + }else { + return x.getKey().compareTo(y.getKey()); + } } }