Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(producer): Support user data in async callback #193

Merged
merged 1 commit into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/CProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ typedef struct CProducer CProducer;
typedef int (*QueueSelectorCallback)(int size, CMessage* msg, void* arg);
typedef void (*CSendSuccessCallback)(CSendResult result);
typedef void (*CSendExceptionCallback)(CMQException e);
typedef void (*COnSendSuccessCallback)(CSendResult result, CMessage* msg, void* userData);
typedef void (*COnSendExceptionCallback)(CMQException e, CMessage* msg, void* userData);

ROCKETMQCLIENT_API CProducer* CreateProducer(const char* groupId);
ROCKETMQCLIENT_API CProducer* CreateOrderlyProducer(const char* groupId);
ROCKETMQCLIENT_API CProducer* CreateTransactionProducer(const char* groupId);
ROCKETMQCLIENT_API int DestroyProducer(CProducer* producer);
ROCKETMQCLIENT_API int StartProducer(CProducer* producer);
ROCKETMQCLIENT_API int ShutdownProducer(CProducer* producer);
Expand All @@ -60,6 +63,11 @@ ROCKETMQCLIENT_API int SendMessageAsync(CProducer* producer,
CMessage* msg,
CSendSuccessCallback cSendSuccessCallback,
CSendExceptionCallback cSendExceptionCallback);
ROCKETMQCLIENT_API int SendAsync(CProducer* producer,
CMessage* msg,
COnSendSuccessCallback cSendSuccessCallback,
COnSendExceptionCallback cSendExceptionCallback,
void* userData);
ROCKETMQCLIENT_API int SendMessageOneway(CProducer* producer, CMessage* msg);
ROCKETMQCLIENT_API int SendMessageOnewayOrderly(CProducer* producer,
CMessage* msg,
Expand Down
70 changes: 70 additions & 0 deletions src/extern/CProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,54 @@ class SelectMessageQueue : public MessageQueueSelector {
private:
QueueSelectorCallback m_pCallback;
};
class COnSendCallback : public AutoDeleteSendCallBack {
public:
COnSendCallback(COnSendSuccessCallback cSendSuccessCallback,
COnSendExceptionCallback cSendExceptionCallback,
void* message,
void* userData) {
m_cSendSuccessCallback = cSendSuccessCallback;
m_cSendExceptionCallback = cSendExceptionCallback;
m_message = message;
m_userData = userData;
}

virtual ~COnSendCallback() {}

virtual void onSuccess(SendResult& sendResult) {
CSendResult result;
result.sendStatus = CSendStatus((int)sendResult.getSendStatus());
result.offset = sendResult.getQueueOffset();
strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
m_cSendSuccessCallback(result, (CMessage*)m_message, m_userData);
}

virtual void onException(MQException& e) {
CMQException exception;
exception.error = e.GetError();
exception.line = e.GetLine();
strncpy(exception.msg, e.what(), MAX_EXEPTION_MSG_LENGTH - 1);
strncpy(exception.file, e.GetFile(), MAX_EXEPTION_FILE_LENGTH - 1);
m_cSendExceptionCallback(exception, (CMessage*)m_message, m_userData);
}

private:
COnSendSuccessCallback m_cSendSuccessCallback;
COnSendExceptionCallback m_cSendExceptionCallback;
void* m_message;
void* m_userData;
};

class CSendCallback : public AutoDeleteSendCallBack {
public:
CSendCallback(CSendSuccessCallback cSendSuccessCallback, CSendExceptionCallback cSendExceptionCallback) {
m_cSendSuccessCallback = cSendSuccessCallback;
m_cSendExceptionCallback = cSendExceptionCallback;
}

virtual ~CSendCallback() {}

virtual void onSuccess(SendResult& sendResult) {
CSendResult result;
result.sendStatus = CSendStatus((int)sendResult.getSendStatus());
Expand All @@ -77,6 +117,7 @@ class CSendCallback : public AutoDeleteSendCallBack {
result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
m_cSendSuccessCallback(result);
}

virtual void onException(MQException& e) {
CMQException exception;
exception.error = e.GetError();
Expand Down Expand Up @@ -241,6 +282,35 @@ int SendMessageAsync(CProducer* producer,
return OK;
}

int SendAsync(CProducer* producer,
CMessage* msg,
COnSendSuccessCallback onSuccess,
COnSendExceptionCallback onException,
void* usrData) {
if (producer == NULL || msg == NULL || onSuccess == NULL || onException == NULL) {
return NULL_POINTER;
}
DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer;
MQMessage* message = (MQMessage*)msg;
COnSendCallback* cSendCallback = new COnSendCallback(onSuccess, onException, (void*)msg, usrData);

try {
defaultMQProducer->send(*message, cSendCallback);
} catch (exception& e) {
if (cSendCallback != NULL) {
if (std::type_index(typeid(e)) == std::type_index(typeid(MQException))) {
MQException& mqe = (MQException&)e;
cSendCallback->onException(mqe);
}
delete cSendCallback;
cSendCallback = NULL;
}
MQClientErrorContainer::setErr(string(e.what()));
return PRODUCER_SEND_ASYNC_FAILED;
}
return OK;
}

int SendMessageOneway(CProducer* producer, CMessage* msg) {
if (producer == NULL || msg == NULL) {
return NULL_POINTER;
Expand Down