From 204c2d3a5341ea4c133fcf8becd03f129038125e Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Sun, 15 Mar 2020 01:00:02 +0800 Subject: [PATCH 1/4] feat(trace): add trace message for sync producer. --- include/DefaultMQProducer.h | 2 + src/common/DefaultMQClient.cpp | 10 ++ src/common/NameSpaceUtil.cpp | 5 + src/include/DefaultMQClient.h | 5 + src/producer/DefaultMQProducer.cpp | 7 +- src/producer/DefaultMQProducerImpl.cpp | 114 ++++++++++++++++- src/producer/DefaultMQProducerImpl.h | 17 +++ src/producer/SendMessageHookImpl.cpp | 104 +++++++++++++++ src/producer/SendMessageHookImpl.h | 32 +++++ src/trace/ConsumeMessageContext.h | 29 +++++ src/trace/ConsumeMessageHook.h | 31 +++++ src/trace/SendMessageContext.cpp | 115 +++++++++++++++++ src/trace/SendMessageContext.h | 96 ++++++++++++++ src/{include => trace}/SendMessageHook.h | 34 ++--- src/trace/TraceBean.cpp | 121 ++++++++++++++++++ src/trace/TraceBean.h | 89 +++++++++++++ src/trace/TraceContant.cpp | 30 +++++ src/trace/TraceContant.h | 47 +++++++ src/trace/TraceContext.cpp | 113 ++++++++++++++++ src/trace/TraceContext.h | 88 +++++++++++++ src/trace/TraceTransferBean.cpp | 38 ++++++ src/trace/TraceTransferBean.h | 40 ++++++ src/trace/TraceUtil.cpp | 117 +++++++++++++++++ src/trace/TraceUtil.h | 33 +++++ test/src/common/NameSpaceUtilTest.cpp | 2 + .../producer/DefaultMQProducerImplTest.cpp | 45 ++++++- test/src/trace/TraceBeanTest.cpp | 77 +++++++++++ test/src/trace/TraceUtilTest.cpp | 74 +++++++++++ 28 files changed, 1478 insertions(+), 37 deletions(-) create mode 100644 src/producer/SendMessageHookImpl.cpp create mode 100644 src/producer/SendMessageHookImpl.h create mode 100644 src/trace/ConsumeMessageContext.h create mode 100644 src/trace/ConsumeMessageHook.h create mode 100644 src/trace/SendMessageContext.cpp create mode 100644 src/trace/SendMessageContext.h rename src/{include => trace}/SendMessageHook.h (54%) create mode 100644 src/trace/TraceBean.cpp create mode 100644 src/trace/TraceBean.h create mode 100644 src/trace/TraceContant.cpp create mode 100644 src/trace/TraceContant.h create mode 100644 src/trace/TraceContext.cpp create mode 100644 src/trace/TraceContext.h create mode 100644 src/trace/TraceTransferBean.cpp create mode 100644 src/trace/TraceTransferBean.h create mode 100644 src/trace/TraceUtil.cpp create mode 100644 src/trace/TraceUtil.h create mode 100644 test/src/trace/TraceBeanTest.cpp create mode 100644 test/src/trace/TraceUtilTest.cpp diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h index 991d27de5..6ca52f9ae 100644 --- a/include/DefaultMQProducer.h +++ b/include/DefaultMQProducer.h @@ -136,6 +136,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer { void setUnitName(std::string unitName); const std::string& getUnitName() const; + void setMessageTrace(bool messageTrace); + bool getMessageTrace() const; private: DefaultMQProducerImpl* impl; diff --git a/src/common/DefaultMQClient.cpp b/src/common/DefaultMQClient.cpp index 269ce6185..c271212ea 100644 --- a/src/common/DefaultMQClient.cpp +++ b/src/common/DefaultMQClient.cpp @@ -46,6 +46,7 @@ DefaultMQClient::DefaultMQClient() { m_tcpConnectTimeout = 3000; // 3s m_tcpTransportTryLockTimeout = 3; // 3s m_unitName = ""; + m_messageTrace = false; } DefaultMQClient::~DefaultMQClient() {} @@ -216,6 +217,14 @@ const string& DefaultMQClient::getUnitName() const { return m_unitName; } +bool DefaultMQClient::getMessageTrace() const { + return m_messageTrace; +} + +void DefaultMQClient::setMessageTrace(bool mMessageTrace) { + m_messageTrace = mMessageTrace; +} + void DefaultMQClient::setSessionCredentials(const string& input_accessKey, const string& input_secretKey, const string& input_onsChannel) { @@ -239,6 +248,7 @@ void DefaultMQClient::showClientConfigs() { LOG_WARN("PullThreadNum:%d", m_pullThreadNum); LOG_WARN("TcpConnectTimeout:%lld ms", m_tcpConnectTimeout); LOG_WARN("TcpTransportTryLockTimeout:%lld s", m_tcpTransportTryLockTimeout); + LOG_WARN("OpenMessageTrace:%s", m_messageTrace ? "true" : "false"); // LOG_WARN("*****************************************************************************"); } //= ns.length() && source.find(ns) != string::npos) { return true; } diff --git a/src/include/DefaultMQClient.h b/src/include/DefaultMQClient.h index a2e5ce56f..32b6aed80 100644 --- a/src/include/DefaultMQClient.h +++ b/src/include/DefaultMQClient.h @@ -170,6 +170,10 @@ class DefaultMQClient { virtual void setFactory(MQClientFactory*); + bool getMessageTrace() const; + + void setMessageTrace(bool mMessageTrace); + protected: virtual void start(); virtual void shutdown(); @@ -191,6 +195,7 @@ class DefaultMQClient { std::string m_unitName; SessionCredentials m_SessionCredentials; + bool m_messageTrace; }; //getUnitName(); } - +void DefaultMQProducer::setMessageTrace(bool messageTrace) { + impl->setMessageTrace(messageTrace); +} +bool DefaultMQProducer::getMessageTrace() const { + return impl->getMessageTrace(); +} SendResult DefaultMQProducer::send(MQMessage& msg, bool bSelectActiveBroker) { return impl->send(msg, bSelectActiveBroker); } diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp index 037f81aa8..79fc3aea3 100644 --- a/src/producer/DefaultMQProducerImpl.cpp +++ b/src/producer/DefaultMQProducerImpl.cpp @@ -24,16 +24,15 @@ #include "CommandHeader.h" #include "CommunicationMode.h" #include "Logging.h" -#include "MQClientAPIImpl.h" #include "MQClientException.h" #include "MQClientFactory.h" -#include "MQClientManager.h" #include "MQDecoder.h" -#include "MQProtos.h" #include "MessageAccessor.h" #include "NameSpaceUtil.h" +#include "SendMessageHookImpl.h" #include "StringIdMaker.h" #include "TopicPublishInfo.h" +#include "TraceContant.h" #include "Validators.h" namespace rocketmq { @@ -46,7 +45,8 @@ DefaultMQProducerImpl::DefaultMQProducerImpl(const string& groupname) // m_retryAnotherBrokerWhenNotStoreOK(false), m_compressLevel(5), m_retryTimes(5), - m_retryTimes4Async(1) { + m_retryTimes4Async(1), + m_trace_ioService_work(m_trace_ioService) { //unregisterProducer(this); getFactory()->shutdown(); m_serviceState = SHUTDOWN_ALREADY; @@ -432,6 +438,7 @@ SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg, } if (!brokerAddr.empty()) { + boost::scoped_ptr pSendMesgContext(new SendMessageContext()); try { bool isBatchMsg = std::type_index(typeid(msg)) == std::type_index(typeid(BatchMessage)); // msgId is produced by client, offsetMsgId produced by broker. (same with java sdk) @@ -444,7 +451,28 @@ SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg, } LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str()); + if (!isMessageTraceTopic(msg.getTopic()) && getMessageTrace() && hasSendMessageHook()) { + pSendMesgContext.reset(new SendMessageContext); + pSendMesgContext->setDefaultMqProducer(this); + pSendMesgContext->setProducerGroup(getGroupName()); + pSendMesgContext->setCommunicationMode(static_cast(communicationMode)); + pSendMesgContext->setBornHost(UtilAll::getLocalAddress()); + pSendMesgContext->setBrokerAddr(brokerAddr); + pSendMesgContext->setMessage(msg); + pSendMesgContext->setMessageQueue(mq); + pSendMesgContext->setMsgType(TRACE_NORMAL_MSG); + pSendMesgContext->setNameSpace(getNameSpace()); + string tranMsg = msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED); + if (!tranMsg.empty() && tranMsg == "true") { + pSendMesgContext->setMsgType(TRACE_TRANS_HALF_MSG); + } + if (msg.getProperty("__STARTDELIVERTIME") != "" || + msg.getProperty(MQMessage::PROPERTY_DELAY_TIME_LEVEL) != "") { + pSendMesgContext->setMsgType(TRACE_DELAY_MSG); + } + executeSendMessageHookBefore(pSendMesgContext.get()); + } SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader(); requestHeader->producerGroup = getGroupName(); requestHeader->topic = (msg.getTopic()); @@ -458,9 +486,15 @@ SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg, requestHeader->batch = isBatchMsg; requestHeader->properties = (MQDecoder::messageProperties2String(msg.getProperties())); - return getFactory()->getMQClientAPIImpl()->sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, - getSendMsgTimeout(), getRetryTimes4Async(), - communicationMode, sendCallback, getSessionCredentials()); + SendResult sendResult = getFactory()->getMQClientAPIImpl()->sendMessage( + brokerAddr, mq.getBrokerName(), msg, requestHeader, getSendMsgTimeout(), getRetryTimes4Async(), + communicationMode, sendCallback, getSessionCredentials()); + if (!isMessageTraceTopic(msg.getTopic()) && getMessageTrace() && hasSendMessageHook() && sendCallback == NULL && + communicationMode == ComMode_SYNC) { + pSendMesgContext->setSendResult(sendResult); + executeSendMessageHookAfter(pSendMesgContext.get()); + } + return sendResult; } catch (MQException& e) { throw e; } @@ -636,6 +670,7 @@ bool DefaultMQProducerImpl::dealWithNameSpace() { } return true; } + void DefaultMQProducerImpl::logConfigs() { showClientConfigs(); @@ -646,5 +681,70 @@ void DefaultMQProducerImpl::logConfigs() { LOG_WARN("RetryTimes:%d", m_retryTimes); LOG_WARN("RetryTimes4Async:%d", m_retryTimes4Async); } + +// we should create trace message poll before producer send messages. +bool DefaultMQProducerImpl::dealWithMessageTrace() { + if (!getMessageTrace()) { + LOG_INFO("Message Trace set to false, Will not send trace messages."); + return false; + } + size_t threadpool_size = boost::thread::hardware_concurrency(); + LOG_INFO("Create send message trace threadpool: %d", threadpool_size); + for (size_t i = 0; i < threadpool_size; ++i) { + m_trace_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_trace_ioService)); + } + LOG_INFO("DefaultMQProducer Open meassage trace.."); + std::shared_ptr hook(new SendMessageHookImpl); + registerSendMessageHook(hook); + return true; +} +bool DefaultMQProducerImpl::isMessageTraceTopic(string source) { + return source.find(TraceContant::TRACE_TOPIC) != string::npos; +} +bool DefaultMQProducerImpl::hasSendMessageHook() { + return !m_sendMessageHookList.empty(); +} + +void DefaultMQProducerImpl::registerSendMessageHook(std::shared_ptr& hook) { + m_sendMessageHookList.push_back(hook); + LOG_INFO("Register sendMessageHook success,hookname is %s", hook->getHookName().c_str()); +} + +void DefaultMQProducerImpl::executeSendMessageHookBefore(SendMessageContext* context) { + if (!m_sendMessageHookList.empty()) { + std::vector >::iterator it = m_sendMessageHookList.begin(); + for (; it != m_sendMessageHookList.end(); ++it) { + try { + (*it)->executeHookBefore(context); + } catch (exception e) { + } + } + } +} + +void DefaultMQProducerImpl::executeSendMessageHookAfter(SendMessageContext* context) { + if (!m_sendMessageHookList.empty()) { + std::vector >::iterator it = m_sendMessageHookList.begin(); + for (; it != m_sendMessageHookList.end(); ++it) { + try { + (*it)->executeHookAfter(context); + } catch (exception e) { + } + } + } +} + +void DefaultMQProducerImpl::submitSendTraceRequest(const MQMessage& msg, SendCallback* pSendCallback) { + m_trace_ioService.post(boost::bind(&DefaultMQProducerImpl::sendTraceMessage, this, msg, pSendCallback)); +} + +void DefaultMQProducerImpl::sendTraceMessage(MQMessage& msg, SendCallback* pSendCallback) { + try { + send(msg, pSendCallback, true); + } catch (MQException e) { + LOG_ERROR(e.what()); + // throw e; + } +} //& msgs); bool dealWithNameSpace(); void logConfigs(); + bool dealWithMessageTrace(); + bool isMessageTraceTopic(std::string topic); + bool hasSendMessageHook(); + void registerSendMessageHook(std::shared_ptr& hook); + void executeSendMessageHookBefore(SendMessageContext* context); + void executeSendMessageHookAfter(SendMessageContext* context); + + void sendTraceMessage(MQMessage& msg, SendCallback* pSendCallback); private: int m_sendMsgTimeout; @@ -113,6 +124,12 @@ class DefaultMQProducerImpl : public MQProducer { int m_compressLevel; int m_retryTimes; int m_retryTimes4Async; + + // used for trace + std::vector > m_sendMessageHookList; + boost::asio::io_service m_trace_ioService; + boost::thread_group m_trace_threadpool; + boost::asio::io_service::work m_trace_ioService_work; }; // +#include +#include "DefaultMQProducerImpl.h" +#include "MQClientException.h" +#include "SendMessageContext.h" +#include "TraceContant.h" +#include "TraceTransferBean.h" +#include "TraceUtil.h" +#include "UtilAll.h" + +using namespace std; +namespace rocketmq { + +class TraceMessageSendCallback : public SendCallback { + virtual void onSuccess(SendResult& sendResult) {} + virtual void onException(MQException& e) {} +}; +static TraceMessageSendCallback* callback = new TraceMessageSendCallback(); +std::string SendMessageHookImpl::getHookName() { + return "RocketMQSendMessageHookImpl"; +} + +void SendMessageHookImpl::executeHookBefore(SendMessageContext* context) { + if (context != NULL) { + string topic = context->getMessage()->getTopic(); + // Check if contains TraceConstants::TRACE_TOPIC + if (topic.find(TraceContant::TRACE_TOPIC) != string::npos) { + // trace message itself + return; + } + TraceContext* traceContext = new TraceContext(); + context->setTraceContext(traceContext); + } + return; +} + +void SendMessageHookImpl::executeHookAfter(SendMessageContext* context) { + if (context == NULL) { + return; + } + string topic = context->getMessage()->getTopic(); + // Check if contains TraceConstants::TRACE_TOPIC + if (topic.find(TraceContant::TRACE_TOPIC) != string::npos) { + // trace message itself + return; + } + std::shared_ptr traceContext; + traceContext.reset(context->getTraceContext()); + + // OnsTraceContext* onsContext = context->getMqTraceContext(); + traceContext->setTraceType(Pub); + traceContext->setGroupName(context->getProducerGroup()); + // boost::scoped_ptr traceBean(new OnsTraceBean()); + TraceBean traceBean; + traceBean.setTopic(context->getMessage()->getTopic()); + traceBean.setTags(context->getMessage()->getTags()); + traceBean.setKeys(context->getMessage()->getKeys()); + traceBean.setStoreHost(context->getBrokerAddr()); + traceBean.setBodyLength(context->getMessage()->getBody().size()); + traceBean.setMsgType(context->getMsgType()); + + int costTime = static_cast(UtilAll::currentTimeMillis() - traceContext->getTimeStamp()); + traceContext->setCostTime(costTime); + if (context->getSendResult()->getSendStatus() == SEND_OK) { + traceContext->setStatus(true); + } else { + traceContext->setStatus(false); + } + + traceContext->setRegionId(context->getSendResult()->getRegionId()); + traceBean.setMsgId(context->getMessage()->getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); + traceBean.setOffsetMsgId(context->getSendResult()->getMsgId()); + traceBean.setStoreTime(traceContext->getTimeStamp() + (costTime / 2)); + + traceContext->setTraceBean(traceBean); + + topic = TraceContant::TRACE_TOPIC + traceContext->getRegionId(); + TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(traceContext.get()); + // encode data + MQMessage message(topic, ben.getTransData()); + message.setKeys(ben.getTransKey()); + // send trace message. + context->getDefaultMqProducer()->submitSendTraceRequest(message, callback); + return; +} +} // namespace rocketmq \ No newline at end of file diff --git a/src/producer/SendMessageHookImpl.h b/src/producer/SendMessageHookImpl.h new file mode 100644 index 000000000..54a2ebcfd --- /dev/null +++ b/src/producer/SendMessageHookImpl.h @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ROCKETMQ_SEND_MESSAGE_RPC_HOOK_IMPL_H__ +#define __ROCKETMQ_SEND_MESSAGE_RPC_HOOK_IMPL_H__ + +#include +#include "SendMessageContext.h" +#include "SendMessageHook.h" +namespace rocketmq { +class SendMessageHookImpl : public SendMessageHook { + public: + virtual ~SendMessageHookImpl() {} + virtual std::string getHookName(); + virtual void executeHookBefore(SendMessageContext* context); + virtual void executeHookAfter(SendMessageContext* context); +}; +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/trace/ConsumeMessageContext.h b/src/trace/ConsumeMessageContext.h new file mode 100644 index 000000000..a4fae4a60 --- /dev/null +++ b/src/trace/ConsumeMessageContext.h @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ROCKETMQ_CONSUME_MESSAGE_CONTEXT_H__ +#define __ROCKETMQ_CONSUME_MESSAGE_CONTEXT_H__ + +#include + +namespace rocketmq { +class ConsumeMessageContext { + public: + virtual ~ConsumeMessageContext() {} + std::string m_context; +}; +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/trace/ConsumeMessageHook.h b/src/trace/ConsumeMessageHook.h new file mode 100644 index 000000000..d6317fb11 --- /dev/null +++ b/src/trace/ConsumeMessageHook.h @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ROCKETMQ_COUNSUME_MESSAGE_RPC_HOOK_H__ +#define __ROCKETMQ_COUNSUME_MESSAGE_RPC_HOOK_H__ + +#include +#include "ConsumeMessageContext.h" +namespace rocketmq { +class ConsumeMessageHook { + public: + virtual ~ConsumeMessageHook() {} + virtual std::string getHookName() = 0; + virtual void executeHookBefore(ConsumeMessageContext* context) = 0; + virtual void executeHookAfter(ConsumeMessageContext* context) = 0; +}; +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/trace/SendMessageContext.cpp b/src/trace/SendMessageContext.cpp new file mode 100644 index 000000000..9cf076627 --- /dev/null +++ b/src/trace/SendMessageContext.cpp @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SendMessageContext.h" +#include + +namespace rocketmq { +SendMessageContext::SendMessageContext() {} + +SendMessageContext::~SendMessageContext() { + m_defaultMQProducer = NULL; +} + +std::string SendMessageContext::getProducerGroup() { + return m_producerGroup; +} + +void SendMessageContext::setProducerGroup(const std::string& mProducerGroup) { + m_producerGroup = mProducerGroup; +} + +MQMessage* SendMessageContext::getMessage() { + return &m_message; +} + +void SendMessageContext::setMessage(const MQMessage& mMessage) { + m_message = mMessage; +} + +TraceMessageType SendMessageContext::getMsgType() { + return m_msgType; +} + +void SendMessageContext::setMsgType(TraceMessageType mMsgType) { + m_msgType = mMsgType; +} + +MQMessageQueue* SendMessageContext::getMessageQueue() { + return &m_messageQueue; +} + +void SendMessageContext::setMessageQueue(const MQMessageQueue& mMq) { + m_messageQueue = mMq; +} + +std::string SendMessageContext::getBrokerAddr() { + return m_brokerAddr; +} + +void SendMessageContext::setBrokerAddr(const std::string& mBrokerAddr) { + m_brokerAddr = mBrokerAddr; +} + +std::string SendMessageContext::getBornHost() { + return m_bornHost; +} + +void SendMessageContext::setBornHost(const std::string& mBornHost) { + m_bornHost = mBornHost; +} + +CommunicationMode SendMessageContext::getCommunicationMode() { + return m_communicationMode; +} + +void SendMessageContext::setCommunicationMode(CommunicationMode mCommunicationMode) { + m_communicationMode = mCommunicationMode; +} + +DefaultMQProducerImpl* SendMessageContext::getDefaultMqProducer() { + return m_defaultMQProducer; +} + +void SendMessageContext::setDefaultMqProducer(DefaultMQProducerImpl* mDefaultMqProducer) { + m_defaultMQProducer = mDefaultMqProducer; +} + +SendResult* SendMessageContext::getSendResult() { + return &m_sendResult; +} + +void SendMessageContext::setSendResult(const SendResult& mSendResult) { + m_sendResult = mSendResult; +} + +TraceContext* SendMessageContext::getTraceContext() { + return m_traceContext; +} + +void SendMessageContext::setTraceContext(TraceContext* mTraceContext) { + m_traceContext = mTraceContext; +} + +std::string SendMessageContext::getNameSpace() { + return m_nameSpace; +} + +void SendMessageContext::setNameSpace(const std::string& mNameSpace) { + m_nameSpace = mNameSpace; +} +} // namespace rocketmq diff --git a/src/trace/SendMessageContext.h b/src/trace/SendMessageContext.h new file mode 100644 index 000000000..9cd985ad1 --- /dev/null +++ b/src/trace/SendMessageContext.h @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ROCKETMQ_SEND_MESSAGE_CONTEXT_H__ +#define __ROCKETMQ_SEND_MESSAGE_CONTEXT_H__ + +#include +#include "CommunicationMode.h" +#include "MQMessage.h" +#include "MQMessageQueue.h" +#include "SendResult.h" +#include "TraceBean.h" +#include "TraceContant.h" +#include "TraceContext.h" + +namespace rocketmq { +class DefaultMQProducerImpl; +class SendMessageContext { + public: + SendMessageContext(); + + virtual ~SendMessageContext(); + + std::string getProducerGroup(); + + void setProducerGroup(const std::string& mProducerGroup); + + MQMessage* getMessage(); + + void setMessage(const MQMessage& mMessage); + + TraceMessageType getMsgType(); + + void setMsgType(TraceMessageType mMsgType); + + MQMessageQueue* getMessageQueue(); + + void setMessageQueue(const MQMessageQueue& mMq); + + std::string getBrokerAddr(); + + void setBrokerAddr(const std::string& mBrokerAddr); + + std::string getBornHost(); + + void setBornHost(const std::string& mBornHost); + + CommunicationMode getCommunicationMode(); + + void setCommunicationMode(CommunicationMode mCommunicationMode); + + DefaultMQProducerImpl* getDefaultMqProducer(); + + void setDefaultMqProducer(DefaultMQProducerImpl* mDefaultMqProducer); + + SendResult* getSendResult(); + + void setSendResult(const SendResult& mSendResult); + + TraceContext* getTraceContext(); + + void setTraceContext(TraceContext* mTraceContext); + + std::string getNameSpace(); + + void setNameSpace(const std::string& mNameSpace); + + private: + std::string m_producerGroup; + MQMessage m_message; + TraceMessageType m_msgType; + MQMessageQueue m_messageQueue; + std::string m_brokerAddr; + std::string m_bornHost; + CommunicationMode m_communicationMode; + DefaultMQProducerImpl* m_defaultMQProducer; + SendResult m_sendResult; + TraceContext* m_traceContext; + std::string m_nameSpace; +}; + +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/include/SendMessageHook.h b/src/trace/SendMessageHook.h similarity index 54% rename from src/include/SendMessageHook.h rename to src/trace/SendMessageHook.h index 0d3e1e3b1..70c39277a 100644 --- a/src/include/SendMessageHook.h +++ b/src/trace/SendMessageHook.h @@ -14,36 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef __SENDMESSAGEHOOK_H__ -#define __SENDMESSAGEHOOK_H__ - -#include "MQClientException.h" -#include "MQMessage.h" -#include "MQMessageQueue.h" -#include "RocketMQClient.h" -#include "SendResult.h" +#ifndef __ROCKETMQ_SEND_MESSAGE_RPC_HOOK_H__ +#define __ROCKETMQ_SEND_MESSAGE_RPC_HOOK_H__ +#include +#include "SendMessageContext.h" namespace rocketmq { -// +#include + +namespace rocketmq { +TraceBean::TraceBean() + : m_topic("null"), + m_msgId("null"), + m_offsetMsgId("null"), + m_tags("null"), + m_keys("null"), + m_storeHost("null"), + m_clientHost("null") {} + +TraceBean::~TraceBean() {} + +const std::string& TraceBean::getTopic() const { + return m_topic; +} + +void TraceBean::setTopic(const std::string& topic) { + m_topic = topic; +} + +const std::string& TraceBean::getMsgId() const { + return m_msgId; +} + +void TraceBean::setMsgId(const std::string& msgId) { + m_msgId = msgId; +} + +const std::string& TraceBean::getOffsetMsgId() const { + return m_offsetMsgId; +} + +void TraceBean::setOffsetMsgId(const std::string& offsetMsgId) { + m_offsetMsgId = offsetMsgId; +} + +const std::string& TraceBean::getTags() const { + return m_tags; +} + +void TraceBean::setTags(const std::string& tags) { + m_tags = tags; +} + +const std::string& TraceBean::getKeys() const { + return m_keys; +} + +void TraceBean::setKeys(const std::string& keys) { + m_keys = keys; +} + +const std::string& TraceBean::getStoreHost() const { + return m_storeHost; +} + +void TraceBean::setStoreHost(const std::string& storeHost) { + m_storeHost = storeHost; +} + +const std::string& TraceBean::getClientHost() const { + return m_clientHost; +} + +void TraceBean::setClientHost(const std::string& clientHost) { + m_clientHost = clientHost; +} + +TraceMessageType TraceBean::getMsgType() const { + return m_msgType; +} + +void TraceBean::setMsgType(TraceMessageType msgType) { + m_msgType = msgType; +} + +long long int TraceBean::getStoreTime() const { + return m_storeTime; +} + +void TraceBean::setStoreTime(long long int storeTime) { + m_storeTime = storeTime; +} + +int TraceBean::getRetryTimes() const { + return m_retryTimes; +} + +void TraceBean::setRetryTimes(int retryTimes) { + m_retryTimes = retryTimes; +} + +int TraceBean::getBodyLength() const { + return m_bodyLength; +} + +void TraceBean::setBodyLength(int bodyLength) { + m_bodyLength = bodyLength; +} +} // namespace rocketmq \ No newline at end of file diff --git a/src/trace/TraceBean.h b/src/trace/TraceBean.h new file mode 100644 index 000000000..b6edce918 --- /dev/null +++ b/src/trace/TraceBean.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __ROCKETMQ_TRACE_BEAN_H__ +#define __ROCKETMQ_TRACE_BEAN_H__ + +#include +#include +#include "TraceContant.h" + +namespace rocketmq { +class TraceBean { + public: + TraceBean(); + virtual ~TraceBean(); + + const std::string& getTopic() const; + + void setTopic(const std::string& topic); + + const std::string& getMsgId() const; + + void setMsgId(const std::string& msgId); + + const std::string& getOffsetMsgId() const; + + void setOffsetMsgId(const std::string& offsetMsgId); + + const std::string& getTags() const; + + void setTags(const std::string& tags); + + const std::string& getKeys() const; + + void setKeys(const std::string& keys); + + const std::string& getStoreHost() const; + + void setStoreHost(const std::string& storeHost); + + const std::string& getClientHost() const; + + void setClientHost(const std::string& clientHost); + + TraceMessageType getMsgType() const; + + void setMsgType(TraceMessageType msgType); + + long long int getStoreTime() const; + + void setStoreTime(long long int storeTime); + + int getRetryTimes() const; + + void setRetryTimes(int retryTimes); + + int getBodyLength() const; + + void setBodyLength(int bodyLength); + + private: + std::string m_topic; + std::string m_msgId; + std::string m_offsetMsgId; + std::string m_tags; + std::string m_keys; + std::string m_storeHost; + std::string m_clientHost; + TraceMessageType m_msgType; + long long m_storeTime; + int m_retryTimes; + int m_bodyLength; +}; +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/trace/TraceContant.cpp b/src/trace/TraceContant.cpp new file mode 100644 index 000000000..997a730b0 --- /dev/null +++ b/src/trace/TraceContant.cpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TraceContant.h" +#include + +namespace rocketmq { +std::string TraceContant::GROUP_NAME = "_INNER_TRACE_PRODUCER"; +std::string TraceContant::TRACE_TOPIC = "rmq_sys_TRACE_DATA_"; +std::string TraceContant::DEFAULT_REDION = "DEFAULT_REGION"; +char TraceContant::CONTENT_SPLITOR = 1; +char TraceContant::FIELD_SPLITOR = 2; +std::string TraceContant::TRACE_TYPE_PUB = "Pub"; +std::string TraceContant::TRACE_TYPE_BEFORE = "SubBefore"; +std::string TraceContant::TRACE_TYPE_AFTER = "SubAfter"; +} // namespace rocketmq diff --git a/src/trace/TraceContant.h b/src/trace/TraceContant.h new file mode 100644 index 000000000..3a5f30fe6 --- /dev/null +++ b/src/trace/TraceContant.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __ROCKETMQ_TRACE_CONTANT_H_ +#define __ROCKETMQ_TRACE_CONTANT_H_ + +#include + +namespace rocketmq { +class TraceContant { + public: + static std::string GROUP_NAME; + static std::string TRACE_TOPIC; + static std::string DEFAULT_REDION; + static char CONTENT_SPLITOR; + static char FIELD_SPLITOR; + static std::string TRACE_TYPE_PUB; + static std::string TRACE_TYPE_BEFORE; + static std::string TRACE_TYPE_AFTER; +}; +enum TraceMessageType { + TRACE_NORMAL_MSG = 0, + TRACE_TRANS_HALF_MSG, + TRACE_TRANS_COMMIT_MSG, + TRACE_DELAY_MSG, +}; +enum TraceType { + Pub, // for send message + SubBefore, // for consume message before + SubAfter, // for consum message after +}; +} // namespace rocketmq +#endif // diff --git a/src/trace/TraceContext.cpp b/src/trace/TraceContext.cpp new file mode 100644 index 000000000..6696f069b --- /dev/null +++ b/src/trace/TraceContext.cpp @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TraceContext.h" +#include +#include + +#include "StringIdMaker.h" +#include "UtilAll.h" + +namespace rocketmq { +TraceContext::TraceContext() : m_timeStamp(UtilAll::currentTimeMillis()) { + m_requestId = StringIdMaker::getInstance().createUniqID(); +} + +TraceContext::TraceContext(const std::string& mGroupName) : m_groupName(mGroupName) {} + +TraceContext::~TraceContext() {} + +TraceMessageType TraceContext::getMsgType() const { + return m_msgType; +} + +void TraceContext::setMsgType(TraceMessageType msgType) { + m_msgType = msgType; +} + +TraceType TraceContext::getTraceType() const { + return m_traceType; +} + +void TraceContext::setTraceType(TraceType traceType) { + m_traceType = traceType; +} + +long long int TraceContext::getTimeStamp() const { + return m_timeStamp; +} + +void TraceContext::setTimeStamp(long long int timeStamp) { + m_timeStamp = timeStamp; +} + +const string& TraceContext::getRegionId() const { + return m_regionId; +} + +void TraceContext::setRegionId(const string& regionId) { + m_regionId = regionId; +} + +const string& TraceContext::getGroupName() const { + return m_groupName; +} + +void TraceContext::setGroupName(const string& groupName) { + m_groupName = groupName; +} + +int TraceContext::getCostTime() const { + return m_costTime; +} + +void TraceContext::setCostTime(int costTime) { + m_costTime = costTime; +} + +bool TraceContext::getStatus() const { + return m_status; +} + +void TraceContext::setStatus(bool isSuccess) { + m_status = isSuccess; +} + +const string& TraceContext::getRequestId() const { + return m_requestId; +} + +void TraceContext::setRequestId(const string& requestId) { + m_requestId = requestId; +} + +int TraceContext::getTraceBeanIndex() const { + return m_traceBeanIndex; +} + +void TraceContext::setTraceBeanIndex(int traceBeanIndex) { + m_traceBeanIndex = traceBeanIndex; +} + +const vector& TraceContext::getTraceBeans() const { + return m_traceBeans; +} + +void TraceContext::setTraceBean(const TraceBean& traceBean) { + m_traceBeans.push_back(traceBean); +} +} // namespace rocketmq \ No newline at end of file diff --git a/src/trace/TraceContext.h b/src/trace/TraceContext.h new file mode 100644 index 000000000..1f617546f --- /dev/null +++ b/src/trace/TraceContext.h @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __ROCKETMQ_TRACE_CONTEXT_H__ +#define __ROCKETMQ_TRACE_CONTEXT_H__ + +#include +#include +#include "TraceBean.h" +#include "TraceContant.h" + +namespace rocketmq { +class TraceContext { + public: + TraceContext(); + + TraceContext(const std::string& mGroupName); + + virtual ~TraceContext(); + + TraceMessageType getMsgType() const; + + void setMsgType(TraceMessageType msgType); + + TraceType getTraceType() const; + + void setTraceType(TraceType traceType); + + long long int getTimeStamp() const; + + void setTimeStamp(long long int timeStamp); + + const std::string& getRegionId() const; + + void setRegionId(const std::string& regionId); + + const std::string& getGroupName() const; + + void setGroupName(const std::string& groupName); + + int getCostTime() const; + + void setCostTime(int costTime); + + bool getStatus() const; + + void setStatus(bool isSuccess); + + const std::string& getRequestId() const; + + void setRequestId(const std::string& requestId); + + int getTraceBeanIndex() const; + + void setTraceBeanIndex(int traceBeanIndex); + + const std::vector& getTraceBeans() const; + + void setTraceBean(const TraceBean& traceBean); + + private: + TraceMessageType m_msgType; + TraceType m_traceType; + long long m_timeStamp; + std::string m_regionId; + std::string m_groupName; + int m_costTime; + bool m_status; + std::string m_requestId; + int m_traceBeanIndex; + std::vector m_traceBeans; +}; +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/trace/TraceTransferBean.cpp b/src/trace/TraceTransferBean.cpp new file mode 100644 index 000000000..bd04c333e --- /dev/null +++ b/src/trace/TraceTransferBean.cpp @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TraceTransferBean.h" +#include +#include + +namespace rocketmq { +std::string TraceTransferBean::getTransData() { + return m_transData; +} + +void TraceTransferBean::setTransData(const std::string& transData) { + m_transData = transData; +} + +std::vector TraceTransferBean::getTransKey() { + return m_transKey; +} + +void TraceTransferBean::setTransKey(const std::string& transkey) { + m_transKey.push_back(transkey); +} +} // namespace rocketmq \ No newline at end of file diff --git a/src/trace/TraceTransferBean.h b/src/trace/TraceTransferBean.h new file mode 100644 index 000000000..5ca054a11 --- /dev/null +++ b/src/trace/TraceTransferBean.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __ROCKETMQ_TRACE_TRANSFER_BEAN_H__ +#define __ROCKETMQ_TRACE_TRANSFER_BEAN_H__ + +#include +#include + +namespace rocketmq { +class TraceTransferBean { + public: + std::string getTransData(); + + void setTransData(const std::string& transData); + + std::vector getTransKey(); + + void setTransKey(const std::string& transkey); + + private: + std::string m_transData; + std::vector m_transKey; +}; +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/trace/TraceUtil.cpp b/src/trace/TraceUtil.cpp new file mode 100644 index 000000000..d0ab792bb --- /dev/null +++ b/src/trace/TraceUtil.cpp @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TraceUtil.h" +#include +#include +#include "TraceContant.h" + +namespace rocketmq { +std::string TraceUtil::CovertTraceTypeToString(TraceType type) { + switch (type) { + case Pub: + return TraceContant::TRACE_TYPE_PUB; + case SubBefore: + return TraceContant::TRACE_TYPE_BEFORE; + case SubAfter: + return TraceContant::TRACE_TYPE_AFTER; + default: + return TraceContant::TRACE_TYPE_PUB; + } +} + +TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx) { + std::ostringstream ss; + std::vector bens = ctx->getTraceBeans(); + switch (ctx->getTraceType()) { + case Pub: { + TraceBean* ben = &bens[0]; + ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR; + ss << ctx->getTimeStamp() << TraceContant::CONTENT_SPLITOR; + ss << ctx->getRegionId() << TraceContant::CONTENT_SPLITOR; + ss << ctx->getGroupName() << TraceContant::CONTENT_SPLITOR; + ss << ben->getTopic() << TraceContant::CONTENT_SPLITOR; + ss << ben->getMsgId() << TraceContant::CONTENT_SPLITOR; + ss << ben->getTags() << TraceContant::CONTENT_SPLITOR; + ss << ben->getKeys() << TraceContant::CONTENT_SPLITOR; + ss << ben->getStoreHost() << TraceContant::CONTENT_SPLITOR; + ss << ben->getBodyLength() << TraceContant::CONTENT_SPLITOR; + ss << ctx->getCostTime() << TraceContant::CONTENT_SPLITOR; + ss << ben->getMsgType() << TraceContant::CONTENT_SPLITOR; + ss << ben->getOffsetMsgId() << TraceContant::CONTENT_SPLITOR; + ss << (ctx->getStatus() ? "true" : "false") << TraceContant::FIELD_SPLITOR; + } break; + + case SubBefore: { + std::vector::iterator it = bens.begin(); + for (; it != bens.end(); ++it) { + ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR; + ss << ctx->getTimeStamp() << TraceContant::CONTENT_SPLITOR; + ss << ctx->getRegionId() << TraceContant::CONTENT_SPLITOR; + ss << ctx->getGroupName() << TraceContant::CONTENT_SPLITOR; + ss << ctx->getRequestId() << TraceContant::CONTENT_SPLITOR; + ss << (*it).getMsgId() << TraceContant::CONTENT_SPLITOR; + ss << (*it).getRetryTimes() << TraceContant::CONTENT_SPLITOR; + ss << (*it).getKeys() << TraceContant::FIELD_SPLITOR; + } + } break; + + case SubAfter: { + // TraceBean* bean = &bens[ctx->getTraceBeanIndex()]; + TraceBean* bean = &bens[0]; + ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR; + ss << ctx->getRequestId() << TraceContant::CONTENT_SPLITOR; + ss << bean->getMsgId() << TraceContant::CONTENT_SPLITOR; + ss << ctx->getCostTime() << TraceContant::CONTENT_SPLITOR; + ss << (ctx->getStatus() ? "true" : "false") << TraceContant::CONTENT_SPLITOR; + ss << bean->getKeys() << TraceContant::FIELD_SPLITOR; + } break; + + default: + break; + } + + TraceTransferBean transferBean; + transferBean.setTransData(ss.str()); + + switch (ctx->getTraceType()) { + case Pub: { + transferBean.setTransKey(bens[0].getMsgId()); + if (bens[0].getKeys() != "") + transferBean.setTransKey(bens[0].getKeys()); + } break; + case SubBefore: { + std::vector::iterator it = bens.begin(); + for (; it != bens.end(); ++it) { + transferBean.setTransKey((*it).getMsgId()); + if ((*it).getKeys() != "") + transferBean.setTransKey((*it).getKeys()); + } + + } break; + case SubAfter: { + transferBean.setTransKey(bens[0].getMsgId()); + if (bens[0].getKeys() != "") + transferBean.setTransKey(bens[0].getKeys()); + } break; + default: + break; + } + + return transferBean; +} +} // namespace rocketmq diff --git a/src/trace/TraceUtil.h b/src/trace/TraceUtil.h new file mode 100644 index 000000000..f924844ae --- /dev/null +++ b/src/trace/TraceUtil.h @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __ROCKETMQ_TRACE_UTIL_H_ +#define __ROCKETMQ_TRACE_UTIL_H_ + +#include +#include "TraceContant.h" +#include "TraceContext.h" +#include "TraceTransferBean.h" + +namespace rocketmq { +class TraceUtil { + public: + static std::string CovertTraceTypeToString(TraceType type); + static TraceTransferBean CovertTraceContextToTransferBean(TraceContext* ctx); +}; +} // namespace rocketmq +#endif // diff --git a/test/src/common/NameSpaceUtilTest.cpp b/test/src/common/NameSpaceUtilTest.cpp index 8076c0465..08873b5f7 100644 --- a/test/src/common/NameSpaceUtilTest.cpp +++ b/test/src/common/NameSpaceUtilTest.cpp @@ -80,9 +80,11 @@ TEST(NameSpaceUtil, hasNameSpace) { string source = "testTopic"; string ns = "MQ_INST_UNITTEST"; string nsSource = "MQ_INST_UNITTEST%testTopic"; + string nsTraceSource = "rmq_sys_TRACE_DATA_Region"; EXPECT_TRUE(NameSpaceUtil::hasNameSpace(nsSource, ns)); EXPECT_FALSE(NameSpaceUtil::hasNameSpace(source, ns)); EXPECT_FALSE(NameSpaceUtil::hasNameSpace(source, "")); + EXPECT_TRUE(NameSpaceUtil::hasNameSpace(nsTraceSource, ns)); } int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); diff --git a/test/src/producer/DefaultMQProducerImplTest.cpp b/test/src/producer/DefaultMQProducerImplTest.cpp index b9a042beb..3e42e69c5 100644 --- a/test/src/producer/DefaultMQProducerImplTest.cpp +++ b/test/src/producer/DefaultMQProducerImplTest.cpp @@ -103,9 +103,8 @@ TEST(DefaultMQProducerImplTest, init) { EXPECT_EQ(impl->getNamesrvAddr(), "rocketmq.nameserver.com"); impl->setNameSpace("MQ_INST_NAMESPACE_TEST"); EXPECT_EQ(impl->getNameSpace(), "MQ_INST_NAMESPACE_TEST"); - // impl->start(); - // EXPECT_EQ(impl->getGroupName(), "MQ_INST_NAMESPACE_TEST%testMQProducerGroup"); - // impl->shutdown(); + impl->setMessageTrace(true); + EXPECT_TRUE(impl->getMessageTrace()); } TEST(DefaultMQProducerImplTest, Sends) { DefaultMQProducerImpl* impl = new DefaultMQProducerImpl("testMockSendMQProducerGroup"); @@ -187,6 +186,46 @@ TEST(DefaultMQProducerImplTest, Sends) { delete mockFactory; delete apiImpl; } +TEST(DefaultMQProducerImplTest, Trace) { + DefaultMQProducerImpl* impl = new DefaultMQProducerImpl("testMockProducerTraceGroup"); + MockMQClientFactory* mockFactory = new MockMQClientFactory("testTraceClientId"); + MockMQClientAPIImpl* apiImpl = new MockMQClientAPIImpl(); + + impl->setFactory(mockFactory); + impl->setNamesrvAddr("http://rocketmq.nameserver.com"); + impl->setMessageTrace(true); + + // prepare send + boost::shared_ptr topicPublishInfo = boost::make_shared(); + MQMessageQueue mqA("TestTraceTopic", "BrokerA", 0); + MQMessageQueue mqB("TestTraceTopic", "BrokerB", 0); + topicPublishInfo->updateMessageQueueList(mqA); + topicPublishInfo->updateMessageQueueList(mqB); + + SendResult okMQAResult(SEND_OK, "MSSAGEID", "OFFSETID", mqA, 1024, "DEFAULT_REGION"); + + EXPECT_CALL(*mockFactory, start()).Times(1).WillOnce(Return()); + EXPECT_CALL(*mockFactory, shutdown()).Times(1).WillOnce(Return()); + EXPECT_CALL(*mockFactory, registerProducer(_)).Times(1).WillOnce(Return(true)); + EXPECT_CALL(*mockFactory, unregisterProducer(_)).Times(1).WillOnce(Return()); + EXPECT_CALL(*mockFactory, sendHeartbeatToAllBroker()).Times(1).WillOnce(Return()); + EXPECT_CALL(*mockFactory, tryToFindTopicPublishInfo(_, _)).WillRepeatedly(Return(topicPublishInfo)); + EXPECT_CALL(*mockFactory, findBrokerAddressInPublish(_)).WillRepeatedly(Return("BrokerA")); + EXPECT_CALL(*mockFactory, getMQClientAPIImpl()).WillRepeatedly(Return(apiImpl)); + + EXPECT_CALL(*apiImpl, sendMessage(_, _, _, _, _, _, _, _, _)).WillRepeatedly(Return(okMQAResult)); + + // Start Producer. + impl->start(); + + MQMessage msg("TestTraceTopic", "testTag", "testKey", "testBodysA"); + SendResult s1 = impl->send(msg); + EXPECT_EQ(s1.getSendStatus(), SEND_OK); + + impl->shutdown(); + delete mockFactory; + delete apiImpl; +} int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); return RUN_ALL_TESTS(); diff --git a/test/src/trace/TraceBeanTest.cpp b/test/src/trace/TraceBeanTest.cpp new file mode 100644 index 000000000..7fcbe32c0 --- /dev/null +++ b/test/src/trace/TraceBeanTest.cpp @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "TraceBean.h" + +using std::string; + +using ::testing::InitGoogleMock; +using ::testing::InitGoogleTest; +using testing::Return; + +using rocketmq::TraceBean; +using rocketmq::TraceMessageType; + +TEST(TraceBean, Init) { + std::string m_topic("topic"); + std::string m_msgId("msgid"); + std::string m_offsetMsgId("offsetmsgid"); + std::string m_tags("tag"); + std::string m_keys("ksy"); + std::string m_storeHost("storehost"); + std::string m_clientHost("clienthost"); + TraceMessageType m_msgType = TraceMessageType::TRACE_NORMAL_MSG; + long long m_storeTime = 100; + int m_retryTimes = 2; + int m_bodyLength = 1024; + TraceBean bean; + bean.setTopic(m_topic); + bean.setMsgId(m_msgId); + bean.setOffsetMsgId(m_offsetMsgId); + bean.setTags(m_tags); + bean.setKeys(m_keys); + bean.setStoreHost(m_storeHost); + bean.setClientHost(m_clientHost); + bean.setMsgType(m_msgType); + bean.setStoreTime(m_storeTime); + bean.setRetryTimes(m_retryTimes); + bean.setBodyLength(m_bodyLength); + EXPECT_EQ(bean.getTopic(), m_topic); + EXPECT_EQ(bean.getMsgId(), m_msgId); + EXPECT_EQ(bean.getOffsetMsgId(), m_offsetMsgId); + EXPECT_EQ(bean.getTags(), m_tags); + EXPECT_EQ(bean.getKeys(), m_keys); + EXPECT_EQ(bean.getStoreHost(), m_storeHost); + EXPECT_EQ(bean.getClientHost(), m_clientHost); + EXPECT_EQ(bean.getMsgType(), m_msgType); + EXPECT_EQ(bean.getStoreTime(), m_storeTime); + EXPECT_EQ(bean.getRetryTimes(), m_retryTimes); + EXPECT_EQ(bean.getBodyLength(), m_bodyLength); +} + +int main(int argc, char* argv[]) { + InitGoogleMock(&argc, argv); + testing::GTEST_FLAG(throw_on_failure) = true; + testing::GTEST_FLAG(filter) = "TraceBean.*"; + int itestts = RUN_ALL_TESTS(); + return itestts; +} diff --git a/test/src/trace/TraceUtilTest.cpp b/test/src/trace/TraceUtilTest.cpp new file mode 100644 index 000000000..38d8c3d27 --- /dev/null +++ b/test/src/trace/TraceUtilTest.cpp @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "TraceContant.h" +#include "TraceUtil.h" + +using std::string; + +using ::testing::InitGoogleMock; +using ::testing::InitGoogleTest; +using testing::Return; + +using rocketmq::TraceBean; +using rocketmq::TraceContant; +using rocketmq::TraceContext; +using rocketmq::TraceMessageType; +using rocketmq::TraceTransferBean; +using rocketmq::TraceType; +using rocketmq::TraceUtil; + +TEST(TraceUtil, CovertTraceTypeToString) { + EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::Pub), TraceContant::TRACE_TYPE_PUB); + EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::SubBefore), TraceContant::TRACE_TYPE_BEFORE); + EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::SubAfter), TraceContant::TRACE_TYPE_AFTER); +} +TEST(TraceUtil, CovertTraceContextToTransferBean) { + TraceContext context; + TraceBean bean; + bean.setMsgType(TraceMessageType::TRACE_NORMAL_MSG); + bean.setMsgId("MessageID"); + bean.setKeys("MessageKey"); + context.setRegionId("region"); + context.setMsgType(TraceMessageType::TRACE_TRANS_COMMIT_MSG); + context.setTraceType(TraceType::Pub); + context.setGroupName("PubGroup"); + context.setCostTime(50); + context.setStatus(true); + context.setTraceBean(bean); + TraceTransferBean beanPub = TraceUtil::CovertTraceContextToTransferBean(&context); + EXPECT_GT(beanPub.getTransKey().size(), 0); + context.setTraceType(TraceType::SubBefore); + TraceTransferBean beanBefore = TraceUtil::CovertTraceContextToTransferBean(&context); + EXPECT_GT(beanBefore.getTransKey().size(), 0); + + context.setTraceType(TraceType::SubAfter); + TraceTransferBean beanAfter = TraceUtil::CovertTraceContextToTransferBean(&context); + EXPECT_GT(beanAfter.getTransKey().size(), 0); +} +int main(int argc, char* argv[]) { + InitGoogleMock(&argc, argv); + testing::GTEST_FLAG(throw_on_failure) = true; + testing::GTEST_FLAG(filter) = "TraceUtil.*"; + int itestts = RUN_ALL_TESTS(); + return itestts; +} From 784a4bb54ac60c4360bdfe5372eb21333dced5e4 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Sun, 15 Mar 2020 17:05:26 +0800 Subject: [PATCH 2/4] feat(trace): add message trace for push consumer --- include/DefaultMQPushConsumer.h | 2 + src/MQClientFactory.cpp | 2 + .../ConsumeMessageConcurrentlyService.cpp | 66 ++++++++-- src/consumer/ConsumeMessageHookImpl.cpp | 113 ++++++++++++++++++ src/consumer/ConsumeMessageHookImpl.h | 32 +++++ src/consumer/ConsumeMessageOrderlyService.cpp | 25 ++++ src/consumer/DefaultMQPushConsumer.cpp | 7 +- src/consumer/DefaultMQPushConsumerImpl.cpp | 80 +++++++++++++ src/consumer/DefaultMQPushConsumerImpl.h | 16 +++ src/producer/DefaultMQProducerImpl.cpp | 20 +++- src/producer/DefaultMQProducerImpl.h | 2 + src/producer/SendMessageHookImpl.cpp | 2 +- src/trace/ConsumeMessageContext.cpp | 102 ++++++++++++++++ src/trace/ConsumeMessageContext.h | 61 +++++++++- src/trace/TraceUtil.cpp | 55 ++++----- 15 files changed, 540 insertions(+), 45 deletions(-) create mode 100644 src/consumer/ConsumeMessageHookImpl.cpp create mode 100644 src/consumer/ConsumeMessageHookImpl.h create mode 100644 src/trace/ConsumeMessageContext.cpp diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h index 5b10b3df0..46f76cf18 100644 --- a/include/DefaultMQPushConsumer.h +++ b/include/DefaultMQPushConsumer.h @@ -132,6 +132,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer { const std::string& getUnitName() const; void setAsyncPull(bool asyncFlag); + void setMessageTrace(bool messageTrace); + bool getMessageTrace() const; private: DefaultMQPushConsumerImpl* impl; diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index cb1a491bb..d2109656f 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -77,6 +77,8 @@ void MQClientFactory::start() { m_serviceState = RUNNING; break; case RUNNING: + LOG_INFO("The Factory object:%s start before with now state:%d", m_clientId.c_str(), m_serviceState); + break; case SHUTDOWN_ALREADY: case START_FAILED: LOG_INFO("The Factory object:%s start failed with fault state:%d", m_clientId.c_str(), m_serviceState); diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp index deda8ac1e..ef43bb729 100644 --- a/src/consumer/ConsumeMessageConcurrentlyService.cpp +++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp @@ -17,11 +17,13 @@ #if !defined(WIN32) && !defined(__APPLE__) #include #endif + #include "ConsumeMsgService.h" #include "DefaultMQPushConsumer.h" #include "Logging.h" #include "MessageAccessor.h" #include "UtilAll.h" + namespace rocketmq { //m_messageQueue.toString().c_str()); } } + void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr pullRequest, vector& msgs, int millis) { @@ -146,14 +149,26 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptrisDropped()) { LOG_WARN("the pull request for %s Had been dropped before", request->m_messageQueue.toString().c_str()); request->clearAllMsgs(); // add clear operation to avoid bad state when - // dropped pullRequest returns normal + // dropped pullRequest returns normal return; } if (msgs.empty()) { LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str()); return; } - + ConsumeMessageContext consumeMessageContext; + DefaultMQPushConsumerImpl* pConsumer = dynamic_cast(m_pConsumer); + if (pConsumer) { + if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) { + consumeMessageContext.setDefaultMQPushConsumer(pConsumer); + consumeMessageContext.setConsumerGroup(pConsumer->getGroupName()); + consumeMessageContext.setMessageQueue(request->m_messageQueue); + consumeMessageContext.setMsgList(msgs); + consumeMessageContext.setSuccess(false); + consumeMessageContext.setNameSpace(pConsumer->getNameSpace()); + pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext); + } + } ConsumeStatus status = CONSUME_SUCCESS; if (m_pMessageListener != NULL) { resetRetryTopic(msgs); @@ -163,11 +178,48 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptrisUseNameSpaceMode()) { MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace()); } - try { - status = m_pMessageListener->consumeMessage(msgs); - } catch (...) { - status = RECONSUME_LATER; - LOG_ERROR("Consumer's code is buggy. Un-caught exception raised"); + + if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) { + // For open trace message, consume message one by one. + for (size_t i = 0; i < msgs.size(); ++i) { + LOG_DEBUG("=====Trace Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]", + msgs[i].getTopic().c_str(), msgs[i].getMsgId().c_str(), msgs[i].getBody().c_str(), + msgs[i].getReconsumeTimes()); + std::vector msgInner; + msgInner.push_back(msgs[i]); + if (status != CONSUME_SUCCESS) { + // all the Messages behind should be set to failed. + status = RECONSUME_LATER; + consumeMessageContext.setMsgIndex(i); + consumeMessageContext.setStatus("RECONSUME_LATER"); + consumeMessageContext.setSuccess(false); + pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext); + continue; + } + try { + status = m_pMessageListener->consumeMessage(msgInner); + } catch (...) { + status = RECONSUME_LATER; + LOG_ERROR("Consumer's code is buggy. Un-caught exception raised"); + } + consumeMessageContext.setMsgIndex(i); // indicate message position,not support batch consumer + if (status == CONSUME_SUCCESS) { + consumeMessageContext.setStatus("CONSUME_SUCCESS"); + consumeMessageContext.setSuccess(true); + } else { + status = RECONSUME_LATER; + consumeMessageContext.setStatus("RECONSUME_LATER"); + consumeMessageContext.setSuccess(false); + } + pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext); + } + } else { + try { + status = m_pMessageListener->consumeMessage(msgs); + } catch (...) { + status = RECONSUME_LATER; + LOG_ERROR("Consumer's code is buggy. Un-caught exception raised"); + } } } diff --git a/src/consumer/ConsumeMessageHookImpl.cpp b/src/consumer/ConsumeMessageHookImpl.cpp new file mode 100644 index 000000000..dc3da3cc1 --- /dev/null +++ b/src/consumer/ConsumeMessageHookImpl.cpp @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ConsumeMessageHookImpl.h" +#include +#include +#include "ConsumeMessageContext.h" +#include "DefaultMQPushConsumerImpl.h" +#include "Logging.h" +#include "MQClientException.h" +#include "TraceContant.h" +#include "TraceContext.h" +#include "TraceTransferBean.h" +#include "TraceUtil.h" +#include "UtilAll.h" +namespace rocketmq { + +class TraceMessageConsumeCallback : public SendCallback { + virtual void onSuccess(SendResult& sendResult) { + LOG_DEBUG("TraceMessageConsumeCallback, MsgId:[%s],OffsetMsgId[%s]", sendResult.getMsgId().c_str(), + sendResult.getOffsetMsgId().c_str()); + } + virtual void onException(MQException& e) {} +}; +static TraceMessageConsumeCallback* consumeTraceCallback = new TraceMessageConsumeCallback(); +std::string ConsumeMessageHookImpl::getHookName() { + return "RocketMQConsumeMessageHookImpl"; +} + +void ConsumeMessageHookImpl::executeHookBefore(ConsumeMessageContext* context) { + if (context == NULL || context->getMsgList().empty()) { + return; + } + TraceContext* traceContext = new TraceContext(); + context->setTraceContext(traceContext); + traceContext->setTraceType(SubBefore); + traceContext->setGroupName(context->getConsumerGroup()); + std::vector beans; + + std::vector msgs = context->getMsgList(); + std::vector::iterator it = msgs.begin(); + for (; it != msgs.end(); ++it) { + TraceBean bean; + bean.setTopic((*it).getTopic()); + bean.setMsgId((*it).getMsgId()); + bean.setTags((*it).getTags()); + bean.setKeys((*it).getKeys()); + bean.setStoreHost((*it).getStoreHostString()); + bean.setStoreTime((*it).getStoreTimestamp()); + bean.setBodyLength((*it).getStoreSize()); + bean.setRetryTimes((*it).getReconsumeTimes()); + std::string regionId = (*it).getProperty(MQMessage::PROPERTY_MSG_REGION); + if (regionId.empty()) { + regionId = TraceContant::DEFAULT_REDION; + } + traceContext->setRegionId(regionId); + traceContext->setTraceBean(bean); + } + traceContext->setTimeStamp(UtilAll::currentTimeMillis()); + + std::string topic = TraceContant::TRACE_TOPIC + traceContext->getRegionId(); + + TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(traceContext); + MQMessage message(topic, ben.getTransData()); + message.setKeys(ben.getTransKey()); + + // send trace message async. + context->getDefaultMQPushConsumer()->submitSendTraceRequest(message, consumeTraceCallback); + return; +} + +void ConsumeMessageHookImpl::executeHookAfter(ConsumeMessageContext* context) { + if (context == NULL || context->getMsgList().empty()) { + return; + } + + std::shared_ptr subBeforeContext = context->getTraceContext(); + TraceContext subAfterContext; + subAfterContext.setTraceType(SubAfter); + subAfterContext.setRegionId(subBeforeContext->getRegionId()); + subAfterContext.setGroupName(subBeforeContext->getGroupName()); + subAfterContext.setRequestId(subBeforeContext->getRequestId()); + subAfterContext.setStatus(context->getSuccess()); + int costTime = static_cast(UtilAll::currentTimeMillis() - subBeforeContext->getTimeStamp()); + subAfterContext.setCostTime(costTime); + subAfterContext.setTraceBeanIndex(context->getMsgIndex()); + TraceBean bean = subBeforeContext->getTraceBeans()[subAfterContext.getTraceBeanIndex()]; + subAfterContext.setTraceBean(bean); + + std::string topic = TraceContant::TRACE_TOPIC + subAfterContext.getRegionId(); + TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(&subAfterContext); + MQMessage message(topic, ben.getTransData()); + message.setKeys(ben.getTransKey()); + + // send trace message async. + context->getDefaultMQPushConsumer()->submitSendTraceRequest(message, consumeTraceCallback); + return; +} +} // namespace rocketmq diff --git a/src/consumer/ConsumeMessageHookImpl.h b/src/consumer/ConsumeMessageHookImpl.h new file mode 100644 index 000000000..30852bac0 --- /dev/null +++ b/src/consumer/ConsumeMessageHookImpl.h @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ROCKETMQ_CONSUME_MESSAGE_RPC_HOOK_IMPL_H__ +#define __ROCKETMQ_CONSUME_MESSAGE_RPC_HOOK_IMPL_H__ + +#include +#include "ConsumeMessageContext.h" +#include "ConsumeMessageHook.h" +namespace rocketmq { +class ConsumeMessageHookImpl : public ConsumeMessageHook { + public: + virtual ~ConsumeMessageHookImpl() {} + virtual std::string getHookName(); + virtual void executeHookBefore(ConsumeMessageContext* context); + virtual void executeHookAfter(ConsumeMessageContext* context); +}; +} // namespace rocketmq +#endif \ No newline at end of file diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp b/src/consumer/ConsumeMessageOrderlyService.cpp index fcff4a406..849fa8f8d 100644 --- a/src/consumer/ConsumeMessageOrderlyService.cpp +++ b/src/consumer/ConsumeMessageOrderlyService.cpp @@ -185,8 +185,27 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr p if (m_pConsumer->isUseNameSpaceMode()) { MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace()); } + ConsumeMessageContext consumeMessageContext; + DefaultMQPushConsumerImpl* pConsumer = dynamic_cast(m_pConsumer); + if (pConsumer) { + if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) { + consumeMessageContext.setDefaultMQPushConsumer(pConsumer); + consumeMessageContext.setConsumerGroup(pConsumer->getGroupName()); + consumeMessageContext.setMessageQueue(request->m_messageQueue); + consumeMessageContext.setMsgList(msgs); + consumeMessageContext.setSuccess(false); + consumeMessageContext.setNameSpace(pConsumer->getNameSpace()); + pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext); + } + } ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs); if (consumeStatus == RECONSUME_LATER) { + if (pConsumer) { + consumeMessageContext.setMsgIndex(0); + consumeMessageContext.setStatus("RECONSUME_LATER"); + consumeMessageContext.setSuccess(false); + pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext); + } if (msgs[0].getReconsumeTimes() <= 15) { msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1); request->makeMessageToCosumeAgain(msgs); @@ -202,6 +221,12 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr p tryLockLaterAndReconsumeDelay(request, false, 5000); } } else { + if (pConsumer) { + consumeMessageContext.setMsgIndex(0); + consumeMessageContext.setStatus("CONSUME_SUCCESS"); + consumeMessageContext.setSuccess(true); + pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext); + } m_pConsumer->updateConsumeOffset(request->m_messageQueue, request->commit()); } } else { diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index 5034051b5..e0cb5bf41 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -203,5 +203,10 @@ uint64_t DefaultMQPushConsumer::getTcpTransportTryLockTimeout() const { void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) { impl->setAsyncPull(asyncFlag); } -//setMessageTrace(messageTrace); +} +bool DefaultMQPushConsumer::getMessageTrace() const { + return impl->getMessageTrace(); +} } // namespace rocketmq diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp index 5fe6f9c3b..954960380 100644 --- a/src/consumer/DefaultMQPushConsumerImpl.cpp +++ b/src/consumer/DefaultMQPushConsumerImpl.cpp @@ -17,6 +17,7 @@ #include "DefaultMQPushConsumerImpl.h" #include "CommunicationMode.h" +#include "ConsumeMessageHookImpl.h" #include "ConsumeMsgService.h" #include "ConsumerRunningInfo.h" #include "FilterAPI.h" @@ -316,6 +317,7 @@ void DefaultMQPushConsumerImpl::start() { case CREATE_JUST: { m_serviceState = START_FAILED; DefaultMQClient::start(); + dealWithMessageTrace(); LOG_INFO("DefaultMQPushConsumerImpl:%s start", m_GroupName.c_str()); //interrupt(); m_async_service_thread->join(); @@ -1080,5 +1083,82 @@ void DefaultMQPushConsumerImpl::logConfigs() { LOG_WARN("AsyncPullMode:%s", m_asyncPull ? "true" : "false"); LOG_WARN("AsyncPullTimeout:%d ms", m_asyncPullTimeout); } +// we should create trace message poll before producer send messages. +bool DefaultMQPushConsumerImpl::dealWithMessageTrace() { + if (!getMessageTrace()) { + LOG_INFO("Message Trace set to false, Will not send trace messages."); + return false; + } + // Try to create default producer inner. + LOG_INFO("DefaultMQPushConsumer Open message trace.."); + + createMessageTraceInnerProducer(); + std::shared_ptr hook(new ConsumeMessageHookImpl()); + registerConsumeMessageHook(hook); + return true; +} + +void DefaultMQPushConsumerImpl::createMessageTraceInnerProducer() { + m_DefaultMQProducerImpl = std::make_shared(getGroupName()); + m_DefaultMQProducerImpl->setMessageTrace(false); + m_DefaultMQProducerImpl->setInstanceName(getInstanceName()); + const SessionCredentials& session = getSessionCredentials(); + m_DefaultMQProducerImpl->setSessionCredentials(session.getAccessKey(), session.getSecretKey(), + session.getAuthChannel()); + if (!getNamesrvAddr().empty()) { + m_DefaultMQProducerImpl->setNamesrvAddr(getNamesrvAddr()); + } + m_DefaultMQProducerImpl->setNameSpace(getNameSpace()); + // m_DefaultMQProducerImpl->setNamesrvDomain(getNamesrvDomain()); + m_DefaultMQProducerImpl->start(false); +} +void DefaultMQPushConsumerImpl::shutdownMessageTraceInnerProducer() { + LOG_INFO("Shutdown Message Trace Inner Producer In Consumer."); + m_DefaultMQProducerImpl->shutdown(false); +} +bool DefaultMQPushConsumerImpl::hasConsumeMessageHook() { + return !m_consumeMessageHookList.empty(); +} + +void DefaultMQPushConsumerImpl::registerConsumeMessageHook(std::shared_ptr& hook) { + m_consumeMessageHookList.push_back(hook); + LOG_INFO("Register ConsumeMessageHook success,hookname is %s", hook->getHookName().c_str()); +} + +void DefaultMQPushConsumerImpl::executeConsumeMessageHookBefore(ConsumeMessageContext* context) { + if (!m_consumeMessageHookList.empty()) { + std::vector>::iterator it = m_consumeMessageHookList.begin(); + for (; it != m_consumeMessageHookList.end(); ++it) { + try { + (*it)->executeHookBefore(context); + } catch (exception e) { + } + } + } +} + +void DefaultMQPushConsumerImpl::executeConsumeMessageHookAfter(ConsumeMessageContext* context) { + if (!m_consumeMessageHookList.empty()) { + std::vector>::iterator it = m_consumeMessageHookList.begin(); + for (; it != m_consumeMessageHookList.end(); ++it) { + try { + (*it)->executeHookAfter(context); + } catch (exception e) { + } + } + } +} + +void DefaultMQPushConsumerImpl::submitSendTraceRequest(MQMessage& msg, SendCallback* pSendCallback) { + if (getMessageTrace()) { + try { + LOG_DEBUG("=====Send Trace Messages,Topic[%s],Body[%s]", msg.getTopic().c_str(), msg.getBody().c_str()); + // m_DefaultMQProducerImpl->submitSendTraceRequest(msg, pSendCallback); + m_DefaultMQProducerImpl->send(msg, pSendCallback, false); + } catch (exception e) { + LOG_INFO(e.what()); + } + } +} // #include #include "AsyncCallback.h" +#include "ConsumeMessageContext.h" +#include "ConsumeMessageHook.h" +#include "DefaultMQProducerImpl.h" #include "MQConsumer.h" #include "MQMessageListener.h" #include "MQMessageQueue.h" @@ -128,6 +131,10 @@ class DefaultMQPushConsumerImpl : public MQConsumer { */ void setMaxCacheMsgSizePerQueue(int maxCacheSize); int getMaxCacheMsgSizePerQueue() const; + void submitSendTraceRequest(MQMessage& msg, SendCallback* pSendCallback); + bool hasConsumeMessageHook(); + void executeConsumeMessageHookBefore(ConsumeMessageContext* context); + void executeConsumeMessageHookAfter(ConsumeMessageContext* context); private: void checkConfig(); @@ -136,6 +143,11 @@ class DefaultMQPushConsumerImpl : public MQConsumer { bool dealWithNameSpace(); void logConfigs(); + bool dealWithMessageTrace(); + void createMessageTraceInnerProducer(); + void shutdownMessageTraceInnerProducer(); + void registerConsumeMessageHook(std::shared_ptr& hook); + private: uint64_t m_startTime; ConsumeFromWhere m_consumeFromWhere; @@ -161,6 +173,10 @@ class DefaultMQPushConsumerImpl : public MQConsumer { private: TaskQueue* m_pullmsgQueue; std::unique_ptr m_pullmsgThread; + + // used for trace + std::vector > m_consumeMessageHookList; + std::shared_ptr m_DefaultMQProducerImpl; }; //start(); - getFactory()->sendHeartbeatToAllBroker(); + if (factoryStart) { + getFactory()->start(); + getFactory()->sendHeartbeatToAllBroker(); + } m_serviceState = RUNNING; break; } @@ -97,6 +101,9 @@ void DefaultMQProducerImpl::start() { } void DefaultMQProducerImpl::shutdown() { + shutdown(true); +} +void DefaultMQProducerImpl::shutdown(bool factoryStart) { switch (m_serviceState) { case RUNNING: { LOG_INFO("DefaultMQProducerImpl shutdown"); @@ -106,7 +113,9 @@ void DefaultMQProducerImpl::shutdown() { m_trace_threadpool.join_all(); } getFactory()->unregisterProducer(this); - getFactory()->shutdown(); + if (factoryStart) { + getFactory()->shutdown(); + } m_serviceState = SHUTDOWN_ALREADY; break; } @@ -740,6 +749,7 @@ void DefaultMQProducerImpl::submitSendTraceRequest(const MQMessage& msg, SendCal void DefaultMQProducerImpl::sendTraceMessage(MQMessage& msg, SendCallback* pSendCallback) { try { + LOG_DEBUG("=====Send Trace Messages,Topic[%s],Body[%s]", msg.getTopic().c_str(), msg.getBody().c_str()); send(msg, pSendCallback, true); } catch (MQException e) { LOG_ERROR(e.what()); diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h index a18723924..637c19331 100644 --- a/src/producer/DefaultMQProducerImpl.h +++ b/src/producer/DefaultMQProducerImpl.h @@ -36,6 +36,8 @@ class DefaultMQProducerImpl : public MQProducer { //setRegionId(context->getSendResult()->getRegionId()); traceBean.setMsgId(context->getMessage()->getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); - traceBean.setOffsetMsgId(context->getSendResult()->getMsgId()); + traceBean.setOffsetMsgId(context->getSendResult()->getOffsetMsgId()); traceBean.setStoreTime(traceContext->getTimeStamp() + (costTime / 2)); traceContext->setTraceBean(traceBean); diff --git a/src/trace/ConsumeMessageContext.cpp b/src/trace/ConsumeMessageContext.cpp new file mode 100644 index 000000000..149708b75 --- /dev/null +++ b/src/trace/ConsumeMessageContext.cpp @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ConsumeMessageContext.h" +#include +#include +#include "DefaultMQPushConsumerImpl.h" + +namespace rocketmq { +ConsumeMessageContext::ConsumeMessageContext() { + m_defaultMQPushConsumer = NULL; + // m_traceContext = NULL; +} +ConsumeMessageContext::~ConsumeMessageContext() { + m_traceContext.reset(); +} +std::string ConsumeMessageContext::getConsumerGroup() { + return m_consumerGroup; +} + +void ConsumeMessageContext::setConsumerGroup(const std::string& mConsumerGroup) { + m_consumerGroup = mConsumerGroup; +} + +bool ConsumeMessageContext::getSuccess() { + return m_success; +} + +void ConsumeMessageContext::setSuccess(bool mSuccess) { + m_success = mSuccess; +} + +std::vector ConsumeMessageContext::getMsgList() { + return m_msgList; +} + +void ConsumeMessageContext::setMsgList(std::vector mMsgList) { + m_msgList = mMsgList; +} + +std::string ConsumeMessageContext::getStatus() { + return m_status; +} + +void ConsumeMessageContext::setStatus(const std::string& mStatus) { + m_status = mStatus; +} + +int ConsumeMessageContext::getMsgIndex() { + return m_msgIndex; +} + +void ConsumeMessageContext::setMsgIndex(int mMsgIndex) { + m_msgIndex = mMsgIndex; +} + +MQMessageQueue ConsumeMessageContext::getMessageQueue() { + return m_messageQueue; +} + +void ConsumeMessageContext::setMessageQueue(const MQMessageQueue& mMessageQueue) { + m_messageQueue = mMessageQueue; +} + +DefaultMQPushConsumerImpl* ConsumeMessageContext::getDefaultMQPushConsumer() { + return m_defaultMQPushConsumer; +} + +void ConsumeMessageContext::setDefaultMQPushConsumer(DefaultMQPushConsumerImpl* mDefaultMqPushConsumer) { + m_defaultMQPushConsumer = mDefaultMqPushConsumer; +} + +std::shared_ptr ConsumeMessageContext::getTraceContext() { + return m_traceContext; +} + +void ConsumeMessageContext::setTraceContext(TraceContext* mTraceContext) { + m_traceContext.reset(mTraceContext); +} + +std::string ConsumeMessageContext::getNameSpace() { + return m_nameSpace; +} + +void ConsumeMessageContext::setNameSpace(const std::string& mNameSpace) { + m_nameSpace = mNameSpace; +} +} // namespace rocketmq diff --git a/src/trace/ConsumeMessageContext.h b/src/trace/ConsumeMessageContext.h index a4fae4a60..67024a6cd 100644 --- a/src/trace/ConsumeMessageContext.h +++ b/src/trace/ConsumeMessageContext.h @@ -17,13 +17,70 @@ #ifndef __ROCKETMQ_CONSUME_MESSAGE_CONTEXT_H__ #define __ROCKETMQ_CONSUME_MESSAGE_CONTEXT_H__ +#include #include +#include +#include "MQMessageExt.h" +#include "MQMessageQueue.h" +#include "TraceBean.h" +#include "TraceContant.h" +#include "TraceContext.h" namespace rocketmq { +class DefaultMQPushConsumerImpl; class ConsumeMessageContext { public: - virtual ~ConsumeMessageContext() {} - std::string m_context; + ConsumeMessageContext(); + + virtual ~ConsumeMessageContext(); + + std::string getConsumerGroup(); + + void setConsumerGroup(const std::string& mConsumerGroup); + + bool getSuccess(); + + void setSuccess(bool mSuccess); + + std::vector getMsgList(); + + void setMsgList(std::vector mMsgList); + + std::string getStatus(); + + void setStatus(const std::string& mStatus); + + int getMsgIndex(); + + void setMsgIndex(int mMsgIndex); + + MQMessageQueue getMessageQueue(); + + void setMessageQueue(const MQMessageQueue& mMessageQueue); + + DefaultMQPushConsumerImpl* getDefaultMQPushConsumer(); + + void setDefaultMQPushConsumer(DefaultMQPushConsumerImpl* mDefaultMqPushConsumer); + + std::shared_ptr getTraceContext(); + + void setTraceContext(TraceContext* mTraceContext); + + std::string getNameSpace(); + + void setNameSpace(const std::string& mNameSpace); + + private: + std::string m_consumerGroup; + bool m_success; + std::vector m_msgList; + std::string m_status; + int m_msgIndex; + MQMessageQueue m_messageQueue; + DefaultMQPushConsumerImpl* m_defaultMQPushConsumer; + // TraceContext* m_traceContext; + std::shared_ptr m_traceContext; + std::string m_nameSpace; }; } // namespace rocketmq #endif \ No newline at end of file diff --git a/src/trace/TraceUtil.cpp b/src/trace/TraceUtil.cpp index d0ab792bb..eaeb4de2a 100644 --- a/src/trace/TraceUtil.cpp +++ b/src/trace/TraceUtil.cpp @@ -36,29 +36,29 @@ std::string TraceUtil::CovertTraceTypeToString(TraceType type) { TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx) { std::ostringstream ss; - std::vector bens = ctx->getTraceBeans(); + std::vector beans = ctx->getTraceBeans(); switch (ctx->getTraceType()) { case Pub: { - TraceBean* ben = &bens[0]; + std::vector::iterator it = beans.begin(); ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR; ss << ctx->getTimeStamp() << TraceContant::CONTENT_SPLITOR; ss << ctx->getRegionId() << TraceContant::CONTENT_SPLITOR; ss << ctx->getGroupName() << TraceContant::CONTENT_SPLITOR; - ss << ben->getTopic() << TraceContant::CONTENT_SPLITOR; - ss << ben->getMsgId() << TraceContant::CONTENT_SPLITOR; - ss << ben->getTags() << TraceContant::CONTENT_SPLITOR; - ss << ben->getKeys() << TraceContant::CONTENT_SPLITOR; - ss << ben->getStoreHost() << TraceContant::CONTENT_SPLITOR; - ss << ben->getBodyLength() << TraceContant::CONTENT_SPLITOR; + ss << it->getTopic() << TraceContant::CONTENT_SPLITOR; + ss << it->getMsgId() << TraceContant::CONTENT_SPLITOR; + ss << it->getTags() << TraceContant::CONTENT_SPLITOR; + ss << it->getKeys() << TraceContant::CONTENT_SPLITOR; + ss << it->getStoreHost() << TraceContant::CONTENT_SPLITOR; + ss << it->getBodyLength() << TraceContant::CONTENT_SPLITOR; ss << ctx->getCostTime() << TraceContant::CONTENT_SPLITOR; - ss << ben->getMsgType() << TraceContant::CONTENT_SPLITOR; - ss << ben->getOffsetMsgId() << TraceContant::CONTENT_SPLITOR; + ss << it->getMsgType() << TraceContant::CONTENT_SPLITOR; + ss << it->getOffsetMsgId() << TraceContant::CONTENT_SPLITOR; ss << (ctx->getStatus() ? "true" : "false") << TraceContant::FIELD_SPLITOR; } break; case SubBefore: { - std::vector::iterator it = bens.begin(); - for (; it != bens.end(); ++it) { + std::vector::iterator it = beans.begin(); + for (; it != beans.end(); ++it) { ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR; ss << ctx->getTimeStamp() << TraceContant::CONTENT_SPLITOR; ss << ctx->getRegionId() << TraceContant::CONTENT_SPLITOR; @@ -71,14 +71,13 @@ TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx) } break; case SubAfter: { - // TraceBean* bean = &bens[ctx->getTraceBeanIndex()]; - TraceBean* bean = &bens[0]; + std::vector::iterator it = beans.begin(); ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR; ss << ctx->getRequestId() << TraceContant::CONTENT_SPLITOR; - ss << bean->getMsgId() << TraceContant::CONTENT_SPLITOR; + ss << it->getMsgId() << TraceContant::CONTENT_SPLITOR; ss << ctx->getCostTime() << TraceContant::CONTENT_SPLITOR; ss << (ctx->getStatus() ? "true" : "false") << TraceContant::CONTENT_SPLITOR; - ss << bean->getKeys() << TraceContant::FIELD_SPLITOR; + ss << it->getKeys() << TraceContant::FIELD_SPLITOR; } break; default: @@ -89,24 +88,22 @@ TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx) transferBean.setTransData(ss.str()); switch (ctx->getTraceType()) { - case Pub: { - transferBean.setTransKey(bens[0].getMsgId()); - if (bens[0].getKeys() != "") - transferBean.setTransKey(bens[0].getKeys()); + case Pub: + case SubAfter: { + std::vector::iterator it = beans.begin(); + transferBean.setTransKey(it->getMsgId()); + if (it->getKeys() != "") { + transferBean.setTransKey(it->getKeys()); + } } break; case SubBefore: { - std::vector::iterator it = bens.begin(); - for (; it != bens.end(); ++it) { + std::vector::iterator it = beans.begin(); + for (; it != beans.end(); ++it) { transferBean.setTransKey((*it).getMsgId()); - if ((*it).getKeys() != "") + if ((*it).getKeys() != "") { transferBean.setTransKey((*it).getKeys()); + } } - - } break; - case SubAfter: { - transferBean.setTransKey(bens[0].getMsgId()); - if (bens[0].getKeys() != "") - transferBean.setTransKey(bens[0].getKeys()); } break; default: break; From 2f5a190aae41bf2554453750b4a1537d41ffc5d5 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Sun, 15 Mar 2020 22:14:13 +0800 Subject: [PATCH 3/4] feat(trace): add test case for trace message of push consumer --- src/consumer/DefaultMQPushConsumerImpl.cpp | 5 +- src/consumer/DefaultMQPushConsumerImpl.h | 4 +- .../DefaultMQPushConsumerImplTest.cpp | 119 ++++++++++++++++++ test/src/trace/TraceUtilTest.cpp | 11 ++ 4 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 test/src/consumer/DefaultMQPushConsumerImplTest.cpp diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp index 954960380..61ac82109 100644 --- a/src/consumer/DefaultMQPushConsumerImpl.cpp +++ b/src/consumer/DefaultMQPushConsumerImpl.cpp @@ -1160,5 +1160,8 @@ void DefaultMQPushConsumerImpl::submitSendTraceRequest(MQMessage& msg, SendCallb } } } -//& hook); + void setDefaultMqProducerImpl(DefaultMQProducerImpl* DefaultMqProducerImpl); void executeConsumeMessageHookBefore(ConsumeMessageContext* context); void executeConsumeMessageHookAfter(ConsumeMessageContext* context); @@ -146,7 +149,6 @@ class DefaultMQPushConsumerImpl : public MQConsumer { bool dealWithMessageTrace(); void createMessageTraceInnerProducer(); void shutdownMessageTraceInnerProducer(); - void registerConsumeMessageHook(std::shared_ptr& hook); private: uint64_t m_startTime; diff --git a/test/src/consumer/DefaultMQPushConsumerImplTest.cpp b/test/src/consumer/DefaultMQPushConsumerImplTest.cpp new file mode 100644 index 000000000..cfdc95fc6 --- /dev/null +++ b/test/src/consumer/DefaultMQPushConsumerImplTest.cpp @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include "ConsumeMessageContext.h" +#include "ConsumeMessageHookImpl.h" +#include "DefaultMQProducerImpl.h" +#include "DefaultMQPushConsumerImpl.h" +#include "MQMessageExt.h" +#include "MQMessageQueue.h" + +using namespace std; +using namespace rocketmq; +using rocketmq::DefaultMQProducerImpl; +using rocketmq::DefaultMQPushConsumerImpl; +using testing::_; +using ::testing::InitGoogleMock; +using ::testing::InitGoogleTest; +using testing::Return; + +class MockDefaultMQProducerImpl : public DefaultMQProducerImpl { + public: + MockDefaultMQProducerImpl(const string& groupId) : DefaultMQProducerImpl(groupId) {} + MOCK_METHOD3(send, void(MQMessage&, SendCallback*, bool)); +}; +TEST(DefaultMQPushConsumerImplTest, init) { + DefaultMQPushConsumerImpl* impl = new DefaultMQPushConsumerImpl("testMQConsumerGroup"); + EXPECT_EQ(impl->getGroupName(), "testMQConsumerGroup"); + impl->setUnitName("testUnit"); + EXPECT_EQ(impl->getUnitName(), "testUnit"); + impl->setTcpTransportPullThreadNum(64); + EXPECT_EQ(impl->getTcpTransportPullThreadNum(), 64); + impl->setTcpTransportConnectTimeout(2000); + EXPECT_EQ(impl->getTcpTransportConnectTimeout(), 2000); + impl->setTcpTransportTryLockTimeout(3000); + EXPECT_EQ(impl->getTcpTransportTryLockTimeout(), 3); + impl->setNamesrvAddr("http://rocketmq.nameserver.com"); + EXPECT_EQ(impl->getNamesrvAddr(), "rocketmq.nameserver.com"); + impl->setNameSpace("MQ_INST_NAMESPACE_TEST"); + EXPECT_EQ(impl->getNameSpace(), "MQ_INST_NAMESPACE_TEST"); + impl->setMessageTrace(true); + EXPECT_TRUE(impl->getMessageTrace()); + impl->setAsyncPull(true); + + impl->setConsumeMessageBatchMaxSize(3000); + EXPECT_EQ(impl->getConsumeMessageBatchMaxSize(), 3000); + + impl->setConsumeThreadCount(3); + EXPECT_EQ(impl->getConsumeThreadCount(), 3); + + impl->setMaxReconsumeTimes(30); + EXPECT_EQ(impl->getMaxReconsumeTimes(), 30); + + impl->setMaxCacheMsgSizePerQueue(3000); + EXPECT_EQ(impl->getMaxCacheMsgSizePerQueue(), 3000); + + impl->setPullMsgThreadPoolCount(10); + EXPECT_EQ(impl->getPullMsgThreadPoolCount(), 10); +} + +TEST(DefaultMQPushConsumerImpl, Trace) { + DefaultMQPushConsumerImpl* impl = new DefaultMQPushConsumerImpl(); + MockDefaultMQProducerImpl* implProducer = new MockDefaultMQProducerImpl("testMockProducerTraceGroup"); + std::shared_ptr hook(new ConsumeMessageHookImpl()); + impl->setMessageTrace(true); + impl->setDefaultMqProducerImpl(implProducer); + impl->registerConsumeMessageHook(hook); + EXPECT_CALL(*implProducer, send(_, _, _)).WillRepeatedly(Return()); + + ConsumeMessageContext consumeMessageContext; + MQMessageQueue messageQueue("TestTopic", "BrokerA", 0); + MQMessageExt messageExt; + messageExt.setMsgId("MessageID"); + messageExt.setKeys("MessageKey"); + vector msgs; + consumeMessageContext.setDefaultMQPushConsumer(impl); + consumeMessageContext.setConsumerGroup("testMockProducerTraceGroup"); + consumeMessageContext.setMessageQueue(messageQueue); + consumeMessageContext.setMsgList(msgs); + consumeMessageContext.setSuccess(false); + consumeMessageContext.setNameSpace("NameSpace"); + impl->executeConsumeMessageHookBefore(&consumeMessageContext); + impl->executeConsumeMessageHookAfter(&consumeMessageContext); + + msgs.push_back(messageExt); + consumeMessageContext.setMsgList(msgs); + + impl->executeConsumeMessageHookBefore(&consumeMessageContext); + + consumeMessageContext.setMsgIndex(0); + consumeMessageContext.setStatus("CONSUME_SUCCESS"); + consumeMessageContext.setSuccess(true); + impl->executeConsumeMessageHookAfter(&consumeMessageContext); + EXPECT_TRUE(impl->hasConsumeMessageHook()); + delete implProducer; +} +int main(int argc, char* argv[]) { + InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/src/trace/TraceUtilTest.cpp b/test/src/trace/TraceUtilTest.cpp index 38d8c3d27..b5e420262 100644 --- a/test/src/trace/TraceUtilTest.cpp +++ b/test/src/trace/TraceUtilTest.cpp @@ -41,6 +41,7 @@ TEST(TraceUtil, CovertTraceTypeToString) { EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::Pub), TraceContant::TRACE_TYPE_PUB); EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::SubBefore), TraceContant::TRACE_TYPE_BEFORE); EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::SubAfter), TraceContant::TRACE_TYPE_AFTER); + EXPECT_EQ(TraceUtil::CovertTraceTypeToString((TraceType)5), TraceContant::TRACE_TYPE_PUB); } TEST(TraceUtil, CovertTraceContextToTransferBean) { TraceContext context; @@ -55,6 +56,7 @@ TEST(TraceUtil, CovertTraceContextToTransferBean) { context.setCostTime(50); context.setStatus(true); context.setTraceBean(bean); + context.setTraceBeanIndex(1); TraceTransferBean beanPub = TraceUtil::CovertTraceContextToTransferBean(&context); EXPECT_GT(beanPub.getTransKey().size(), 0); context.setTraceType(TraceType::SubBefore); @@ -64,6 +66,15 @@ TEST(TraceUtil, CovertTraceContextToTransferBean) { context.setTraceType(TraceType::SubAfter); TraceTransferBean beanAfter = TraceUtil::CovertTraceContextToTransferBean(&context); EXPECT_GT(beanAfter.getTransKey().size(), 0); + + TraceContext contextFailed("testGroup"); + contextFailed.setMsgType(context.getMsgType()); + contextFailed.setTraceType((TraceType)5); + contextFailed.setRequestId(context.getRegionId()); + contextFailed.setTimeStamp(context.getTimeStamp()); + contextFailed.setTraceBeanIndex(context.getTraceBeanIndex()); + TraceTransferBean beanWrong = TraceUtil::CovertTraceContextToTransferBean(&contextFailed); + EXPECT_EQ(beanWrong.getTransKey().size(), 0); } int main(int argc, char* argv[]) { InitGoogleMock(&argc, argv); From 07926a18bb4edd2e843556d0204455cbb954e301 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Mon, 16 Mar 2020 11:11:06 +0800 Subject: [PATCH 4/4] feat(trace): add default key value to trace message to avoid the bug in broker. --- src/common/NameSpaceUtil.cpp | 9 +++++++++ src/common/NameSpaceUtil.h | 2 ++ src/consumer/ConsumeMessageHookImpl.cpp | 3 ++- src/consumer/DefaultMQPushConsumerImpl.cpp | 4 ++-- src/producer/DefaultMQProducerImpl.cpp | 7 +++---- src/producer/SendMessageHookImpl.cpp | 6 +++++- src/trace/TraceUtil.cpp | 18 ++++++++++++++---- 7 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp index 8f81b96bb..61866e95c 100644 --- a/src/common/NameSpaceUtil.cpp +++ b/src/common/NameSpaceUtil.cpp @@ -76,6 +76,15 @@ bool NameSpaceUtil::checkNameSpaceExistInNameServer(string nameServerAddr) { return false; } +string NameSpaceUtil::withoutNameSpace(string source, string nameSpace) { + if (!nameSpace.empty()) { + auto index = source.find(nameSpace); + if (index != string::npos) { + return source.substr(index + nameSpace.length() + NAMESPACE_SPLIT_FLAG.length(), source.length()); + } + } + return source; +} string NameSpaceUtil::withNameSpace(string source, string ns) { if (!ns.empty()) { return ns + NAMESPACE_SPLIT_FLAG + source; diff --git a/src/common/NameSpaceUtil.h b/src/common/NameSpaceUtil.h index a63d6475f..cdaaf6bed 100644 --- a/src/common/NameSpaceUtil.h +++ b/src/common/NameSpaceUtil.h @@ -43,6 +43,8 @@ class NameSpaceUtil { static string withNameSpace(string source, string ns); + static string withoutNameSpace(string source, string ns); + static bool hasNameSpace(string source, string ns); }; diff --git a/src/consumer/ConsumeMessageHookImpl.cpp b/src/consumer/ConsumeMessageHookImpl.cpp index dc3da3cc1..a1045c69f 100644 --- a/src/consumer/ConsumeMessageHookImpl.cpp +++ b/src/consumer/ConsumeMessageHookImpl.cpp @@ -22,6 +22,7 @@ #include "DefaultMQPushConsumerImpl.h" #include "Logging.h" #include "MQClientException.h" +#include "NameSpaceUtil.h" #include "TraceContant.h" #include "TraceContext.h" #include "TraceTransferBean.h" @@ -48,7 +49,7 @@ void ConsumeMessageHookImpl::executeHookBefore(ConsumeMessageContext* context) { TraceContext* traceContext = new TraceContext(); context->setTraceContext(traceContext); traceContext->setTraceType(SubBefore); - traceContext->setGroupName(context->getConsumerGroup()); + traceContext->setGroupName(NameSpaceUtil::withoutNameSpace(context->getConsumerGroup(), context->getNameSpace())); std::vector beans; std::vector msgs = context->getMsgList(); diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp index 61ac82109..c5038046b 100644 --- a/src/consumer/DefaultMQPushConsumerImpl.cpp +++ b/src/consumer/DefaultMQPushConsumerImpl.cpp @@ -191,7 +191,6 @@ class AsyncPullCallback : public PullCallback { bool m_bShutdown; }; -//submitSendTraceRequest(msg, pSendCallback); m_DefaultMQProducerImpl->send(msg, pSendCallback, false); } catch (exception e) { diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp index 380024bda..bd2556b63 100644 --- a/src/producer/DefaultMQProducerImpl.cpp +++ b/src/producer/DefaultMQProducerImpl.cpp @@ -37,7 +37,6 @@ namespace rocketmq { -//setDefaultMqProducer(this); - pSendMesgContext->setProducerGroup(getGroupName()); + pSendMesgContext->setProducerGroup(NameSpaceUtil::withoutNameSpace(getGroupName(), getNameSpace())); pSendMesgContext->setCommunicationMode(static_cast(communicationMode)); pSendMesgContext->setBornHost(UtilAll::getLocalAddress()); pSendMesgContext->setBrokerAddr(brokerAddr); @@ -749,12 +748,12 @@ void DefaultMQProducerImpl::submitSendTraceRequest(const MQMessage& msg, SendCal void DefaultMQProducerImpl::sendTraceMessage(MQMessage& msg, SendCallback* pSendCallback) { try { - LOG_DEBUG("=====Send Trace Messages,Topic[%s],Body[%s]", msg.getTopic().c_str(), msg.getBody().c_str()); + LOG_DEBUG("=====Send Trace Messages,Topic[%s],Key[%s],Body[%s]", msg.getTopic().c_str(), msg.getKeys().c_str(), + msg.getBody().c_str()); send(msg, pSendCallback, true); } catch (MQException e) { LOG_ERROR(e.what()); // throw e; } } -// #include #include "DefaultMQProducerImpl.h" +#include "Logging.h" #include "MQClientException.h" #include "SendMessageContext.h" #include "TraceContant.h" @@ -30,7 +31,10 @@ using namespace std; namespace rocketmq { class TraceMessageSendCallback : public SendCallback { - virtual void onSuccess(SendResult& sendResult) {} + virtual void onSuccess(SendResult& sendResult) { + LOG_DEBUG("TraceMessageSendCallback, MsgId:[%s],OffsetMsgId[%s]", sendResult.getMsgId().c_str(), + sendResult.getOffsetMsgId().c_str()); + } virtual void onException(MQException& e) {} }; static TraceMessageSendCallback* callback = new TraceMessageSendCallback(); diff --git a/src/trace/TraceUtil.cpp b/src/trace/TraceUtil.cpp index eaeb4de2a..a95fcf8af 100644 --- a/src/trace/TraceUtil.cpp +++ b/src/trace/TraceUtil.cpp @@ -64,9 +64,14 @@ TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx) ss << ctx->getRegionId() << TraceContant::CONTENT_SPLITOR; ss << ctx->getGroupName() << TraceContant::CONTENT_SPLITOR; ss << ctx->getRequestId() << TraceContant::CONTENT_SPLITOR; - ss << (*it).getMsgId() << TraceContant::CONTENT_SPLITOR; - ss << (*it).getRetryTimes() << TraceContant::CONTENT_SPLITOR; - ss << (*it).getKeys() << TraceContant::FIELD_SPLITOR; + ss << it->getMsgId() << TraceContant::CONTENT_SPLITOR; + ss << it->getRetryTimes() << TraceContant::CONTENT_SPLITOR; + // this is a bug caused by broker. + std::string defaultKey = "dKey"; + if (!it->getKeys().empty()) { + defaultKey = it->getKeys(); + } + ss << defaultKey << TraceContant::FIELD_SPLITOR; } } break; @@ -77,7 +82,12 @@ TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx) ss << it->getMsgId() << TraceContant::CONTENT_SPLITOR; ss << ctx->getCostTime() << TraceContant::CONTENT_SPLITOR; ss << (ctx->getStatus() ? "true" : "false") << TraceContant::CONTENT_SPLITOR; - ss << it->getKeys() << TraceContant::FIELD_SPLITOR; + // this is a bug caused by broker. + std::string defaultKey = "dKey"; + if (!it->getKeys().empty()) { + defaultKey = it->getKeys(); + } + ss << defaultKey << TraceContant::FIELD_SPLITOR; } break; default: