diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 8ec60449f..2038f6c54 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -430,10 +430,6 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr, } } -void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) { - m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque); -} - PullResult* MQClientAPIImpl::pullMessage(const string& addr, PullMessageRequestHeader* pRequestHeader, int timeoutMillis, @@ -467,21 +463,9 @@ void MQClientAPIImpl::pullMessageAsync(const string& addr, void* pArg) { //(pArg); - if (pAsyncArg && pAsyncArg->pPullRequest) { - mq = pAsyncArg->mq; - pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque()); - LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", pAsyncArg->pPullRequest->getLatestPullRequestOpaque(), - mq.toString().c_str()); - } - if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) { - LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(), - mq.toString().data()); - if (pAsyncArg && pAsyncArg->pPullRequest) { - pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0); - } + LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(), + static_cast(pArg)->mq.toString().data()); deleteAndZero(cbw); THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1); } @@ -919,4 +903,4 @@ void MQClientAPIImpl::unlockBatchMQ(const string& addr, } //m_messageQueue; - int opaque = pullRequest->getLatestPullRequestOpaque(); - if (opaque > 0) { - LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); - getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque); - } -} - void MQClientFactory::resetOffset(const string& group, const string& topic, const map& offsetTable) { @@ -1012,10 +1000,7 @@ void MQClientFactory::resetOffset(const string& group, PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq); if (pullreq) { pullreq->setDroped(true); - LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(), - mq.toString().data()); - // delete the opaque record that's ignore the response of this pullrequest when drop pullrequest - // removeDropedPullRequestOpaque(pullreq); + LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data()); pullreq->clearAllMsgs(); pullreq->updateQueueMaxOffset(it->second); } else { @@ -1102,4 +1087,4 @@ void MQClientFactory::getSessionCredentialsFromOneOfProducerOrConsumer(SessionCr } //& brokerAddrs); map> getBrokerAddrMap(); void clearBrokerAddrMap(); - void removeDropedPullRequestOpaque(PullRequest* pullRequest); private: void unregisterClient(const string& producerGroup, diff --git a/src/common/AsyncArg.h b/src/common/AsyncArg.h index 4e237436d..fc358cb0b 100644 --- a/src/common/AsyncArg.h +++ b/src/common/AsyncArg.h @@ -21,7 +21,7 @@ #include "MQMessageQueue.h" #include "PullAPIWrapper.h" #include "SubscriptionData.h" -#include "../consumer/PullRequest.h" + namespace rocketmq { // pullResult(m_pPullAPIWrapper->pullKernelImpl(mq, // 1 diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index f9e587493..0f51ea135 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -784,7 +784,6 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) { arg.mq = messageQueue; arg.subData = *pSdata; arg.pPullWrapper = m_pPullAPIWrapper; - arg.pPullRequest = request; try { request->setLastPullTimestamp(UtilAll::currentTimeMillis()); m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1 diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp index 5fd6d5832..162be2a4e 100644 --- a/src/consumer/PullRequest.cpp +++ b/src/consumer/PullRequest.cpp @@ -28,7 +28,6 @@ PullRequest::PullRequest(const string& groupname) m_queueOffsetMax(0), m_bDroped(false), m_bLocked(false), - m_latestPullRequestOpaque(0), m_bPullMsgEventInprogress(false) {} PullRequest::~PullRequest() { @@ -46,7 +45,6 @@ PullRequest& PullRequest::operator=(const PullRequest& other) { m_messageQueue = other.m_messageQueue; m_msgTreeMap = other.m_msgTreeMap; m_msgTreeMapTemp = other.m_msgTreeMapTemp; - m_latestPullRequestOpaque = other.m_latestPullRequestOpaque; } return *this; } @@ -272,15 +270,5 @@ bool PullRequest::addPullMsgEvent() { return false; } -int PullRequest::getLatestPullRequestOpaque() { - boost::lock_guard lock(m_pullRequestLock); - return m_latestPullRequestOpaque; -} - -void PullRequest::setLatestPullRequestOpaque(int opaque) { - boost::lock_guard lock(m_pullRequestLock); - m_latestPullRequestOpaque = opaque; -} - // m_bPullMsgEventInprogress; }; diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp index a2997a714..549e41291 100644 --- a/src/consumer/Rebalance.cpp +++ b/src/consumer/Rebalance.cpp @@ -458,7 +458,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vectorsecond->clearAllMsgs(); // add clear operation to avoid bad state // when dropped pullRequest returns // normal - LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque()); + LOG_INFO("drop mq:%s", mqtemp.toString().c_str()); } changed = true; } @@ -634,4 +634,4 @@ void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) { } //ResponseFuture, so the answer for the pull request will - // discard when receive it later - std::shared_ptr pFuture(findAndDeleteAsyncResponseFuture(opaque)); - if (!pFuture) { - pFuture = findAndDeleteResponseFuture(opaque); - if (pFuture) { - LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); - } - } else { - LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); - // delete the timeout timer for opaque for pullrequest - cancelTimerCallback(opaque); - } -} - //