From e7a0338ed876ef745b1f59ab86498329c9630145 Mon Sep 17 00:00:00 2001 From: wqliang Date: Sun, 22 Mar 2020 22:25:15 +0800 Subject: [PATCH] not send nearby if local mq less than threshold --- .../client/common/DeFiBusClientConfig.java | 11 +++++++++ .../impl/producer/DeFiBusProducerImpl.java | 3 ++- .../producer/HealthyMessageQueueSelector.java | 23 +++++++++++++++---- .../HealthyMessageQueueSelectorTest.java | 10 ++++---- 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/defibus-client/src/main/java/cn/webank/defibus/client/common/DeFiBusClientConfig.java b/defibus-client/src/main/java/cn/webank/defibus/client/common/DeFiBusClientConfig.java index cb557ab347..73e035364f 100644 --- a/defibus-client/src/main/java/cn/webank/defibus/client/common/DeFiBusClientConfig.java +++ b/defibus-client/src/main/java/cn/webank/defibus/client/common/DeFiBusClientConfig.java @@ -58,6 +58,8 @@ public class DeFiBusClientConfig { private long pullTimeDelayMillsWhenFlowControl = 50; private long pullTimeDelayMillsWhenSuspend = 500; + private int minMqNumWhenSendLocal = 1; + public String getProducerGroup() { return producerGroup; } @@ -270,6 +272,14 @@ public void setPullTimeDelayMillsWhenSuspend(long pullTimeDelayMillsWhenSuspend) this.pullTimeDelayMillsWhenSuspend = pullTimeDelayMillsWhenSuspend; } + public int getMinMqNumWhenSendLocal() { + return minMqNumWhenSendLocal; + } + + public void setMinMqNumWhenSendLocal(int minMqNumWhenSendLocal) { + this.minMqNumWhenSendLocal = minMqNumWhenSendLocal; + } + @Override public String toString() { return "DeFiBusClientConfig{" + "producerGroup='" + producerGroup + '\'' + @@ -299,6 +309,7 @@ public void setPullTimeDelayMillsWhenSuspend(long pullTimeDelayMillsWhenSuspend) ", pullTimeDelayMillsWhenExcept=" + pullTimeDelayMillsWhenExcept + ", pullTimeDelayMillsWhenFlowControl=" + pullTimeDelayMillsWhenFlowControl + ", pullTimeDelayMillsWhenSuspend=" + pullTimeDelayMillsWhenSuspend + + ", minMqNumWhenSendLocal=" + minMqNumWhenSendLocal + '}'; } } diff --git a/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/DeFiBusProducerImpl.java b/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/DeFiBusProducerImpl.java index 5c8542081f..f5e7b01a05 100644 --- a/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/DeFiBusProducerImpl.java +++ b/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/DeFiBusProducerImpl.java @@ -70,7 +70,8 @@ public class DeFiBusProducerImpl { public DeFiBusProducerImpl(DeFiBusProducer deFiBusProducer, DeFiBusClientConfig deFiBusClientConfig, DeFiBusClientInstance deFiBusClientInstance) { this.deFiBusProducer = deFiBusProducer; - this.messageQueueSelector = new HealthyMessageQueueSelector(new MessageQueueHealthManager(deFiBusClientConfig.getQueueIsolateTimeMillis())); + this.messageQueueSelector = new HealthyMessageQueueSelector(new MessageQueueHealthManager(deFiBusClientConfig.getQueueIsolateTimeMillis()), + deFiBusClientConfig.getMinMqNumWhenSendLocal()); executorService = deFiBusClientInstance.getExecutorService(); scheduledExecutorService = deFiBusClientInstance.getScheduledExecutorService(); diff --git a/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/HealthyMessageQueueSelector.java b/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/HealthyMessageQueueSelector.java index c64fc00025..c59ae99e97 100644 --- a/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/HealthyMessageQueueSelector.java +++ b/defibus-client/src/main/java/cn/webank/defibus/client/impl/producer/HealthyMessageQueueSelector.java @@ -39,11 +39,13 @@ public class HealthyMessageQueueSelector implements MessageQueueSelector { private final AtomicInteger sendWhichLocalQueue = new AtomicInteger(0); private final AtomicInteger sendWhichRemoteQueue = new AtomicInteger(0); private final MessageQueueHealthManager messageQueueHealthManager; + private int minMqCountWhenSendLocal = 1; private Map sendNearbyMapping = new HashMap<>(); private Set localBrokers = new HashSet(); - public HealthyMessageQueueSelector(MessageQueueHealthManager messageQueueHealthManager) { + public HealthyMessageQueueSelector(MessageQueueHealthManager messageQueueHealthManager, int minMqCountWhenSendLocal) { this.messageQueueHealthManager = messageQueueHealthManager; + this.minMqCountWhenSendLocal = minMqCountWhenSendLocal; } @Override @@ -61,7 +63,14 @@ public MessageQueue select(List mqs, Message msg, final Object sel if (pub2local) { List localMQs = new ArrayList<>(); List remoteMqs = new ArrayList<>(); - separateLocalAndRemoteMQs(mqs, localBrokers, localMQs, remoteMqs); + HashMap localBrokerMQCount = separateLocalAndRemoteMQs(mqs, localBrokers, localMQs, remoteMqs); + + for (String brokerName : localBrokerMQCount.keySet()) { + //if MQ num less than threshold, send msg to all broker + if (localBrokerMQCount.get(brokerName) <= minMqCountWhenSendLocal) { + localMQs.addAll(remoteMqs); + } + } //try select a mq from local idc first MessageQueue candidate = selectMessageQueue(localMQs, sendWhichLocalQueue, lastOne, msg); @@ -154,20 +163,26 @@ private List filterMqsByBrokerName(final List mqs, S return result; } - private void separateLocalAndRemoteMQs(List mqs, Set localBrokers, + private HashMap separateLocalAndRemoteMQs(List mqs, Set localBrokers, List localMQs, List remoteMQs) { if (localMQs == null) localMQs = new ArrayList<>(); if (remoteMQs == null) remoteMQs = new ArrayList<>(); - + HashMap brokerMQCount = new HashMap<>(); for (MessageQueue mq : mqs) { if (localBrokers.contains(mq.getBrokerName())) { localMQs.add(mq); + Integer count = brokerMQCount.get(mq.getBrokerName()); + if (count == null) { + count = 0; + } + brokerMQCount.put(mq.getBrokerName(), count+1); } else { remoteMQs.add(mq); } } + return brokerMQCount; } public MessageQueueHealthManager getMessageQueueHealthManager() { diff --git a/defibus-client/src/test/java/cn/webank/defibus/client/producer/HealthyMessageQueueSelectorTest.java b/defibus-client/src/test/java/cn/webank/defibus/client/producer/HealthyMessageQueueSelectorTest.java index a0ee2e22ce..81a749a9d1 100644 --- a/defibus-client/src/test/java/cn/webank/defibus/client/producer/HealthyMessageQueueSelectorTest.java +++ b/defibus-client/src/test/java/cn/webank/defibus/client/producer/HealthyMessageQueueSelectorTest.java @@ -45,7 +45,7 @@ public void testLocalValidQueue() { // PowerMockito.when(producerImplMock.getLocalBrokers()).thenReturn(locBrokers); MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(locBrokers); List mqs = new ArrayList<>(); @@ -65,7 +65,7 @@ public void testErrorQueue() { locBrokers.add("localIDC"); MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(locBrokers); List mqs = new ArrayList<>(); @@ -92,7 +92,7 @@ public void testOtherValidQueue() { locBrokers.add("localIDC"); MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(locBrokers); List mqs = new ArrayList<>(); @@ -122,7 +122,7 @@ public void testBizTopic() { localBrokers.add(localBrokerName); MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(localBrokers); //construct mq data @@ -193,7 +193,7 @@ public void testRetryBizTopic() { } MessageQueueHealthManager manager = new MessageQueueHealthManager(60 * 1000); - HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager); + HealthyMessageQueueSelector selector = new HealthyMessageQueueSelector(manager, 1); selector.setLocalBrokers(localBrokers); //construct mq data