From 6202c8d0bc6d55cd993b793577a4ecf2d5b0cc25 Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Wed, 25 Sep 2019 21:13:07 +0800 Subject: [PATCH] feat(producer): Support user data in async callback --- include/CProducer.h | 8 +++++ src/extern/CProducer.cpp | 70 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/include/CProducer.h b/include/CProducer.h index a0968df1e..bf34d378b 100644 --- a/include/CProducer.h +++ b/include/CProducer.h @@ -32,9 +32,12 @@ typedef struct CProducer CProducer; typedef int (*QueueSelectorCallback)(int size, CMessage* msg, void* arg); typedef void (*CSendSuccessCallback)(CSendResult result); typedef void (*CSendExceptionCallback)(CMQException e); +typedef void (*COnSendSuccessCallback)(CSendResult result, CMessage* msg, void* userData); +typedef void (*COnSendExceptionCallback)(CMQException e, CMessage* msg, void* userData); ROCKETMQCLIENT_API CProducer* CreateProducer(const char* groupId); ROCKETMQCLIENT_API CProducer* CreateOrderlyProducer(const char* groupId); +ROCKETMQCLIENT_API CProducer* CreateTransactionProducer(const char* groupId); ROCKETMQCLIENT_API int DestroyProducer(CProducer* producer); ROCKETMQCLIENT_API int StartProducer(CProducer* producer); ROCKETMQCLIENT_API int ShutdownProducer(CProducer* producer); @@ -60,6 +63,11 @@ ROCKETMQCLIENT_API int SendMessageAsync(CProducer* producer, CMessage* msg, CSendSuccessCallback cSendSuccessCallback, CSendExceptionCallback cSendExceptionCallback); +ROCKETMQCLIENT_API int SendAsync(CProducer* producer, + CMessage* msg, + COnSendSuccessCallback cSendSuccessCallback, + COnSendExceptionCallback cSendExceptionCallback, + void* userData); ROCKETMQCLIENT_API int SendMessageOneway(CProducer* producer, CMessage* msg); ROCKETMQCLIENT_API int SendMessageOnewayOrderly(CProducer* producer, CMessage* msg, diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp index 46216e6ba..c8ff4f148 100644 --- a/src/extern/CProducer.cpp +++ b/src/extern/CProducer.cpp @@ -61,6 +61,44 @@ class SelectMessageQueue : public MessageQueueSelector { private: QueueSelectorCallback m_pCallback; }; +class COnSendCallback : public AutoDeleteSendCallBack { + public: + COnSendCallback(COnSendSuccessCallback cSendSuccessCallback, + COnSendExceptionCallback cSendExceptionCallback, + void* message, + void* userData) { + m_cSendSuccessCallback = cSendSuccessCallback; + m_cSendExceptionCallback = cSendExceptionCallback; + m_message = message; + m_userData = userData; + } + + virtual ~COnSendCallback() {} + + virtual void onSuccess(SendResult& sendResult) { + CSendResult result; + result.sendStatus = CSendStatus((int)sendResult.getSendStatus()); + result.offset = sendResult.getQueueOffset(); + strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1); + result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + m_cSendSuccessCallback(result, (CMessage*)m_message, m_userData); + } + + virtual void onException(MQException& e) { + CMQException exception; + exception.error = e.GetError(); + exception.line = e.GetLine(); + strncpy(exception.msg, e.what(), MAX_EXEPTION_MSG_LENGTH - 1); + strncpy(exception.file, e.GetFile(), MAX_EXEPTION_FILE_LENGTH - 1); + m_cSendExceptionCallback(exception, (CMessage*)m_message, m_userData); + } + + private: + COnSendSuccessCallback m_cSendSuccessCallback; + COnSendExceptionCallback m_cSendExceptionCallback; + void* m_message; + void* m_userData; +}; class CSendCallback : public AutoDeleteSendCallBack { public: @@ -68,7 +106,9 @@ class CSendCallback : public AutoDeleteSendCallBack { m_cSendSuccessCallback = cSendSuccessCallback; m_cSendExceptionCallback = cSendExceptionCallback; } + virtual ~CSendCallback() {} + virtual void onSuccess(SendResult& sendResult) { CSendResult result; result.sendStatus = CSendStatus((int)sendResult.getSendStatus()); @@ -77,6 +117,7 @@ class CSendCallback : public AutoDeleteSendCallBack { result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; m_cSendSuccessCallback(result); } + virtual void onException(MQException& e) { CMQException exception; exception.error = e.GetError(); @@ -241,6 +282,35 @@ int SendMessageAsync(CProducer* producer, return OK; } +int SendAsync(CProducer* producer, + CMessage* msg, + COnSendSuccessCallback onSuccess, + COnSendExceptionCallback onException, + void* usrData) { + if (producer == NULL || msg == NULL || onSuccess == NULL || onException == NULL) { + return NULL_POINTER; + } + DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer; + MQMessage* message = (MQMessage*)msg; + COnSendCallback* cSendCallback = new COnSendCallback(onSuccess, onException, (void*)msg, usrData); + + try { + defaultMQProducer->send(*message, cSendCallback); + } catch (exception& e) { + if (cSendCallback != NULL) { + if (std::type_index(typeid(e)) == std::type_index(typeid(MQException))) { + MQException& mqe = (MQException&)e; + cSendCallback->onException(mqe); + } + delete cSendCallback; + cSendCallback = NULL; + } + MQClientErrorContainer::setErr(string(e.what())); + return PRODUCER_SEND_ASYNC_FAILED; + } + return OK; +} + int SendMessageOneway(CProducer* producer, CMessage* msg) { if (producer == NULL || msg == NULL) { return NULL_POINTER;