Skip to content

Commit

Permalink
Remove useless code such as removeDropedPullRequestOpaque and deleteO…
Browse files Browse the repository at this point in the history
…paqueForDropPullRequest.
  • Loading branch information
jonnxu authored Jul 8, 2019
2 parents 390bfe6 + f33a696 commit 8b6cb07
Show file tree
Hide file tree
Showing 12 changed files with 8 additions and 77 deletions.
22 changes: 3 additions & 19 deletions src/MQClientAPIImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,6 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
}
}

void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
}

PullResult* MQClientAPIImpl::pullMessage(const string& addr,
PullMessageRequestHeader* pRequestHeader,
int timeoutMillis,
Expand Down Expand Up @@ -467,21 +463,9 @@ void MQClientAPIImpl::pullMessageAsync(const string& addr,
void* pArg) {
//<!delete in future;
AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
MQMessageQueue mq;
AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
if (pAsyncArg && pAsyncArg->pPullRequest) {
mq = pAsyncArg->mq;
pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),
mq.toString().c_str());
}

if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) {
LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(),
mq.toString().data());
if (pAsyncArg && pAsyncArg->pPullRequest) {
pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
}
LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),
static_cast<AsyncArg*>(pArg)->mq.toString().data());
deleteAndZero(cbw);
THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
}
Expand Down Expand Up @@ -919,4 +903,4 @@ void MQClientAPIImpl::unlockBatchMQ(const string& addr,
}

//<!************************************************************************
} //<!end namespace;
} // namespace rocketmq
1 change: 0 additions & 1 deletion src/MQClientAPIImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ class MQClientAPIImpl {
int64 timeoutMilliseconds,
int maxRetryTimes = 1,
int retrySendTimes = 1);
void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);

private:
SendResult sendMessageSync(const string& addr,
Expand Down
19 changes: 2 additions & 17 deletions src/MQClientFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -988,18 +988,6 @@ void MQClientFactory::findConsumerIds(const string& topic,
}
}

void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
// delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
if (!pullRequest)
return;
MQMessageQueue mq = pullRequest->m_messageQueue;
int opaque = pullRequest->getLatestPullRequestOpaque();
if (opaque > 0) {
LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
}
}

void MQClientFactory::resetOffset(const string& group,
const string& topic,
const map<MQMessageQueue, int64>& offsetTable) {
Expand All @@ -1012,10 +1000,7 @@ void MQClientFactory::resetOffset(const string& group,
PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
if (pullreq) {
pullreq->setDroped(true);
LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(),
mq.toString().data());
// delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
// removeDropedPullRequestOpaque(pullreq);
LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
pullreq->clearAllMsgs();
pullreq->updateQueueMaxOffset(it->second);
} else {
Expand Down Expand Up @@ -1102,4 +1087,4 @@ void MQClientFactory::getSessionCredentialsFromOneOfProducerOrConsumer(SessionCr
}

//<!************************************************************************
} //<!end namespace;
} // namespace rocketmq
1 change: 0 additions & 1 deletion src/MQClientFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class MQClientFactory {
void addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs);
map<string, map<int, string>> getBrokerAddrMap();
void clearBrokerAddrMap();
void removeDropedPullRequestOpaque(PullRequest* pullRequest);

private:
void unregisterClient(const string& producerGroup,
Expand Down
3 changes: 1 addition & 2 deletions src/common/AsyncArg.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
#include "MQMessageQueue.h"
#include "PullAPIWrapper.h"
#include "SubscriptionData.h"
#include "../consumer/PullRequest.h"

namespace rocketmq {
//<!***************************************************************************

struct AsyncArg {
MQMessageQueue mq;
SubscriptionData subData;
PullAPIWrapper* pPullWrapper;
PullRequest* pPullRequest;
};

//<!***************************************************************************
Expand Down
1 change: 0 additions & 1 deletion src/consumer/DefaultMQPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
arg.mq = mq;
arg.subData = *pSData;
arg.pPullWrapper = m_pPullAPIWrapper;
arg.pPullRequest = NULL;

try {
unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq, // 1
Expand Down
1 change: 0 additions & 1 deletion src/consumer/DefaultMQPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,6 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
arg.mq = messageQueue;
arg.subData = *pSdata;
arg.pPullWrapper = m_pPullAPIWrapper;
arg.pPullRequest = request;
try {
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
Expand Down
12 changes: 0 additions & 12 deletions src/consumer/PullRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ PullRequest::PullRequest(const string& groupname)
m_queueOffsetMax(0),
m_bDroped(false),
m_bLocked(false),
m_latestPullRequestOpaque(0),
m_bPullMsgEventInprogress(false) {}

PullRequest::~PullRequest() {
Expand All @@ -46,7 +45,6 @@ PullRequest& PullRequest::operator=(const PullRequest& other) {
m_messageQueue = other.m_messageQueue;
m_msgTreeMap = other.m_msgTreeMap;
m_msgTreeMapTemp = other.m_msgTreeMapTemp;
m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;
}
return *this;
}
Expand Down Expand Up @@ -272,15 +270,5 @@ bool PullRequest::addPullMsgEvent() {
return false;
}

int PullRequest::getLatestPullRequestOpaque() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
return m_latestPullRequestOpaque;
}

void PullRequest::setLatestPullRequestOpaque(int opaque) {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
m_latestPullRequestOpaque = opaque;
}

//<!***************************************************************************
} //<!end namespace;
3 changes: 0 additions & 3 deletions src/consumer/PullRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ class PullRequest {
boost::timed_mutex& getPullRequestCriticalSection();
void removePullMsgEvent();
bool addPullMsgEvent();
int getLatestPullRequestOpaque();
void setLatestPullRequestOpaque(int opaque);

public:
MQMessageQueue m_messageQueue;
Expand All @@ -89,7 +87,6 @@ class PullRequest {
// uint64 m_tryUnlockTimes;
uint64 m_lastPullTimestamp;
uint64 m_lastConsumeTimestamp;
int m_latestPullRequestOpaque;
boost::timed_mutex m_consumeLock;
boost::atomic<bool> m_bPullMsgEventInprogress;
};
Expand Down
4 changes: 2 additions & 2 deletions src/consumer/Rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQ
it->second->clearAllMsgs(); // add clear operation to avoid bad state
// when dropped pullRequest returns
// normal
LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
}
changed = true;
}
Expand Down Expand Up @@ -634,4 +634,4 @@ void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
}

//<!************************************************************************
} //<!end namespace;
} // namespace rocketmq
16 changes: 0 additions & 16 deletions src/transport/TcpRemotingClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,21 +639,5 @@ void TcpRemotingClient::removeAllTimerCallback() {
m_asyncTimerTable.clear();
}

void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
// delete the map record of opaque<->ResponseFuture, so the answer for the pull request will
// discard when receive it later
std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
if (!pFuture) {
pFuture = findAndDeleteResponseFuture(opaque);
if (pFuture) {
LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
}
} else {
LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
// delete the timeout timer for opaque for pullrequest
cancelTimerCallback(opaque);
}
}

//<!************************************************************************
} // namespace rocketmq
2 changes: 0 additions & 2 deletions src/transport/TcpRemotingClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ class TcpRemotingClient {

void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);

void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);

private:
static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);

Expand Down

0 comments on commit 8b6cb07

Please sign in to comment.