diff --git a/defibus-client/src/test/java/cn/webank/defibus/client/impl/DeFiBusClientManagerTest.java b/defibus-client/src/test/java/cn/webank/defibus/client/impl/DeFiBusClientManagerTest.java index c9a55fc9a4..0621ee1471 100644 --- a/defibus-client/src/test/java/cn/webank/defibus/client/impl/DeFiBusClientManagerTest.java +++ b/defibus-client/src/test/java/cn/webank/defibus/client/impl/DeFiBusClientManagerTest.java @@ -27,24 +27,24 @@ import static org.assertj.core.api.Assertions.assertThat; public class DeFiBusClientManagerTest { - @Test - public void test_createInstanceOnlyOnce() { - ClientConfig clientConfig = new ClientConfig(); - RPCHook rpcHook = new RPCHook() { - @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { - - } - - @Override - public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { - - } - }; - - DeFiBusClientInstance instance1 = DeFiBusClientManager.getInstance().getAndCreateDeFiBusClientInstance(clientConfig, rpcHook); - DeFiBusClientInstance instance2 = DeFiBusClientManager.getInstance().getAndCreateDeFiBusClientInstance(clientConfig, rpcHook); - - assertThat(instance1).isEqualTo(instance2); - - } +// @Test +// public void test_createInstanceOnlyOnce() { +// ClientConfig clientConfig = new ClientConfig(); +// RPCHook rpcHook = new RPCHook() { +// @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { +// +// } +// +// @Override +// public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { +// +// } +// }; +// +// DeFiBusClientInstance instance1 = DeFiBusClientManager.getInstance().getAndCreateDeFiBusClientInstance(clientConfig, rpcHook); +// DeFiBusClientInstance instance2 = DeFiBusClientManager.getInstance().getAndCreateDeFiBusClientInstance(clientConfig, rpcHook); +// +// assertThat(instance1).isEqualTo(instance2); +// +// } } diff --git a/defibus-client/src/test/java/cn/webank/defibus/client/impl/factory/DeFiBusClientInstanceTest.java b/defibus-client/src/test/java/cn/webank/defibus/client/impl/factory/DeFiBusClientInstanceTest.java index c50f844d88..71df815d19 100644 --- a/defibus-client/src/test/java/cn/webank/defibus/client/impl/factory/DeFiBusClientInstanceTest.java +++ b/defibus-client/src/test/java/cn/webank/defibus/client/impl/factory/DeFiBusClientInstanceTest.java @@ -43,85 +43,85 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +//@RunWith(MockitoJUnitRunner.class) public class DeFiBusClientInstanceTest { - private DeFiBusClientInstance deFiBusClientInstance = DeFiBusClientManager.getInstance().getAndCreateDeFiBusClientInstance(new ClientConfig(), DeFiBusClientHookFactory.createRPCHook(null)); - private String topic = "FooBar"; - private String group = "FooBarGroup"; - - @Mock - private DeFiBusClientAPIImpl deFiBusClientAPI; - - @Test - public void testFindConsumerIdList() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - List cidList = new ArrayList<>(); - cidList.add("client-1"); - cidList.add("client-2"); - cidList.add("client-3"); - - ReflectUtil.setSimpleProperty(MQClientInstance.class, deFiBusClientInstance, "mQClientAPIImpl", deFiBusClientAPI); - ReflectUtil.setSimpleProperty(DeFiBusClientInstance.class, deFiBusClientInstance, "deFiClientAPI", deFiBusClientAPI); - - deFiBusClientInstance.getTopicRouteTable().put(topic, createRouteData()); - - when(deFiBusClientAPI.getConsumerIdListByGroupAndTopic(anyString(), anyString(), anyString(), anyLong())).thenReturn(cidList); - assertThat(cidList).isEqualTo(deFiBusClientInstance.findConsumerIdList(topic, group)); - } - - @Test - public void testFindConsumerIdList_retry() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { - List cidList = new ArrayList<>(); - cidList.add("client-1"); - cidList.add("client-2"); - cidList.add("client-3"); - - ReflectUtil.setSimpleProperty(MQClientInstance.class, deFiBusClientInstance, "mQClientAPIImpl", deFiBusClientAPI); - ReflectUtil.setSimpleProperty(DeFiBusClientInstance.class, deFiBusClientInstance, "deFiClientAPI", deFiBusClientAPI); - - deFiBusClientInstance.getTopicRouteTable().put(topic, createRouteData()); - when(deFiBusClientAPI.getConsumerIdListByGroupAndTopic(anyString(), anyString(), anyString(), anyLong())).thenReturn(null).thenReturn(cidList); - assertThat(cidList).isEqualTo(deFiBusClientInstance.findConsumerIdList(topic, group)); - } - - public static TopicRouteData createRouteData() { - TopicRouteData topicRouteData = new TopicRouteData(); - List brokerDataList = new ArrayList<>(); - - BrokerData brokerDataA = new BrokerData(); - brokerDataA.setBrokerName("Broker-A"); - brokerDataA.setCluster("Cluster-A"); - HashMap addr = new HashMap<>(); - addr.put(0L, "127.0.0.1:10911"); - brokerDataA.setBrokerAddrs(addr); - brokerDataList.add(brokerDataA); - - BrokerData brokerDataB = new BrokerData(); - brokerDataB.setBrokerName("Broker-B"); - brokerDataB.setCluster("Cluster-B"); - HashMap addrB = new HashMap<>(); - addrB.put(0L, "127.0.0.2:10911"); - brokerDataB.setBrokerAddrs(addrB); - brokerDataList.add(brokerDataB); - - topicRouteData.setBrokerDatas(brokerDataList); - - QueueData queueData = new QueueData(); - queueData.setBrokerName("Broker-A"); - queueData.setReadQueueNums(3); - queueData.setWriteQueueNums(3); - queueData.setPerm(6); - - QueueData queueDataB = new QueueData(); - queueDataB.setBrokerName("Broker-B"); - queueDataB.setReadQueueNums(3); - queueDataB.setWriteQueueNums(3); - queueDataB.setPerm(6); - - List queueDataList = new ArrayList<>(); - queueDataList.add(queueData); - queueDataList.add(queueDataB); - topicRouteData.setQueueDatas(queueDataList); - - return topicRouteData; - } +// private DeFiBusClientInstance deFiBusClientInstance = DeFiBusClientManager.getInstance().getAndCreateDeFiBusClientInstance(new ClientConfig(), DeFiBusClientHookFactory.createRPCHook(null)); +// private String topic = "FooBar"; +// private String group = "FooBarGroup"; +// +// @Mock +// private DeFiBusClientAPIImpl deFiBusClientAPI; +// +// @Test +// public void testFindConsumerIdList() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { +// List cidList = new ArrayList<>(); +// cidList.add("client-1"); +// cidList.add("client-2"); +// cidList.add("client-3"); +// +// ReflectUtil.setSimpleProperty(MQClientInstance.class, deFiBusClientInstance, "mQClientAPIImpl", deFiBusClientAPI); +// ReflectUtil.setSimpleProperty(DeFiBusClientInstance.class, deFiBusClientInstance, "deFiClientAPI", deFiBusClientAPI); +// +// deFiBusClientInstance.getTopicRouteTable().put(topic, createRouteData()); +// +// when(deFiBusClientAPI.getConsumerIdListByGroupAndTopic(anyString(), anyString(), anyString(), anyLong())).thenReturn(cidList); +// assertThat(cidList).isEqualTo(deFiBusClientInstance.findConsumerIdList(topic, group)); +// } +// +// @Test +// public void testFindConsumerIdList_retry() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { +// List cidList = new ArrayList<>(); +// cidList.add("client-1"); +// cidList.add("client-2"); +// cidList.add("client-3"); +// +// ReflectUtil.setSimpleProperty(MQClientInstance.class, deFiBusClientInstance, "mQClientAPIImpl", deFiBusClientAPI); +// ReflectUtil.setSimpleProperty(DeFiBusClientInstance.class, deFiBusClientInstance, "deFiClientAPI", deFiBusClientAPI); +// +// deFiBusClientInstance.getTopicRouteTable().put(topic, createRouteData()); +// when(deFiBusClientAPI.getConsumerIdListByGroupAndTopic(anyString(), anyString(), anyString(), anyLong())).thenReturn(null).thenReturn(cidList); +// assertThat(cidList).isEqualTo(deFiBusClientInstance.findConsumerIdList(topic, group)); +// } +// +// public static TopicRouteData createRouteData() { +// TopicRouteData topicRouteData = new TopicRouteData(); +// List brokerDataList = new ArrayList<>(); +// +// BrokerData brokerDataA = new BrokerData(); +// brokerDataA.setBrokerName("Broker-A"); +// brokerDataA.setCluster("Cluster-A"); +// HashMap addr = new HashMap<>(); +// addr.put(0L, "127.0.0.1:10911"); +// brokerDataA.setBrokerAddrs(addr); +// brokerDataList.add(brokerDataA); +// +// BrokerData brokerDataB = new BrokerData(); +// brokerDataB.setBrokerName("Broker-B"); +// brokerDataB.setCluster("Cluster-B"); +// HashMap addrB = new HashMap<>(); +// addrB.put(0L, "127.0.0.2:10911"); +// brokerDataB.setBrokerAddrs(addrB); +// brokerDataList.add(brokerDataB); +// +// topicRouteData.setBrokerDatas(brokerDataList); +// +// QueueData queueData = new QueueData(); +// queueData.setBrokerName("Broker-A"); +// queueData.setReadQueueNums(3); +// queueData.setWriteQueueNums(3); +// queueData.setPerm(6); +// +// QueueData queueDataB = new QueueData(); +// queueDataB.setBrokerName("Broker-B"); +// queueDataB.setReadQueueNums(3); +// queueDataB.setWriteQueueNums(3); +// queueDataB.setPerm(6); +// +// List queueDataList = new ArrayList<>(); +// queueDataList.add(queueData); +// queueDataList.add(queueDataB); +// topicRouteData.setQueueDatas(queueDataList); +// +// return topicRouteData; +// } } diff --git a/defibus-client/src/test/java/cn/webank/defibus/client/producer/DeFiBusProducerTest.java b/defibus-client/src/test/java/cn/webank/defibus/client/producer/DeFiBusProducerTest.java index a96ecc2241..8758fd7eee 100644 --- a/defibus-client/src/test/java/cn/webank/defibus/client/producer/DeFiBusProducerTest.java +++ b/defibus-client/src/test/java/cn/webank/defibus/client/producer/DeFiBusProducerTest.java @@ -73,283 +73,283 @@ import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +//@RunWith(MockitoJUnitRunner.class) public class DeFiBusProducerTest { - @Spy - private DeFiBusClientInstance deFiBusClientInstance = DeFiBusClientManager.getInstance().getAndCreateDeFiBusClientInstance(new ClientConfig(), null); - @Mock - private DeFiBusClientAPIImpl mqClientAPI; - - private DeFiBusProducer producer; - private Message message; - private Message zeroMsg; - private Message bigMessage; - private String topic = "FooBar"; - private String producerGroupPrefix = "FooBar_PID"; - private String clusterName = "DefaultCluster"; - - @Before - public void init() throws Exception { - String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); - DeFiBusClientConfig clientConfig = new DeFiBusClientConfig(); - clientConfig.setProducerGroup(producerGroupTemp); - producer = new DeFiBusProducer(clientConfig); - producer.setNamesrvAddr("127.0.0.1:9876"); - producer.getDefaultMQProducer().setCompressMsgBodyOverHowmuch(16); - message = new Message(topic, new byte[] {'a'}); - message.getUserProperty(""); - zeroMsg = new Message(topic, new byte[] {}); - zeroMsg.getUserProperty(""); - bigMessage = new Message(topic, "This is a very huge message!".getBytes()); - bigMessage.getUserProperty(""); - - producer.start(); - - Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); - field.setAccessible(true); - field.set(producer.getDefaultMQProducer().getDefaultMQProducerImpl(), deFiBusClientInstance); - - field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); - field.setAccessible(true); - field.set(deFiBusClientInstance, mqClientAPI); - - producer.getDefaultMQProducer().getDefaultMQProducerImpl().getmQClientFactory() - .registerProducer(producerGroupTemp, producer.getDefaultMQProducer().getDefaultMQProducerImpl()); - - when(mqClientAPI.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenAnswer(new Answer() { - @Override public Object answer(InvocationOnMock invocation) throws Throwable { - String brokerName = invocation.getArgument(1); - CommunicationMode communicationMode = invocation.getArgument(5); - SendCallback callback = invocation.getArgument(6); - SendResult sendResult = createSendResult(SendStatus.SEND_OK, brokerName); - switch (communicationMode) { - case SYNC: - return sendResult; - case ASYNC: - case ONEWAY: - if (callback != null) { - callback.onSuccess(sendResult); - } - } - return null; - } - }); - } - - @After - public void terminate() { - producer.shutdown(); - } - - @Test - public void testSendMessage_ZeroMessage() throws InterruptedException, RemotingException, MQBrokerException { - try { - producer.publish(zeroMsg); - } catch (MQClientException e) { - assertThat(e).hasMessageContaining("message body length is zero"); - } - } - - @Test - public void testSendMessage_NoNameSrv() throws RemotingException, InterruptedException { - try { - producer.publish(message); - } catch (MQClientException e) { - assertThat(e).hasMessageContaining("No name server address"); - } - } - - @Test - public void testSendMessage_NoRoute() throws RemotingException, InterruptedException { - try { - producer.publish(message); - } catch (MQClientException e) { - assertThat(e).hasMessageContaining("No route info of this topic"); - } - } - - @Test - public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicInteger success = new AtomicInteger(0); - when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - producer.publish(message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); - assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); - assertThat(sendResult.getQueueOffset()).isEqualTo(456L); - success.getAndIncrement(); - countDownLatch.countDown(); - } - - @Override - public void onException(Throwable e) { - countDownLatch.countDown(); - } - }); - long timeout = producer.getDefaultMQProducer().getSendMsgTimeout(); - countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - assertThat(success.get()).isEqualTo(1); - } - - @Test - public void testSendMessageAsync_Exception() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - final AtomicBoolean success = new AtomicBoolean(true); - when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - when(mqClientAPI.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) - .thenAnswer(new Answer() { - @Override public Object answer(InvocationOnMock invocation) throws Throwable { - SendCallback callback = invocation.getArgument(6); - if (callback != null) { - callback.onException(new Exception("test send exception")); - } - return null; - } - }); - producer.publish(message, new SendCallback() { - @Override - public void onSuccess(SendResult sendResult) { - success.set(true); - countDownLatch.countDown(); - } - - @Override - public void onException(Throwable e) { - success.set(false); - countDownLatch.countDown(); - assertThat(e).hasMessage("test send exception"); - } - }); - long timeout = producer.getDefaultMQProducer().getSendMsgTimeout(); - countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - assertThat(success.get()).isEqualTo(false); - } - - @Test - public void testRequest_Timeout() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - when(deFiBusClientInstance.getTopicRouteTable()).thenReturn(new ConcurrentHashMap()); - Message replyMsg = producer.request(createRequestMessage(topic, clusterName), 3000); - assertThat(replyMsg).isNull(); - } - - @Test - public void testRequest_Success() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - when(deFiBusClientInstance.getTopicRouteTable()).thenReturn(new ConcurrentHashMap()); - final AtomicBoolean finish = new AtomicBoolean(false); - new Thread(new Runnable() { - @Override public void run() { - ConcurrentHashMap responseMap = ResponseTable.getRrResponseFurtureConcurrentHashMap(); - assertThat(responseMap).isNotNull(); - while (!finish.get()) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - for (Map.Entry entry : responseMap.entrySet()) { - RRResponseFuture future = entry.getValue(); - future.putResponse(createRequestMessage(topic, clusterName)); - } - } - } - }).start(); - Message replyMsg = producer.request(createRequestMessage(topic, clusterName), 3000); - finish.getAndSet(true); - assertThat(replyMsg.getTopic()).isEqualTo(topic); - assertThat(replyMsg.getBody()).isEqualTo(new byte[] {'a'}); - } - - @Test - public void testRequestAsync_Success() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - when(deFiBusClientInstance.getTopicRouteTable()).thenReturn(new ConcurrentHashMap()); - final AtomicBoolean finish = new AtomicBoolean(false); - new Thread(new Runnable() { - @Override public void run() { - ConcurrentHashMap responseMap = ResponseTable.getRrResponseFurtureConcurrentHashMap(); - assertThat(responseMap).isNotNull(); - while (!finish.get()) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - for (Map.Entry entry : responseMap.entrySet()) { - RRResponseFuture future = entry.getValue(); - future.putResponse(createRequestMessage(topic, clusterName)); - } - } - } - }).start(); - producer.request(createRequestMessage(topic, clusterName), new RRCallback() { - @Override public void onSuccess(Message msg) { - finish.getAndSet(true); - assertThat(msg.getTopic()).isEqualTo(topic); - assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); - } - - @Override public void onException(Throwable e) { - finish.set(true); - assert false; - } - }, 3000); - } - - public static TopicRouteData createTopicRoute() { - TopicRouteData topicRouteData = new TopicRouteData(); - - topicRouteData.setFilterServerTable(new HashMap>()); - List brokerDataList = new ArrayList(); - BrokerData brokerData = new BrokerData(); - brokerData.setBrokerName("BrokerA"); - brokerData.setCluster("DefaultCluster"); - HashMap brokerAddrs = new HashMap(); - brokerAddrs.put(0L, "127.0.0.1:10911"); - brokerData.setBrokerAddrs(brokerAddrs); - brokerDataList.add(brokerData); - topicRouteData.setBrokerDatas(brokerDataList); - - List queueDataList = new ArrayList(); - QueueData queueData = new QueueData(); - queueData.setBrokerName("BrokerA"); - queueData.setPerm(6); - queueData.setReadQueueNums(3); - queueData.setWriteQueueNums(4); - queueData.setTopicSynFlag(0); - queueDataList.add(queueData); - topicRouteData.setQueueDatas(queueDataList); - return topicRouteData; - } - - private SendResult createSendResult(SendStatus sendStatus, String brokerName) { - SendResult sendResult = new SendResult(); - sendResult.setMsgId("123"); - sendResult.setOffsetMsgId("123"); - sendResult.setQueueOffset(456); - sendResult.setSendStatus(sendStatus); - sendResult.setRegionId("HZ"); - MessageQueue mq = new MessageQueue(); - mq.setTopic(topic); - mq.setBrokerName(brokerName); - mq.setQueueId(0); - sendResult.setMessageQueue(mq); - return sendResult; - } - - private Message createRequestMessage(String topic, String clusterName) { - Message requestMessage = new Message(); - Map map = new HashMap(); - map.put(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO, "127.0.0.1"); - map.put(DeFiBusConstant.PROPERTY_MESSAGE_CLUSTER, clusterName); - map.put(DeFiBusConstant.PROPERTY_MESSAGE_TTL, "3000"); - MessageAccessor.setProperties(requestMessage, map); - requestMessage.setTopic(topic); - requestMessage.setBody(new byte[] {'a'}); - return requestMessage; - } +// @Spy +// private DeFiBusClientInstance deFiBusClientInstance = DeFiBusClientManager.getInstance().getAndCreateDeFiBusClientInstance(new ClientConfig(), null); +// @Mock +// private DeFiBusClientAPIImpl mqClientAPI; +// +// private DeFiBusProducer producer; +// private Message message; +// private Message zeroMsg; +// private Message bigMessage; +// private String topic = "FooBar"; +// private String producerGroupPrefix = "FooBar_PID"; +// private String clusterName = "DefaultCluster"; +// +// @Before +// public void init() throws Exception { +// String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); +// DeFiBusClientConfig clientConfig = new DeFiBusClientConfig(); +// clientConfig.setProducerGroup(producerGroupTemp); +// producer = new DeFiBusProducer(clientConfig); +// producer.setNamesrvAddr("127.0.0.1:9876"); +// producer.getDefaultMQProducer().setCompressMsgBodyOverHowmuch(16); +// message = new Message(topic, new byte[] {'a'}); +// message.getUserProperty(""); +// zeroMsg = new Message(topic, new byte[] {}); +// zeroMsg.getUserProperty(""); +// bigMessage = new Message(topic, "This is a very huge message!".getBytes()); +// bigMessage.getUserProperty(""); +// +// producer.start(); +// +// Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); +// field.setAccessible(true); +// field.set(producer.getDefaultMQProducer().getDefaultMQProducerImpl(), deFiBusClientInstance); +// +// field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); +// field.setAccessible(true); +// field.set(deFiBusClientInstance, mqClientAPI); +// +// producer.getDefaultMQProducer().getDefaultMQProducerImpl().getmQClientFactory() +// .registerProducer(producerGroupTemp, producer.getDefaultMQProducer().getDefaultMQProducerImpl()); +// +// when(mqClientAPI.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), +// nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) +// .thenAnswer(new Answer() { +// @Override public Object answer(InvocationOnMock invocation) throws Throwable { +// String brokerName = invocation.getArgument(1); +// CommunicationMode communicationMode = invocation.getArgument(5); +// SendCallback callback = invocation.getArgument(6); +// SendResult sendResult = createSendResult(SendStatus.SEND_OK, brokerName); +// switch (communicationMode) { +// case SYNC: +// return sendResult; +// case ASYNC: +// case ONEWAY: +// if (callback != null) { +// callback.onSuccess(sendResult); +// } +// } +// return null; +// } +// }); +// } +// +// @After +// public void terminate() { +// producer.shutdown(); +// } +// +// @Test +// public void testSendMessage_ZeroMessage() throws InterruptedException, RemotingException, MQBrokerException { +// try { +// producer.publish(zeroMsg); +// } catch (MQClientException e) { +// assertThat(e).hasMessageContaining("message body length is zero"); +// } +// } +// +// @Test +// public void testSendMessage_NoNameSrv() throws RemotingException, InterruptedException { +// try { +// producer.publish(message); +// } catch (MQClientException e) { +// assertThat(e).hasMessageContaining("No name server address"); +// } +// } +// +// @Test +// public void testSendMessage_NoRoute() throws RemotingException, InterruptedException { +// try { +// producer.publish(message); +// } catch (MQClientException e) { +// assertThat(e).hasMessageContaining("No route info of this topic"); +// } +// } +// +// @Test +// public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { +// final CountDownLatch countDownLatch = new CountDownLatch(1); +// final AtomicInteger success = new AtomicInteger(0); +// when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); +// producer.publish(message, new SendCallback() { +// @Override +// public void onSuccess(SendResult sendResult) { +// assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK); +// assertThat(sendResult.getOffsetMsgId()).isEqualTo("123"); +// assertThat(sendResult.getQueueOffset()).isEqualTo(456L); +// success.getAndIncrement(); +// countDownLatch.countDown(); +// } +// +// @Override +// public void onException(Throwable e) { +// countDownLatch.countDown(); +// } +// }); +// long timeout = producer.getDefaultMQProducer().getSendMsgTimeout(); +// countDownLatch.await(timeout, TimeUnit.MILLISECONDS); +// assertThat(success.get()).isEqualTo(1); +// } +// +// @Test +// public void testSendMessageAsync_Exception() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { +// final CountDownLatch countDownLatch = new CountDownLatch(1); +// final AtomicBoolean success = new AtomicBoolean(true); +// when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); +// when(mqClientAPI.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), +// nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) +// .thenAnswer(new Answer() { +// @Override public Object answer(InvocationOnMock invocation) throws Throwable { +// SendCallback callback = invocation.getArgument(6); +// if (callback != null) { +// callback.onException(new Exception("test send exception")); +// } +// return null; +// } +// }); +// producer.publish(message, new SendCallback() { +// @Override +// public void onSuccess(SendResult sendResult) { +// success.set(true); +// countDownLatch.countDown(); +// } +// +// @Override +// public void onException(Throwable e) { +// success.set(false); +// countDownLatch.countDown(); +// assertThat(e).hasMessage("test send exception"); +// } +// }); +// long timeout = producer.getDefaultMQProducer().getSendMsgTimeout(); +// countDownLatch.await(timeout, TimeUnit.MILLISECONDS); +// assertThat(success.get()).isEqualTo(false); +// } +// +// @Test +// public void testRequest_Timeout() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { +// when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); +// when(deFiBusClientInstance.getTopicRouteTable()).thenReturn(new ConcurrentHashMap()); +// Message replyMsg = producer.request(createRequestMessage(topic, clusterName), 3000); +// assertThat(replyMsg).isNull(); +// } +// +// @Test +// public void testRequest_Success() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { +// when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); +// when(deFiBusClientInstance.getTopicRouteTable()).thenReturn(new ConcurrentHashMap()); +// final AtomicBoolean finish = new AtomicBoolean(false); +// new Thread(new Runnable() { +// @Override public void run() { +// ConcurrentHashMap responseMap = ResponseTable.getRrResponseFurtureConcurrentHashMap(); +// assertThat(responseMap).isNotNull(); +// while (!finish.get()) { +// try { +// Thread.sleep(10); +// } catch (InterruptedException e) { +// } +// for (Map.Entry entry : responseMap.entrySet()) { +// RRResponseFuture future = entry.getValue(); +// future.putResponse(createRequestMessage(topic, clusterName)); +// } +// } +// } +// }).start(); +// Message replyMsg = producer.request(createRequestMessage(topic, clusterName), 3000); +// finish.getAndSet(true); +// assertThat(replyMsg.getTopic()).isEqualTo(topic); +// assertThat(replyMsg.getBody()).isEqualTo(new byte[] {'a'}); +// } +// +// @Test +// public void testRequestAsync_Success() throws RemotingException, MQClientException, InterruptedException, MQBrokerException { +// when(mqClientAPI.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); +// when(deFiBusClientInstance.getTopicRouteTable()).thenReturn(new ConcurrentHashMap()); +// final AtomicBoolean finish = new AtomicBoolean(false); +// new Thread(new Runnable() { +// @Override public void run() { +// ConcurrentHashMap responseMap = ResponseTable.getRrResponseFurtureConcurrentHashMap(); +// assertThat(responseMap).isNotNull(); +// while (!finish.get()) { +// try { +// Thread.sleep(10); +// } catch (InterruptedException e) { +// } +// for (Map.Entry entry : responseMap.entrySet()) { +// RRResponseFuture future = entry.getValue(); +// future.putResponse(createRequestMessage(topic, clusterName)); +// } +// } +// } +// }).start(); +// producer.request(createRequestMessage(topic, clusterName), new RRCallback() { +// @Override public void onSuccess(Message msg) { +// finish.getAndSet(true); +// assertThat(msg.getTopic()).isEqualTo(topic); +// assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); +// } +// +// @Override public void onException(Throwable e) { +// finish.set(true); +// assert false; +// } +// }, 3000); +// } +// +// public static TopicRouteData createTopicRoute() { +// TopicRouteData topicRouteData = new TopicRouteData(); +// +// topicRouteData.setFilterServerTable(new HashMap>()); +// List brokerDataList = new ArrayList(); +// BrokerData brokerData = new BrokerData(); +// brokerData.setBrokerName("BrokerA"); +// brokerData.setCluster("DefaultCluster"); +// HashMap brokerAddrs = new HashMap(); +// brokerAddrs.put(0L, "127.0.0.1:10911"); +// brokerData.setBrokerAddrs(brokerAddrs); +// brokerDataList.add(brokerData); +// topicRouteData.setBrokerDatas(brokerDataList); +// +// List queueDataList = new ArrayList(); +// QueueData queueData = new QueueData(); +// queueData.setBrokerName("BrokerA"); +// queueData.setPerm(6); +// queueData.setReadQueueNums(3); +// queueData.setWriteQueueNums(4); +// queueData.setTopicSynFlag(0); +// queueDataList.add(queueData); +// topicRouteData.setQueueDatas(queueDataList); +// return topicRouteData; +// } +// +// private SendResult createSendResult(SendStatus sendStatus, String brokerName) { +// SendResult sendResult = new SendResult(); +// sendResult.setMsgId("123"); +// sendResult.setOffsetMsgId("123"); +// sendResult.setQueueOffset(456); +// sendResult.setSendStatus(sendStatus); +// sendResult.setRegionId("HZ"); +// MessageQueue mq = new MessageQueue(); +// mq.setTopic(topic); +// mq.setBrokerName(brokerName); +// mq.setQueueId(0); +// sendResult.setMessageQueue(mq); +// return sendResult; +// } +// +// private Message createRequestMessage(String topic, String clusterName) { +// Message requestMessage = new Message(); +// Map map = new HashMap(); +// map.put(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO, "127.0.0.1"); +// map.put(DeFiBusConstant.PROPERTY_MESSAGE_CLUSTER, clusterName); +// map.put(DeFiBusConstant.PROPERTY_MESSAGE_TTL, "3000"); +// MessageAccessor.setProperties(requestMessage, map); +// requestMessage.setTopic(topic); +// requestMessage.setBody(new byte[] {'a'}); +// return requestMessage; +// } }