Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export send batch messages api in c style. #139

Merged
merged 4 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions example/CBatchProducer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <stdio.h>
#include "CBatchMessage.h"
#include "CCommon.h"
#include "CMessage.h"
#include "CProducer.h"
#include "CSendResult.h"

void StartSendMessage(CProducer* producer) {
int i = 0;
int ret_code = 0;
char body[128];
CBatchMessage* batchMessage = CreateBatchMessage("T_TestTopic");

for (i = 0; i < 10; i++) {
CMessage* msg = CreateMessage("T_TestTopic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
memset(body, 0, sizeof(body));
snprintf(body, sizeof(body), "new message body, index %d", i);
SetMessageBody(msg, body);
addMessage(batchMessage, msg);
}
CSendResult result;
int ok = SendBatchMessage(producer, batchMessage, &result);
printf("SendBatchMessage is %s .....\n", ok == 0 ? "Success" : ok == 11 ? "FAILED" : " It is null value");
DestroyBatchMessage(batchMessage);
}

void CreateProducerAndStartSendMessage() {
printf("Producer Initializing.....\n");
CProducer* producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
StartProducer(producer);
printf("Producer start.....\n");
StartSendMessage(producer);
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
}

int main(int argc, char* argv[]) {
printf("Send Batch.....\n");
CreateProducerAndStartSendMessage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only call CreateProducerAndStartSendMessage in main, so i think the function is unnecessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code structure is clear and consistent with the C case in other examples, which is easy to understand.

代码结构清晰点,同时与其他example中c案例一致,便于理解

return 0;
}
36 changes: 36 additions & 0 deletions include/CBatchMessage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef __C_BATCHMESSAGE_H__
#define __C_BATCHMESSAGE_H__
#include "CCommon.h"
#include "CMessage.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct CBatchMessage CBatchMessage;

ROCKETMQCLIENT_API CBatchMessage* CreateBatchMessage();
ROCKETMQCLIENT_API int addMessage(CBatchMessage* batchMsg, CMessage* msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename with big camel-case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already processed

ROCKETMQCLIENT_API int DestroyBatchMessage(CBatchMessage* batchMsg);

#ifdef __cplusplus
};
#endif
#endif //__C_BATCHMESSAGE_H__
4 changes: 3 additions & 1 deletion include/CProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef __C_PRODUCER_H__
#define __C_PRODUCER_H__

#include "CBatchMessage.h"
#include "CMessage.h"
#include "CSendResult.h"
#include "CMQException.h"
Expand Down Expand Up @@ -53,6 +54,7 @@ ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level);
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);

ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);
ROCKETMQCLIENT_API int SendMessageAsync(CProducer* producer,
CMessage* msg,
CSendSuccessCallback cSendSuccessCallback,
Expand All @@ -79,4 +81,4 @@ ROCKETMQCLIENT_API int SendMessageOrderlyAsync(CProducer* producer,
#ifdef __cplusplus
};
#endif
#endif //__C_PRODUCER_H__
#endif //__C_PRODUCER_H__
59 changes: 59 additions & 0 deletions src/extern/CBatchMessage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <vector>

#include "CBatchMessage.h"
#include "CCommon.h"
#include "CMessage.h"
#include "MQMessage.h"

using std::vector;

#ifdef __cplusplus
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file is cpp, the if is unnecessary. and i think the extern is unnecessary too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the C interface and must be ifdef
这是c接口,必须ifdef

Copy link
Contributor

@ifplusor ifplusor Apr 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个肯定是要用c++编译器编译的,只要用了c++编译器,就一定会定义__cplusplus,所以你的说法是没道理的。

Copy link
Contributor

@ifplusor ifplusor Apr 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c接口应该只要在声明的时候用extern “c”包起来就可以了,.cpp实现的时候没必要再次用extern c。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

其他extern实现类,都是这样实现的。如果需要进行整理,大家讨论下。

extern "C" {
#endif

using namespace rocketmq;

CBatchMessage *CreateBatchMessage() {
vector<MQMessage> *msgs = new vector<MQMessage>();
return (CBatchMessage *) msgs;
}

int addMessage(CBatchMessage *batchMsg, CMessage *msg) {
if (msg == NULL) {
return NULL_POINTER;
}
if (batchMsg == NULL) {
return NULL_POINTER;
}
MQMessage *message = (MQMessage *) msg;
((vector<MQMessage> *) batchMsg)->push_back(*message);
return OK;
}
int DestroyBatchMessage(CBatchMessage *batchMsg) {
if (batchMsg == NULL) {
return NULL_POINTER;
}
delete (vector<MQMessage> *) batchMsg;
return OK;
}

#ifdef __cplusplus
};
#endif
38 changes: 37 additions & 1 deletion src/extern/CProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "DefaultMQProducer.h"
#include "AsyncCallback.h"
#include "CBatchMessage.h"
#include "CProducer.h"
#include "CCommon.h"
#include "CSendResult.h"
Expand Down Expand Up @@ -156,6 +157,41 @@ int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result) {
return OK;
}

int SendBatchMessage(CProducer* producer, CBatchMessage* batcMsg, CSendResult* result) {
// CSendResult sendResult;
if (producer == NULL || batcMsg == NULL || result == NULL) {
return NULL_POINTER;
}
try {
DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer;
vector<MQMessage>* message = (vector<MQMessage>*)batcMsg;
SendResult sendResult = defaultMQProducer->send(*message);
switch (sendResult.getSendStatus()) {
case SEND_OK:
result->sendStatus = E_SEND_OK;
break;
case SEND_FLUSH_DISK_TIMEOUT:
result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT;
break;
case SEND_FLUSH_SLAVE_TIMEOUT:
result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT;
break;
case SEND_SLAVE_NOT_AVAILABLE:
result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE;
break;
default:
result->sendStatus = E_SEND_OK;
break;
}
result->offset = sendResult.getQueueOffset();
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
} catch (exception& e) {
return PRODUCER_SEND_SYNC_FAILED;
}
return OK;
}

int SendMessageAsync(CProducer* producer,
CMessage* msg,
CSendSuccessCallback cSendSuccessCallback,
Expand Down Expand Up @@ -346,4 +382,4 @@ int SetProducerMaxMessageSize(CProducer* producer, int size) {
}
#ifdef __cplusplus
};
#endif
#endif