Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support producer transaction message #154

Merged
merged 15 commits into from
Jul 24, 2019
135 changes: 135 additions & 0 deletions example/TransactionProducer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <atomic>
#include <condition_variable>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
#include "TransactionListener.h"
#include "TransactionMQProducer.h"
#include "TransactionSendResult.h"
#include "common.h"

using namespace rocketmq;

std::atomic<bool> g_quit;
std::mutex g_mtx;
std::condition_variable g_finished;
TpsReportService g_tps;

class MyTransactionListener : public TransactionListener {
virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) {

if (!arg) {
std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: COMMIT_MESAGE " << endl;
return LocalTransactionState::COMMIT_MESSAGE;
}

LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3);
std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state << endl;
return state;
}

virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) {

std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl;
return LocalTransactionState::COMMIT_MESSAGE;
}
};

void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer* producer) {
while (!g_quit.load()) {
if (g_msgCount.load() <= 0) {
std::this_thread::sleep_for(std::chrono::seconds(60));
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.notify_one();
break;
}

MQMessage msg(info->topic, // topic
"*", // tag
info->body); // body
try {
auto start = std::chrono::system_clock::now();
std::cout << "before sendMessageInTransaction" << endl;
LocalTransactionState state = LocalTransactionState::UNKNOW;
TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state);
std::cout << "after sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl;
g_tps.Increment();
--g_msgCount;
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
if (duration.count() >= 500) {
std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl;
}
} catch (const MQException& e) {
std::cout << "send failed: " << e.what() << std::endl;
}
}
}

int main(int argc, char* argv[]) {
RocketmqSendAndConsumerArgs info;
if (!ParseArgs(argc, argv, &info)) {
exit(-1);
}
PrintRocketmqSendAndConsumerArgs(info);
TransactionMQProducer producer("please_rename_unique_group_name");
producer.setNamesrvAddr(info.namesrv);
producer.setNamesrvDomain(info.namesrv_domain);
producer.setGroupName(info.groupname);
producer.setInstanceName(info.groupname);
producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
producer.setSendMsgTimeout(500);
producer.setTcpTransportTryLockTimeout(1000);
producer.setTcpTransportConnectTimeout(400);
producer.setLogLevel(eLOG_LEVEL_DEBUG);
producer.setTransactionListener(new MyTransactionListener());
producer.start();
std::vector<std::shared_ptr<std::thread>> work_pool;
auto start = std::chrono::system_clock::now();
int msgcount = g_msgCount.load();
g_tps.start();

int threadCount = info.thread_count;
for (int j = 0; j < threadCount; j++) {
std::shared_ptr<std::thread> th = std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
work_pool.push_back(th);
}

{
std::unique_lock<std::mutex> lck(g_mtx);
g_finished.wait(lck);
g_quit.store(true);
}

auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
<< "========================finished==============================\n";

for (size_t th = 0; th != work_pool.size(); ++th) {
work_pool[th]->join();
}

producer.shutdown();

return 0;
}
4 changes: 4 additions & 0 deletions include/MQMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class ROCKETMQCLIENT_API MQMessage {
void setBody(const char* body, int len);
void setBody(const std::string& body);

void setTransactionId(const std::string& id) { m_transactionId = id; }
std::string getTransactionId() const { return m_transactionId; }

std::map<std::string, std::string> getProperties() const;
void setProperties(std::map<std::string, std::string>& properties);

Expand Down Expand Up @@ -132,6 +135,7 @@ class ROCKETMQCLIENT_API MQMessage {
std::string m_topic;
int m_flag;
std::string m_body;
std::string m_transactionId;
std::map<std::string, std::string> m_properties;
};
//<!***************************************************************************
Expand Down
10 changes: 9 additions & 1 deletion include/SendResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,26 @@ class ROCKETMQCLIENT_API SendResult {
SendResult(const SendResult& other);
SendResult& operator=(const SendResult& other);

void setTransactionId(const std::string& id) {
m_transactionId = id;
}

std::string getTransactionId() { return m_transactionId; }

const std::string& getMsgId() const;
const std::string& getOffsetMsgId() const;
SendStatus getSendStatus() const;
MQMessageQueue getMessageQueue() const;
int64 getQueueOffset() const;

std::string toString() const;

private:
SendStatus m_sendStatus;
std::string m_msgId;
std::string m_offsetMsgId;
MQMessageQueue m_messageQueue;
int64 m_queueOffset;
std::string m_transactionId;
};

//<!***************************************************************************
Expand Down
48 changes: 48 additions & 0 deletions include/TransactionListener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef __TRANSACTIONLISTENER_H__
#define __TRANSACTIONLISTENER_H__

#include "MQMessage.h"
#include "MQMessageExt.h"
#include "TransactionSendResult.h"

namespace rocketmq {
class ROCKETMQCLIENT_API TransactionListener {
public:
virtual ~TransactionListener() {}
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) = 0;

/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) = 0;
};
} // namespace rocketmq
#endif
74 changes: 74 additions & 0 deletions include/TransactionMQProducer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef __TRANSACTIONMQPRODUCER_H__
#define __TRANSACTIONMQPRODUCER_H__

#include <boost/asio.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/weak_ptr.hpp>
#include <memory>
#include <string>
#include "DefaultMQProducer.h"
#include "MQMessageExt.h"
#include "TransactionListener.h"
#include "TransactionSendResult.h"

namespace rocketmq {

class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer {
public:
TransactionMQProducer(const std::string& producerGroup)
: DefaultMQProducer(producerGroup), m_thread_num(1), m_ioServiceWork(m_ioService) {}
virtual ~TransactionMQProducer() {}
void start();
void shutdown();
std::shared_ptr<TransactionListener> getTransactionListener() { return m_transactionListener; }
void setTransactionListener(TransactionListener* listener) { m_transactionListener.reset(listener); }
TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg);
void checkTransactionState(const std::string& addr,
const MQMessageExt& message,
long tranStateTableOffset,
long commitLogOffset,
const std::string& msgId,
const std::string& transactionId,
const std::string& offsetMsgId);

private:
void initTransactionEnv();
void destroyTransactionEnv();
void endTransaction(SendResult& sendResult, LocalTransactionState& localTransactionState);
void checkTransactionStateImpl(const std::string& addr,
const MQMessageExt& message,
long tranStateTableOffset,
long commitLogOffset,
const std::string& msgId,
const std::string& transactionId,
const std::string& offsetMsgId);

private:
std::shared_ptr<TransactionListener> m_transactionListener;
int m_thread_num;
boost::thread_group m_threadpool;
boost::asio::io_service m_ioService;
boost::asio::io_service::work m_ioServiceWork;
};
} // namespace rocketmq

#endif
52 changes: 52 additions & 0 deletions include/TransactionSendResult.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef __TRANSACTIONSENDRESULT_H__
#define __TRANSACTIONSENDRESULT_H__

#include "SendResult.h"

namespace rocketmq {

enum LocalTransactionState {
COMMIT_MESSAGE,
ROLLBACK_MESSAGE,
UNKNOW,
jonnxu marked this conversation as resolved.
Show resolved Hide resolved
};

class ROCKETMQCLIENT_API TransactionSendResult : public SendResult {
public:
TransactionSendResult() {}

TransactionSendResult(const SendStatus& sendStatus,
const std::string& msgId,
const std::string& offsetMsgId,
const MQMessageQueue& messageQueue,
int64 queueOffset)
: SendResult(sendStatus, msgId, offsetMsgId, messageQueue, queueOffset) {}

LocalTransactionState getLocalTransactionState() { return m_localTransactionState; }

void setLocalTransactionState(LocalTransactionState localTransactionState) {
m_localTransactionState = localTransactionState;
}

private:
LocalTransactionState m_localTransactionState;
};
} // namespace rocketmq
#endif
18 changes: 16 additions & 2 deletions src/MQClientAPIImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,20 @@ void MQClientAPIImpl::createTopic(const string& addr,
THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
}

void MQClientAPIImpl::endTransactionOneway(
std::string addr,
EndTransactionRequestHeader* requestHeader,
std::string remark,
const SessionCredentials& sessionCredentials) {

RemotingCommand request(END_TRANSACTION, requestHeader);
request.setRemark(remark);
callSignatureBeforeRequest(addr, request, sessionCredentials);
request.Encode();
m_pRemotingClient->invokeOneway(addr, request);
return;
}

SendResult MQClientAPIImpl::sendMessage(const string& addr,
const string& brokerName,
const MQMessage& msg,
Expand Down Expand Up @@ -373,9 +387,9 @@ SendResult MQClientAPIImpl::sendMessageSync(const string& addr,
unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
if (pResponse != NULL) {
try {
LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s", msg.toString().c_str(), addr.c_str(),
brokerName.c_str());
SendResult result = processSendResponse(brokerName, msg, pResponse.get());
LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(), addr.c_str(),
brokerName.c_str(), (int)result.getSendStatus());
return result;
} catch (...) {
LOG_ERROR("send error");
Expand Down
Loading