Skip to content

Commit

Permalink
Format the code style.
Browse files Browse the repository at this point in the history
  • Loading branch information
ShannonDing committed Sep 21, 2019
1 parent 3b2c0f3 commit 352ea92
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 35 deletions.
6 changes: 4 additions & 2 deletions include/CProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#define __C_PRODUCER_H__

#include "CBatchMessage.h"
#include "CMQException.h"
#include "CMessage.h"
#include "CSendResult.h"
#include "CMQException.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -79,7 +79,9 @@ ROCKETMQCLIENT_API int SendMessageOrderlyAsync(CProducer* producer,
CSendSuccessCallback cSendSuccessCallback,
CSendExceptionCallback cSendExceptionCallback);
ROCKETMQCLIENT_API int SendMessageOrderlyByShardingKey(CProducer* producer,
CMessage* msg, const char * shardingKey, CSendResult* result);
CMessage* msg,
const char* shardingKey,
CSendResult* result);

#ifdef __cplusplus
};
Expand Down
64 changes: 31 additions & 33 deletions src/extern/CProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

#include "CProducer.h"
#include <string.h>
#include <functional>
#include <typeindex>
#include <string.h>
#include <typeinfo>
#include "AsyncCallback.h"
#include "CBatchMessage.h"
Expand All @@ -29,7 +29,6 @@
#include "DefaultMQProducer.h"
#include "MQClientErrorContainer.h"
#include "UtilAll.h"
#include <functional>

#ifdef __cplusplus
extern "C" {
Expand All @@ -38,14 +37,14 @@ using namespace rocketmq;
using namespace std;

class SelectMessageQueueInner : public MessageQueueSelector {
public:
MQMessageQueue select(const std::vector<MQMessageQueue>& mqs, const MQMessage& msg, void* arg) {
int index = 0;
std::string shardingKey = rocketmq::UtilAll::to_string((char *)arg);
public:
MQMessageQueue select(const std::vector<MQMessageQueue>& mqs, const MQMessage& msg, void* arg) {
int index = 0;
std::string shardingKey = rocketmq::UtilAll::to_string((char*)arg);

index = std::hash<std::string>{}(shardingKey) % mqs.size();
return mqs[index % mqs.size()];
}
index = std::hash<std::string>{}(shardingKey) % mqs.size();
return mqs[index % mqs.size()];
}
};

class SelectMessageQueue : public MessageQueueSelector {
Expand Down Expand Up @@ -100,8 +99,8 @@ CProducer* CreateProducer(const char* groupId) {
return (CProducer*)defaultMQProducer;
}

CProducer* CreateOrderlyProducer(const char* groupId){
return CreateProducer(groupId);
CProducer* CreateOrderlyProducer(const char* groupId) {
return CreateProducer(groupId);
}
int DestroyProducer(CProducer* pProducer) {
if (pProducer == NULL) {
Expand Down Expand Up @@ -325,28 +324,27 @@ int SendMessageOrderly(CProducer* producer,
}
return OK;
}
int SendMessageOrderlyByShardingKey(CProducer* producer,
CMessage* msg, const char * shardingKey, CSendResult* result){
if (producer == NULL || msg == NULL || shardingKey == NULL || result == NULL) {
return NULL_POINTER;
}
DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer;
MQMessage* message = (MQMessage*)msg;
try {
// Constructing SelectMessageQueue objects through function pointer callback
int retryTimes = 3;
SelectMessageQueueInner selectMessageQueue;
SendResult sendResult = defaultMQProducer->send(*message, &selectMessageQueue, (void *)shardingKey, retryTimes);
// Convert SendStatus to CSendStatus
result->sendStatus = CSendStatus((int)sendResult.getSendStatus());
result->offset = sendResult.getQueueOffset();
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
} catch (exception& e) {
MQClientErrorContainer::setErr(string(e.what()));
return PRODUCER_SEND_ORDERLY_FAILED;
}
return OK;
int SendMessageOrderlyByShardingKey(CProducer* producer, CMessage* msg, const char* shardingKey, CSendResult* result) {
if (producer == NULL || msg == NULL || shardingKey == NULL || result == NULL) {
return NULL_POINTER;
}
DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer;
MQMessage* message = (MQMessage*)msg;
try {
// Constructing SelectMessageQueue objects through function pointer callback
int retryTimes = 3;
SelectMessageQueueInner selectMessageQueue;
SendResult sendResult = defaultMQProducer->send(*message, &selectMessageQueue, (void*)shardingKey, retryTimes);
// Convert SendStatus to CSendStatus
result->sendStatus = CSendStatus((int)sendResult.getSendStatus());
result->offset = sendResult.getQueueOffset();
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
} catch (exception& e) {
MQClientErrorContainer::setErr(string(e.what()));
return PRODUCER_SEND_ORDERLY_FAILED;
}
return OK;
}
int SetProducerGroupName(CProducer* producer, const char* groupName) {
if (producer == NULL) {
Expand Down

0 comments on commit 352ea92

Please sign in to comment.