Skip to content

Commit

Permalink
Reduce duplicate peer traffic for ledger data (#5126)
Browse files Browse the repository at this point in the history
- Drop duplicate outgoing TMGetLedger messages per peer
  - Allow a retry after 30s in case of peer or network congestion.
  - Addresses RIPD-1870
  - (Changes levelization. That is not desirable, and will need to be fixed.)
- Drop duplicate incoming TMGetLedger messages per peer
  - Allow a retry after 15s in case of peer or network congestion.
  - The requestCookie is ignored when computing the hash, thus increasing
    the chances of detecting duplicate messages.
  - With duplicate messages, keep track of the different requestCookies
    (or lack of cookie). When work is finally done for a given request,
    send the response to all the peers that are waiting on the request,
    sending one message per peer, including all the cookies and
    a "directResponse" flag indicating the data is intended for the
    sender, too.
  - Addresses RIPD-1871
- Drop duplicate incoming TMLedgerData messages
  - Addresses RIPD-1869
- Improve logging related to ledger acquisition
- Class "CanProcess" to keep track of processing of distinct items

---------

Co-authored-by: Valentin Balaschenko <13349202+vlntb@users.noreply.github.com>
  • Loading branch information
ximinez and vlntb authored Feb 14, 2025
1 parent 7c9d652 commit dd5e655
Show file tree
Hide file tree
Showing 27 changed files with 1,013 additions and 143 deletions.
2 changes: 1 addition & 1 deletion Builds/levelization/results/loops.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Loop: xrpld.app xrpld.net
xrpld.app > xrpld.net

Loop: xrpld.app xrpld.overlay
xrpld.overlay == xrpld.app
xrpld.overlay ~= xrpld.app

Loop: xrpld.app xrpld.peerfinder
xrpld.app > xrpld.peerfinder
Expand Down
134 changes: 134 additions & 0 deletions include/xrpl/basics/CanProcess.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED

#include <functional>
#include <mutex>
#include <set>

/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
class CanProcess
{
public:
template <class Mutex, class Collection, class Item>
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: cleanup_(insert(mtx, collection, item))
{
}

~CanProcess()
{
if (cleanup_)
cleanup_();
}

explicit
operator bool() const
{
return static_cast<bool>(cleanup_);
}

private:
template <bool useIterator, class Mutex, class Collection, class Item>
std::function<void()>
doInsert(Mutex& mtx, Collection& collection, Item const& item)
{
std::unique_lock<Mutex> lock(mtx);
// TODO: Use structured binding once LLVM 16 is the minimum supported
// version. See also: https://github.com/llvm/llvm-project/issues/48582
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
auto const insertResult = collection.insert(item);
auto const it = insertResult.first;
if (!insertResult.second)
return {};
if constexpr (useIterator)
return [&, it]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(it);
};
else
return [&]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(item);
};
}

// Generic insert() function doesn't use iterators because they may get
// invalidated
template <class Mutex, class Collection, class Item>
std::function<void()>
insert(Mutex& mtx, Collection& collection, Item const& item)
{
return doInsert<false>(mtx, collection, item);
}

// Specialize insert() for std::set, which does not invalidate iterators for
// insert and erase
template <class Mutex, class Item>
std::function<void()>
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
{
return doInsert<true>(mtx, collection, item);
}

// If set, then the item is "usable"
std::function<void()> cleanup_;
};

#endif
7 changes: 7 additions & 0 deletions include/xrpl/basics/base_uint.h
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,13 @@ to_string(base_uint<Bits, Tag> const& a)
return strHex(a.cbegin(), a.cend());
}

template <std::size_t Bits, class Tag>
inline std::string
to_short_string(base_uint<Bits, Tag> const& a)
{
return strHex(a.cbegin(), a.cend()).substr(0, 8) + "...";
}

template <std::size_t Bits, class Tag>
inline std::ostream&
operator<<(std::ostream& out, base_uint<Bits, Tag> const& u)
Expand Down
10 changes: 10 additions & 0 deletions include/xrpl/proto/ripple.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,18 @@ message TMLedgerData
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}

message TMPing
Expand Down
2 changes: 2 additions & 0 deletions include/xrpl/protocol/LedgerHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct LedgerHeader

// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;
Expand Down
28 changes: 28 additions & 0 deletions src/test/app/HashRouter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,33 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
}

void
testProcessPeer()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s);
uint256 const key(1);
HashRouter::PeerShortID peer1 = 1;
HashRouter::PeerShortID peer2 = 2;
auto const timeout = 2s;

BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
++stopwatch;
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}

public:
void
run() override
Expand All @@ -252,6 +279,7 @@ class HashRouter_test : public beast::unit_test::suite
testSetFlags();
testRelay();
testProcess();
testProcessPeer();
}
};

Expand Down
5 changes: 5 additions & 0 deletions src/test/app/LedgerReplay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ class TestPeer : public Peer
{
return false;
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}

bool ledgerReplayEnabled_;
PublicKey nodePublicKey_;
Expand Down
5 changes: 5 additions & 0 deletions src/test/basics/base_uint_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ struct base_uint_test : beast::unit_test::suite
uset.insert(u);
BEAST_EXPECT(raw.size() == u.size());
BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C");
BEAST_EXPECT(to_short_string(u) == "01020304...");
BEAST_EXPECT(*u.data() == 1);
BEAST_EXPECT(u.signum() == 1);
BEAST_EXPECT(!!u);
Expand All @@ -173,6 +174,7 @@ struct base_uint_test : beast::unit_test::suite
test96 v{~u};
uset.insert(v);
BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3");
BEAST_EXPECT(to_short_string(v) == "FEFDFCFB...");
BEAST_EXPECT(*v.data() == 0xfe);
BEAST_EXPECT(v.signum() == 1);
BEAST_EXPECT(!!v);
Expand All @@ -193,6 +195,7 @@ struct base_uint_test : beast::unit_test::suite
test96 z{beast::zero};
uset.insert(z);
BEAST_EXPECT(to_string(z) == "000000000000000000000000");
BEAST_EXPECT(to_short_string(z) == "00000000...");
BEAST_EXPECT(*z.data() == 0);
BEAST_EXPECT(*z.begin() == 0);
BEAST_EXPECT(*std::prev(z.end(), 1) == 0);
Expand All @@ -213,6 +216,7 @@ struct base_uint_test : beast::unit_test::suite
BEAST_EXPECT(n == z);
n--;
BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF");
BEAST_EXPECT(to_short_string(n) == "FFFFFFFF...");
n = beast::zero;
BEAST_EXPECT(n == z);

Expand All @@ -223,6 +227,7 @@ struct base_uint_test : beast::unit_test::suite
test96 x{zm1 ^ zp1};
uset.insert(x);
BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", to_string(x));
BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x));

BEAST_EXPECT(uset.size() == 4);

Expand Down
4 changes: 2 additions & 2 deletions src/test/overlay/ProtocolVersion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ class ProtocolVersion_test : public beast::unit_test::suite
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion(
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
make_protocol(2, 2));
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);
Expand Down
5 changes: 5 additions & 0 deletions src/test/overlay/reduce_relay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ class PeerPartial : public Peer
removeTxQueue(const uint256&) override
{
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
};

/** Manually advanced clock. */
Expand Down
3 changes: 2 additions & 1 deletion src/xrpld/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,8 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if (!positions && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(
OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}

void
Expand Down
19 changes: 19 additions & 0 deletions src/xrpld/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,25 @@ class InboundLedger final : public TimeoutCounter,
std::unique_ptr<PeerSet> mPeerSet;
};

inline std::string
to_string(InboundLedger::Reason reason)
{
using enum InboundLedger::Reason;
switch (reason)
{
case HISTORY:
return "HISTORY";
case GENERIC:
return "GENERIC";
case CONSENSUS:
return "CONSENSUS";
default:
UNREACHABLE(
"ripple::to_string(InboundLedger::Reason) : unknown value");
return "unknown";
}
}

} // namespace ripple

#endif
21 changes: 15 additions & 6 deletions src/xrpld/app/ledger/detail/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)

if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}

mByHash = true;

Expand Down Expand Up @@ -502,15 +509,17 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)

if (auto stream = journal_.debug())
{
stream << "Trigger acquiring ledger " << hash_;
std::stringstream ss;
ss << "Trigger acquiring ledger " << hash_;
if (peer)
stream << " from " << peer;
ss << " from " << peer;

if (complete_ || failed_)
stream << "complete=" << complete_ << " failed=" << failed_;
ss << " complete=" << complete_ << " failed=" << failed_;
else
stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
stream << ss.str();
}

if (!mHaveHeader)
Expand Down
Loading

0 comments on commit dd5e655

Please sign in to comment.