From f7f5e3a6f5c02a74e9437656fde48045672a7dbc Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 14 Jan 2020 11:48:48 +0800 Subject: [PATCH 1/3] feat(version): add maxConsumerTimes to support higher client version --- include/DefaultMQPushConsumer.h | 3 ++ src/MQClientAPIImpl.cpp | 5 +++ src/MQClientAPIImpl.h | 1 + .../ConsumeMessageConcurrentlyService.cpp | 3 ++ src/consumer/DefaultMQPushConsumer.cpp | 19 +++++++++- src/protocol/CommandHeader.cpp | 8 +++- src/protocol/CommandHeader.h | 4 ++ test/src/MQClientAPIImpTest.cpp | 20 ++++++++++ test/src/protocol/CommandHeaderTest.cpp | 37 +++++++++++++++++++ 9 files changed, 96 insertions(+), 4 deletions(-) diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index 2ff762054..7f69258c8 100644 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -111,6 +111,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { */ void setConsumeThreadCount(int threadCount); int getConsumeThreadCount() const; + void setMaxReconsumeTimes(int maxReconsumeTimes); + int getMaxReconsumeTimes() const; /* set pullMsg thread count, default value is cpu cores @@ -144,6 +146,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { MQMessageListener* m_pMessageListener; int m_consumeMessageBatchMaxSize; int m_maxMsgCacheSize; + int m_maxReconsumeTimes = -1; boost::asio::io_service m_async_ioService; boost::scoped_ptr m_async_service_thread; diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 26ab1b0b0..f2829a9ae 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -836,11 +836,16 @@ void MQClientAPIImpl::consumerSendMessageBack(const string addr, const string& consumerGroup, int delayLevel, int timeoutMillis, + int maxReconsumeTimes, const SessionCredentials& sessionCredentials) { ConsumerSendMsgBackRequestHeader* pRequestHeader = new ConsumerSendMsgBackRequestHeader(); pRequestHeader->group = consumerGroup; pRequestHeader->offset = msg.getCommitLogOffset(); pRequestHeader->delayLevel = delayLevel; + pRequestHeader->unitMode = false; + pRequestHeader->originTopic = msg.getTopic(); + pRequestHeader->originMsgId = msg.getMsgId(); + pRequestHeader->maxReconsumeTimes = maxReconsumeTimes; // string addr = socketAddress2IPPort(msg.getStoreHost()); RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader); diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h index 9f08dac3c..3fbf566b7 100644 --- a/src/MQClientAPIImpl.h +++ b/src/MQClientAPIImpl.h @@ -174,6 +174,7 @@ class MQClientAPIImpl { const string& consumerGroup, int delayLevel, int timeoutMillis, + int maxReconsumeTimes, const SessionCredentials& sessionCredentials); virtual void lockBatchMQ(const string& addr, diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp index 371faa26f..9c3a05b8c 100644 --- a/src/consumer/ConsumeMessageConcurrentlyService.cpp +++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp @@ -194,6 +194,9 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptrgetConsumeType() == CONSUME_PASSIVELY) { string brokerName = request->m_messageQueue.getBrokerName(); + if (m_pConsumer->isUseNameSpaceMode()) { + MessageAccessor::withNameSpace(msgs[i], m_pConsumer->getNameSpace()); + } if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) { LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d", (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i, diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index df77cacc6..8e2541ea7 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -258,8 +258,8 @@ bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, s else brokerAddr = socketAddress2IPPort(msg.getStoreHost()); try { - getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000, - getSessionCredentials()); + getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, + getMaxReconsumeTimes(), 3000, getSessionCredentials()); } catch (MQException& e) { LOG_ERROR(e.what()); return false; @@ -918,6 +918,21 @@ void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) { int DefaultMQPushConsumer::getConsumeThreadCount() const { return m_consumeThreadCount; } +void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes) { + if (maxReconsumeTimes > 0) { + m_maxReconsumeTimes = maxReconsumeTimes; + } else { + LOG_ERROR("set maxReconsumeTimes with invalid value"); + } +} + +int DefaultMQPushConsumer::getMaxReconsumeTimes() const { + if (m_maxReconsumeTimes >= 0) { + return m_maxReconsumeTimes; + } + // return 16 as default; + return 16; +} void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) { m_pullMsgThreadPoolNum = threadCount; diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp index 1360d0490..30dcf59c5 100644 --- a/src/protocol/CommandHeader.cpp +++ b/src/protocol/CommandHeader.cpp @@ -492,16 +492,20 @@ void ConsumerSendMsgBackRequestHeader::Encode(Json::Value& outData) { outData["group"] = group; outData["delayLevel"] = delayLevel; outData["offset"] = UtilAll::to_string(offset); -#ifdef ONS + outData["unitMode"] = UtilAll::to_string(unitMode); outData["originMsgId"] = originMsgId; outData["originTopic"] = originTopic; -#endif + outData["maxReconsumeTimes"] = maxReconsumeTimes; } void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(map& requestMap) { requestMap.insert(pair("group", group)); requestMap.insert(pair("delayLevel", UtilAll::to_string(delayLevel))); requestMap.insert(pair("offset", UtilAll::to_string(offset))); + requestMap.insert(pair("unitMode", UtilAll::to_string(unitMode))); + requestMap.insert(pair("originMsgId", originMsgId)); + requestMap.insert(pair("originTopic", originTopic)); + requestMap.insert(pair("maxReconsumeTimes", UtilAll::to_string(maxReconsumeTimes))); } //& cids) { diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h index cedaeb97a..bf74e0709 100644 --- a/src/protocol/CommandHeader.h +++ b/src/protocol/CommandHeader.h @@ -456,6 +456,10 @@ class ConsumerSendMsgBackRequestHeader : public CommandHeader { string group; int delayLevel; int64 offset; + bool unitMode = false; + string originMsgId; + string originTopic; + int maxReconsumeTimes = 16; }; //GetGtestMockClientAPIImpl(); + Mock::AllowLeak(impl); + MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient(); + Mock::AllowLeak(pClient); + RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr); + RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr); + EXPECT_CALL(*pClient, invokeSync(_, _, _)) + .Times(3) + .WillOnce(Return(nullptr)) + .WillOnce(Return(pCommandFailed)) + .WillOnce(Return(pCommandSuccuss)); + EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911",msg, "testGroup", 0, 1000,16, sc)); + EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911",msg, "testGroup", 0, 1000,16, sc)); + EXPECT_NO_THROW(impl->consumerSendMessageBack("127.0.0.0:10911",msg, "testGroup", 0, 1000,16, sc)); +} + int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*"; diff --git a/test/src/protocol/CommandHeaderTest.cpp b/test/src/protocol/CommandHeaderTest.cpp index ccfa5ba6b..a3330f62e 100644 --- a/test/src/protocol/CommandHeaderTest.cpp +++ b/test/src/protocol/CommandHeaderTest.cpp @@ -67,6 +67,43 @@ using rocketmq::UnregisterClientRequestHeader; using rocketmq::UpdateConsumerOffsetRequestHeader; using rocketmq::ViewMessageRequestHeader; +TEST(commandHeader, ConsumerSendMsgBackRequestHeader) { + string group = "testGroup"; + int delayLevel = 2; + int64 offset = 3027; + bool unitMode = true; + string originMsgId = "testOriginMsgId"; + string originTopic = "testTopic"; + int maxReconsumeTimes = 12; + ConsumerSendMsgBackRequestHeader header; + header.group = group; + header.delayLevel = delayLevel; + header.offset = offset; + header.unitMode = unitMode; + header.originMsgId = originMsgId; + header.originTopic = originTopic; + header.maxReconsumeTimes = maxReconsumeTimes; + map requestMap; + header.SetDeclaredFieldOfCommandHeader(requestMap); + EXPECT_EQ(requestMap["group"], group); + EXPECT_EQ(requestMap["delayLevel"], "2"); + EXPECT_EQ(requestMap["offset"], "3027"); + EXPECT_EQ(requestMap["unitMode"], "1"); + EXPECT_EQ(requestMap["originMsgId"], originMsgId); + EXPECT_EQ(requestMap["originTopic"], originTopic); + EXPECT_EQ(requestMap["maxReconsumeTimes"], "12"); + + Value outData; + header.Encode(outData); + EXPECT_EQ(outData["group"], group); + EXPECT_EQ(outData["delayLevel"], "2"); + EXPECT_EQ(outData["offset"], "3027"); + EXPECT_EQ(outData["unitMode"], "1"); + EXPECT_EQ(outData["originMsgId"], originMsgId); + EXPECT_EQ(outData["originTopic"], originTopic); + EXPECT_EQ(outData["maxReconsumeTimes"], "12"); +} + TEST(commandHeader, GetRouteInfoRequestHeader) { GetRouteInfoRequestHeader header("testTopic"); map requestMap; From db3f6bd379faa89d7531965b35c1606a7185d7c9 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 14 Jan 2020 11:49:33 +0800 Subject: [PATCH 2/3] feat(version): add maxConsumerTimes to support higher client version --- test/src/MQClientAPIImpTest.cpp | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp index 7797478a7..3b2ab0789 100644 --- a/test/src/MQClientAPIImpTest.cpp +++ b/test/src/MQClientAPIImpTest.cpp @@ -172,22 +172,22 @@ TEST(MQClientAPIImplTest, sendMessage) { } TEST(MQClientAPIImplTest, consumerSendMessageBack) { - SessionCredentials sc; - MQMessageExt msg; - MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl(); - Mock::AllowLeak(impl); - MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient(); - Mock::AllowLeak(pClient); - RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr); - RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr); - EXPECT_CALL(*pClient, invokeSync(_, _, _)) - .Times(3) - .WillOnce(Return(nullptr)) - .WillOnce(Return(pCommandFailed)) - .WillOnce(Return(pCommandSuccuss)); - EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911",msg, "testGroup", 0, 1000,16, sc)); - EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911",msg, "testGroup", 0, 1000,16, sc)); - EXPECT_NO_THROW(impl->consumerSendMessageBack("127.0.0.0:10911",msg, "testGroup", 0, 1000,16, sc)); + SessionCredentials sc; + MQMessageExt msg; + MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl(); + Mock::AllowLeak(impl); + MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient(); + Mock::AllowLeak(pClient); + RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr); + RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr); + EXPECT_CALL(*pClient, invokeSync(_, _, _)) + .Times(3) + .WillOnce(Return(nullptr)) + .WillOnce(Return(pCommandFailed)) + .WillOnce(Return(pCommandSuccuss)); + EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc)); + EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc)); + EXPECT_NO_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc)); } int main(int argc, char* argv[]) { From 4456300ef0110e86d7302d6d7b2c2befba75dcf8 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Tue, 14 Jan 2020 12:22:35 +0800 Subject: [PATCH 3/3] feat(version): add maxConsumerTimes to support higher client version --- test/src/protocol/CommandHeaderTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/src/protocol/CommandHeaderTest.cpp b/test/src/protocol/CommandHeaderTest.cpp index a3330f62e..6874adf2f 100644 --- a/test/src/protocol/CommandHeaderTest.cpp +++ b/test/src/protocol/CommandHeaderTest.cpp @@ -96,12 +96,12 @@ TEST(commandHeader, ConsumerSendMsgBackRequestHeader) { Value outData; header.Encode(outData); EXPECT_EQ(outData["group"], group); - EXPECT_EQ(outData["delayLevel"], "2"); + EXPECT_EQ(outData["delayLevel"], 2); EXPECT_EQ(outData["offset"], "3027"); EXPECT_EQ(outData["unitMode"], "1"); EXPECT_EQ(outData["originMsgId"], originMsgId); EXPECT_EQ(outData["originTopic"], originTopic); - EXPECT_EQ(outData["maxReconsumeTimes"], "12"); + EXPECT_EQ(outData["maxReconsumeTimes"], 12); } TEST(commandHeader, GetRouteInfoRequestHeader) {