Skip to content

Commit

Permalink
Merge pull request #4299 from marta-lokhova/minor_perf_followup
Browse files Browse the repository at this point in the history
Avoid copying StellarMessage in flooding paths

Reviewed-by: graydon
  • Loading branch information
latobarita authored May 1, 2024
2 parents 80a3185 + dca05af commit d2b09ec
Show file tree
Hide file tree
Showing 23 changed files with 73 additions and 72 deletions.
2 changes: 1 addition & 1 deletion src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Application;
class BucketManager;
class SearchableBucketListSnapshot;
struct EvictionResultEntry;
struct EvictionStatistics;
class EvictionStatistics;

class Bucket : public std::enable_shared_from_this<Bucket>,
public NonMovableOrCopyable
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/BucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ class AbstractLedgerTxn;
class Application;
class Bucket;
class Config;
class EvictionCounters;
struct EvictionCounters;
struct InflationWinner;

namespace testutil
Expand Down
5 changes: 3 additions & 2 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,8 +1000,9 @@ BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq)

using task_t = std::packaged_task<EvictionResult()>;
auto task = std::make_shared<task_t>(
[bl = move(searchableBL), iter = cfg.evictionIterator(), ledgerSeq, sas,
&counters = mBucketListEvictionCounters, stats = mEvictionStatistics] {
[bl = std::move(searchableBL), iter = cfg.evictionIterator(), ledgerSeq,
sas, &counters = mBucketListEvictionCounters,
stats = mEvictionStatistics] {
return bl->scanForEviction(ledgerSeq, counters, iter, stats, sas);
});

Expand Down
6 changes: 3 additions & 3 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,9 @@ HerderImpl::broadcast(SCPEnvelope const& e)
ZoneScoped;
if (!mApp.getConfig().MANUAL_CLOSE)
{
StellarMessage m;
m.type(SCP_MESSAGE);
m.envelope() = e;
auto m = std::make_shared<StellarMessage>();
m->type(SCP_MESSAGE);
m->envelope() = e;

CLOG_DEBUG(Herder, "broadcast s:{} i:{}", e.statement.pledges.type(),
e.statement.slotIndex);
Expand Down
6 changes: 3 additions & 3 deletions src/herder/PendingEnvelopes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,9 @@ PendingEnvelopes::envelopeReady(SCPEnvelope const& envelope)
// envelope.
recordReceivedCost(envelope);

StellarMessage msg;
msg.type(SCP_MESSAGE);
msg.envelope() = envelope;
auto msg = std::make_shared<StellarMessage>();
msg->type(SCP_MESSAGE);
msg->envelope() = envelope;
mApp.getOverlayManager().broadcastMessage(msg);

auto envW = mHerder.getHerderSCPDriver().wrapEnvelope(envelope);
Expand Down
14 changes: 7 additions & 7 deletions src/overlay/Floodgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,20 @@ Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index)

// send message to anyone you haven't gotten it from
bool
Floodgate::broadcast(StellarMessage const& msg, std::optional<Hash> const& hash)
Floodgate::broadcast(std::shared_ptr<StellarMessage const> msg,
std::optional<Hash> const& hash)
{
ZoneScoped;
if (mShuttingDown)
{
return false;
}
if (msg.type() == TRANSACTION)
if (msg->type() == TRANSACTION)
{
// Must pass a hash when broadcasting transactions.
releaseAssert(hash.has_value());
}
Hash index = xdrBlake2(msg);
Hash index = xdrBlake2(*msg);

FloodRecord::pointer fr;
auto result = mFloodMap.find(index);
Expand All @@ -119,11 +120,10 @@ Floodgate::broadcast(StellarMessage const& msg, std::optional<Hash> const& hash)
auto peers = mApp.getOverlayManager().getAuthenticatedPeers();

bool broadcasted = false;
auto smsg = std::make_shared<StellarMessage const>(msg);
for (auto peer : peers)
{
releaseAssert(peer.second->isAuthenticated());
bool pullMode = msg.type() == TRANSACTION;
bool pullMode = msg->type() == TRANSACTION;

if (peersTold.insert(peer.second->toString()).second)
{
Expand All @@ -140,11 +140,11 @@ Floodgate::broadcast(StellarMessage const& msg, std::optional<Hash> const& hash)
std::weak_ptr<Peer> weak(
std::static_pointer_cast<Peer>(peer.second));
mApp.postOnMainThread(
[smsg, weak, log = !broadcasted]() {
[msg, weak, log = !broadcasted]() {
auto strong = weak.lock();
if (strong)
{
strong->sendMessage(smsg, log);
strong->sendMessage(msg, log);
}
},
fmt::format(FMT_STRING("broadcast to {}"),
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/Floodgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Floodgate

// returns true if msg was sent to at least one peer
// The hash required for transactions
bool broadcast(StellarMessage const& msg,
bool broadcast(std::shared_ptr<StellarMessage const> msg,
std::optional<Hash> const& hash = std::nullopt);

// returns the list of peers that sent us the item with hash `msgID`
Expand Down
9 changes: 5 additions & 4 deletions src/overlay/FlowControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ FlowControl::hasOutboundCapacity(StellarMessage const& msg) const

// Start flow control: send SEND_MORE to a peer to indicate available capacity
void
FlowControl::start(NodeID const& peerID,
std::function<void(std::shared_ptr<StellarMessage>)> sendCb,
std::optional<uint32_t> enableFCBytes)
FlowControl::start(
NodeID const& peerID,
std::function<void(std::shared_ptr<StellarMessage const>)> sendCb,
std::optional<uint32_t> enableFCBytes)
{
mNodeID = peerID;
mSendCallback = sendCb;
Expand Down Expand Up @@ -138,7 +139,7 @@ FlowControl::maybeSendNextBatch()
break;
}

mSendCallback(std::make_shared<StellarMessage>(msg));
mSendCallback(front.mMessage);
++sent;
auto& om = mOverlayMetrics;

Expand Down
9 changes: 5 additions & 4 deletions src/overlay/FlowControl.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class FlowControl
uint64_t mFloodDataProcessedBytes{0};
std::optional<VirtualClock::time_point> mNoOutboundCapacity;
FlowControlMetrics mMetrics;
std::function<void(std::shared_ptr<StellarMessage>)> mSendCallback;
std::function<void(std::shared_ptr<StellarMessage const>)> mSendCallback;

// Release capacity used by this message. Return a struct that indicates how
// much reading and flood capacity was freed
Expand Down Expand Up @@ -160,9 +160,10 @@ class FlowControl

Json::Value getFlowControlJsonInfo(bool compact) const;

void start(NodeID const& peerID,
std::function<void(std::shared_ptr<StellarMessage>)> sendCb,
std::optional<uint32_t> enableFCBytes);
void
start(NodeID const& peerID,
std::function<void(std::shared_ptr<StellarMessage const>)> sendCb,
std::optional<uint32_t> enableFCBytes);

// Stop reading from this peer until capacity is released
void throttleRead();
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class OverlayManager
// When passing a transaction message,
// the hash of TransactionEnvelope must be passed also for pull mode.
virtual bool
broadcastMessage(StellarMessage const& msg,
broadcastMessage(std::shared_ptr<StellarMessage const> msg,
std::optional<Hash> const hash = std::nullopt) = 0;

// Make a note in the FloodGate that a given peer has provided us with a
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ OverlayManagerImpl::recvTxDemand(FloodDemand const& dmd, Peer::pointer peer)
}

bool
OverlayManagerImpl::broadcastMessage(StellarMessage const& msg,
OverlayManagerImpl::broadcastMessage(std::shared_ptr<StellarMessage const> msg,
std::optional<Hash> const hash)
{
ZoneScoped;
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class OverlayManagerImpl : public OverlayManager
void forgetFloodedMsg(Hash const& msgID) override;
void recvTxDemand(FloodDemand const& dmd, Peer::pointer peer) override;
bool
broadcastMessage(StellarMessage const& msg,
broadcastMessage(std::shared_ptr<StellarMessage const> msg,
std::optional<Hash> const hash = std::nullopt) override;
void connectTo(PeerBareAddress const& address) override;

Expand Down
40 changes: 18 additions & 22 deletions src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,29 +526,26 @@ Peer::sendErrorAndDrop(ErrorCode error, std::string const& message,
void
Peer::sendSendMore(uint32_t numMessages)
{
ZoneScoped;
releaseAssert(threadIsMain());

ZoneScoped;
StellarMessage m;
m.type(SEND_MORE);
m.sendMoreMessage().numMessages = numMessages;
auto msgPtr = std::make_shared<StellarMessage const>(m);
sendMessage(msgPtr);
auto m = std::make_shared<StellarMessage>();
m->type(SEND_MORE);
m->sendMoreMessage().numMessages = numMessages;
sendMessage(m);
}

void
Peer::sendSendMore(uint32_t numMessages, uint32_t numBytes)
{
releaseAssert(threadIsMain());

ZoneScoped;
StellarMessage m;
m.type(SEND_MORE_EXTENDED);
m.sendMoreExtendedMessage().numMessages = numMessages;
m.sendMoreExtendedMessage().numBytes = numBytes;
releaseAssert(threadIsMain());

auto msgPtr = std::make_shared<StellarMessage const>(m);
sendMessage(msgPtr);
auto m = std::make_shared<StellarMessage>();
m->type(SEND_MORE_EXTENDED);
m->sendMoreExtendedMessage().numMessages = numMessages;
m->sendMoreExtendedMessage().numBytes = numBytes;
sendMessage(m);
}

std::string
Expand Down Expand Up @@ -1079,20 +1076,19 @@ Peer::recvGetTxSet(StellarMessage const& msg)
auto self = shared_from_this();
if (auto txSet = mAppConnector.getHerder().getTxSet(msg.txSetHash()))
{
StellarMessage newMsg;
auto newMsg = std::make_shared<StellarMessage>();
if (txSet->isGeneralizedTxSet())
{
newMsg.type(GENERALIZED_TX_SET);
txSet->toXDR(newMsg.generalizedTxSet());
newMsg->type(GENERALIZED_TX_SET);
txSet->toXDR(newMsg->generalizedTxSet());
}
else
{
newMsg.type(TX_SET);
txSet->toXDR(newMsg.txSet());
newMsg->type(TX_SET);
txSet->toXDR(newMsg->txSet());
}

auto newMsgPtr = std::make_shared<StellarMessage const>(newMsg);
self->sendMessage(newMsgPtr);
self->sendMessage(newMsg);
}
else
{
Expand Down Expand Up @@ -1548,7 +1544,7 @@ Peer::recvAuth(StellarMessage const& msg)
// Subtle: after successful auth, must send sendMore message first to
// tell the other peer about the local node's reading capacity.
auto weakSelf = std::weak_ptr<Peer>(self);
auto sendCb = [weakSelf](std::shared_ptr<StellarMessage> msg) {
auto sendCb = [weakSelf](std::shared_ptr<StellarMessage const> msg) {
auto self = weakSelf.lock();
if (self)
{
Expand Down
3 changes: 2 additions & 1 deletion src/overlay/SurveyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ SurveyManager::processTopologyRequest(SurveyRequestMessage const& request) const
void
SurveyManager::broadcast(StellarMessage const& msg) const
{
mApp.getOverlayManager().broadcastMessage(msg);
mApp.getOverlayManager().broadcastMessage(
std::make_shared<StellarMessage const>(msg));
}

void
Expand Down
4 changes: 1 addition & 3 deletions src/overlay/TxDemandsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,7 @@ TxDemandsManager::recvTxDemand(FloodDemand const& dmd, Peer::pointer peer)
KeyUtils::toShortString(peer->getPeerID()));
peer->getPeerMetrics().mMessagesFulfilled++;
om.mMessagesFulfilledMeter.Mark();
auto smsg =
std::make_shared<StellarMessage>(tx->toStellarMessage());
peer->sendMessage(smsg);
peer->sendMessage(tx->toStellarMessage());
}
else
{
Expand Down
8 changes: 4 additions & 4 deletions src/overlay/test/OverlayManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,17 +295,17 @@ class OverlayManagerTests
auto c = TestAccount{*app, getAccount("c")};
auto d = TestAccount{*app, getAccount("d")};

StellarMessage AtoB = a.tx({payment(b, 10)})->toStellarMessage();
auto AtoB = a.tx({payment(b, 10)})->toStellarMessage();
auto i = 0;
for (auto p : pm.mOutboundPeers.mAuthenticated)
{
if (i++ == 2)
{
pm.recvFloodedMsg(AtoB, p.second);
pm.recvFloodedMsg(*AtoB, p.second);
}
}
auto broadcastTxnMsg = [&](auto msg) {
pm.broadcastMessage(msg, xdrSha256(msg.transaction()));
pm.broadcastMessage(msg, xdrSha256(msg->transaction()));
};
broadcastTxnMsg(AtoB);
crank(10);
Expand All @@ -314,7 +314,7 @@ class OverlayManagerTests
broadcastTxnMsg(AtoB);
crank(10);
REQUIRE(sentCounts(pm) == expected);
StellarMessage CtoD = c.tx({payment(d, 10)})->toStellarMessage();
auto CtoD = c.tx({payment(d, 10)})->toStellarMessage();
broadcastTxnMsg(CtoD);
crank(10);
std::vector<int> expectedFinal{2, 2, 1, 2, 2};
Expand Down
5 changes: 3 additions & 2 deletions src/overlay/test/OverlayTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2737,8 +2737,9 @@ TEST_CASE("overlay pull mode", "[overlay][pullmode]")
auto root = TestAccount::createRoot(*apps[0]);
auto tx = root.tx({txtest::createAccount(
txtest::getAccount("acc").getPublicKey(), 100)});
auto adv = createAdvert(std::vector<std::shared_ptr<StellarMessage>>{
std::make_shared<StellarMessage>(tx->toStellarMessage())});
auto adv =
createAdvert(std::vector<std::shared_ptr<StellarMessage const>>{
tx->toStellarMessage()});
auto twoNodesRecvTx = [&]() {
// Node0 and Node1 know about tx0 and will advertise it to Node2
REQUIRE(apps[0]->getHerder().recvTransaction(tx, true) ==
Expand Down
4 changes: 2 additions & 2 deletions src/simulation/LoadGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2150,8 +2150,8 @@ LoadGenerator::execute(TransactionFramePtr& txf, LoadGenMode mode,

txm.mTxnAttempted.Mark();

StellarMessage msg(txf->toStellarMessage());
txm.mTxnBytes.Mark(xdr::xdr_argpack_size(msg));
auto msg = txf->toStellarMessage();
txm.mTxnBytes.Mark(xdr::xdr_argpack_size(*msg));

auto status = mApp.getHerder().recvTransaction(txf, true);
if (status != TransactionQueue::AddResult::ADD_STATUS_PENDING)
Expand Down
7 changes: 4 additions & 3 deletions src/transactions/FeeBumpTransactionFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,12 @@ FeeBumpTransactionFrame::resetResults(LedgerHeader const& header,
mResult.feeCharged = getFee(header, baseFee, applying);
}

StellarMessage
std::shared_ptr<StellarMessage const>
FeeBumpTransactionFrame::toStellarMessage() const
{
StellarMessage msg(TRANSACTION);
msg.transaction() = mEnvelope;
auto msg = std::make_shared<StellarMessage>();
msg->type(TRANSACTION);
msg->transaction() = mEnvelope;
return msg;
}
}
2 changes: 1 addition & 1 deletion src/transactions/FeeBumpTransactionFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class FeeBumpTransactionFrame : public TransactionFrameBase
void processFeeSeqNum(AbstractLedgerTxn& ltx,
std::optional<int64_t> baseFee) override;

StellarMessage toStellarMessage() const override;
std::shared_ptr<StellarMessage const> toStellarMessage() const override;

static TransactionEnvelope
convertInnerTxToV1(TransactionEnvelope const& envelope);
Expand Down
7 changes: 4 additions & 3 deletions src/transactions/TransactionFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2015,11 +2015,12 @@ TransactionFrame::processRefund(Application& app, AbstractLedgerTxn& ltxOuter,
return refund;
}

StellarMessage
std::shared_ptr<StellarMessage const>
TransactionFrame::toStellarMessage() const
{
StellarMessage msg(TRANSACTION);
msg.transaction() = mEnvelope;
auto msg = std::make_shared<StellarMessage>();
msg->type(TRANSACTION);
msg->transaction() = mEnvelope;
return msg;
}

Expand Down
2 changes: 1 addition & 1 deletion src/transactions/TransactionFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ class TransactionFrame : public TransactionFrameBase
bool apply(Application& app, AbstractLedgerTxn& ltx,
Hash const& sorobanBasePrngSeed);

StellarMessage toStellarMessage() const override;
std::shared_ptr<StellarMessage const> toStellarMessage() const override;

LedgerTxnEntry loadAccount(AbstractLedgerTxn& ltx,
LedgerTxnHeader const& header,
Expand Down
2 changes: 1 addition & 1 deletion src/transactions/TransactionFrameBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class TransactionFrameBase
virtual void processPostApply(Application& app, AbstractLedgerTxn& ltx,
TransactionMetaFrame& meta) = 0;

virtual StellarMessage toStellarMessage() const = 0;
virtual std::shared_ptr<StellarMessage const> toStellarMessage() const = 0;

virtual bool hasDexOperations() const = 0;

Expand Down

0 comments on commit d2b09ec

Please sign in to comment.