diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index 2ff762054..7f69258c8 100644 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -111,6 +111,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { */ void setConsumeThreadCount(int threadCount); int getConsumeThreadCount() const; + void setMaxReconsumeTimes(int maxReconsumeTimes); + int getMaxReconsumeTimes() const; /* set pullMsg thread count, default value is cpu cores @@ -144,6 +146,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer { MQMessageListener* m_pMessageListener; int m_consumeMessageBatchMaxSize; int m_maxMsgCacheSize; + int m_maxReconsumeTimes = -1; boost::asio::io_service m_async_ioService; boost::scoped_ptr m_async_service_thread; diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 26ab1b0b0..f2829a9ae 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -836,11 +836,16 @@ void MQClientAPIImpl::consumerSendMessageBack(const string addr, const string& consumerGroup, int delayLevel, int timeoutMillis, + int maxReconsumeTimes, const SessionCredentials& sessionCredentials) { ConsumerSendMsgBackRequestHeader* pRequestHeader = new ConsumerSendMsgBackRequestHeader(); pRequestHeader->group = consumerGroup; pRequestHeader->offset = msg.getCommitLogOffset(); pRequestHeader->delayLevel = delayLevel; + pRequestHeader->unitMode = false; + pRequestHeader->originTopic = msg.getTopic(); + pRequestHeader->originMsgId = msg.getMsgId(); + pRequestHeader->maxReconsumeTimes = maxReconsumeTimes; // string addr = socketAddress2IPPort(msg.getStoreHost()); RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader); diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h index 9f08dac3c..3fbf566b7 100644 --- a/src/MQClientAPIImpl.h +++ b/src/MQClientAPIImpl.h @@ -174,6 +174,7 @@ class MQClientAPIImpl { const string& consumerGroup, int delayLevel, int timeoutMillis, + int maxReconsumeTimes, const SessionCredentials& sessionCredentials); virtual void lockBatchMQ(const string& addr, diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp index 371faa26f..9c3a05b8c 100644 --- a/src/consumer/ConsumeMessageConcurrentlyService.cpp +++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp @@ -194,6 +194,9 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptrgetConsumeType() == CONSUME_PASSIVELY) { string brokerName = request->m_messageQueue.getBrokerName(); + if (m_pConsumer->isUseNameSpaceMode()) { + MessageAccessor::withNameSpace(msgs[i], m_pConsumer->getNameSpace()); + } if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) { LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d", (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i, diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index df77cacc6..8e2541ea7 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -258,8 +258,8 @@ bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, s else brokerAddr = socketAddress2IPPort(msg.getStoreHost()); try { - getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000, - getSessionCredentials()); + getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, + getMaxReconsumeTimes(), 3000, getSessionCredentials()); } catch (MQException& e) { LOG_ERROR(e.what()); return false; @@ -918,6 +918,21 @@ void DefaultMQPushConsumer::setConsumeThreadCount(int threadCount) { int DefaultMQPushConsumer::getConsumeThreadCount() const { return m_consumeThreadCount; } +void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes) { + if (maxReconsumeTimes > 0) { + m_maxReconsumeTimes = maxReconsumeTimes; + } else { + LOG_ERROR("set maxReconsumeTimes with invalid value"); + } +} + +int DefaultMQPushConsumer::getMaxReconsumeTimes() const { + if (m_maxReconsumeTimes >= 0) { + return m_maxReconsumeTimes; + } + // return 16 as default; + return 16; +} void DefaultMQPushConsumer::setPullMsgThreadPoolCount(int threadCount) { m_pullMsgThreadPoolNum = threadCount; diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp index 1360d0490..30dcf59c5 100644 --- a/src/protocol/CommandHeader.cpp +++ b/src/protocol/CommandHeader.cpp @@ -492,16 +492,20 @@ void ConsumerSendMsgBackRequestHeader::Encode(Json::Value& outData) { outData["group"] = group; outData["delayLevel"] = delayLevel; outData["offset"] = UtilAll::to_string(offset); -#ifdef ONS + outData["unitMode"] = UtilAll::to_string(unitMode); outData["originMsgId"] = originMsgId; outData["originTopic"] = originTopic; -#endif + outData["maxReconsumeTimes"] = maxReconsumeTimes; } void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(map& requestMap) { requestMap.insert(pair("group", group)); requestMap.insert(pair("delayLevel", UtilAll::to_string(delayLevel))); requestMap.insert(pair("offset", UtilAll::to_string(offset))); + requestMap.insert(pair("unitMode", UtilAll::to_string(unitMode))); + requestMap.insert(pair("originMsgId", originMsgId)); + requestMap.insert(pair("originTopic", originTopic)); + requestMap.insert(pair("maxReconsumeTimes", UtilAll::to_string(maxReconsumeTimes))); } //& cids) { diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h index cedaeb97a..bf74e0709 100644 --- a/src/protocol/CommandHeader.h +++ b/src/protocol/CommandHeader.h @@ -456,6 +456,10 @@ class ConsumerSendMsgBackRequestHeader : public CommandHeader { string group; int delayLevel; int64 offset; + bool unitMode = false; + string originMsgId; + string originTopic; + int maxReconsumeTimes = 16; }; //GetGtestMockClientAPIImpl(); + Mock::AllowLeak(impl); + MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient(); + Mock::AllowLeak(pClient); + RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr); + RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr); + EXPECT_CALL(*pClient, invokeSync(_, _, _)) + .Times(3) + .WillOnce(Return(nullptr)) + .WillOnce(Return(pCommandFailed)) + .WillOnce(Return(pCommandSuccuss)); + EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc)); + EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc)); + EXPECT_NO_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc)); +} + int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*"; diff --git a/test/src/protocol/CommandHeaderTest.cpp b/test/src/protocol/CommandHeaderTest.cpp index ccfa5ba6b..6874adf2f 100644 --- a/test/src/protocol/CommandHeaderTest.cpp +++ b/test/src/protocol/CommandHeaderTest.cpp @@ -67,6 +67,43 @@ using rocketmq::UnregisterClientRequestHeader; using rocketmq::UpdateConsumerOffsetRequestHeader; using rocketmq::ViewMessageRequestHeader; +TEST(commandHeader, ConsumerSendMsgBackRequestHeader) { + string group = "testGroup"; + int delayLevel = 2; + int64 offset = 3027; + bool unitMode = true; + string originMsgId = "testOriginMsgId"; + string originTopic = "testTopic"; + int maxReconsumeTimes = 12; + ConsumerSendMsgBackRequestHeader header; + header.group = group; + header.delayLevel = delayLevel; + header.offset = offset; + header.unitMode = unitMode; + header.originMsgId = originMsgId; + header.originTopic = originTopic; + header.maxReconsumeTimes = maxReconsumeTimes; + map requestMap; + header.SetDeclaredFieldOfCommandHeader(requestMap); + EXPECT_EQ(requestMap["group"], group); + EXPECT_EQ(requestMap["delayLevel"], "2"); + EXPECT_EQ(requestMap["offset"], "3027"); + EXPECT_EQ(requestMap["unitMode"], "1"); + EXPECT_EQ(requestMap["originMsgId"], originMsgId); + EXPECT_EQ(requestMap["originTopic"], originTopic); + EXPECT_EQ(requestMap["maxReconsumeTimes"], "12"); + + Value outData; + header.Encode(outData); + EXPECT_EQ(outData["group"], group); + EXPECT_EQ(outData["delayLevel"], 2); + EXPECT_EQ(outData["offset"], "3027"); + EXPECT_EQ(outData["unitMode"], "1"); + EXPECT_EQ(outData["originMsgId"], originMsgId); + EXPECT_EQ(outData["originTopic"], originTopic); + EXPECT_EQ(outData["maxReconsumeTimes"], 12); +} + TEST(commandHeader, GetRouteInfoRequestHeader) { GetRouteInfoRequestHeader header("testTopic"); map requestMap;