Skip to content

Commit

Permalink
Fix the flaky async cache tests (facebookincubator#10141)
Browse files Browse the repository at this point in the history
Summary:
The flakiness arises because we currently don't handle concurrent cache loads racing with cache shutdown. We rely on the query system to handle this concurrency properly, i.e. to stop accessing the cache before shutting it down. This is not the case for the async cache tests, which might schedule coalesced cache loads on a background executor and might start the next round of tests without waiting for the previous background activities to finish. Each test round, as in ssdWriteOptions, resets the cache, which shuts the cache down. This causes the background coalesced cache loads to see freed cache entries. This change fixes the flakiness by adding a pending batch load counter and waiting for the pending loads to complete before starting the next round of tests.

Note that we have no plan to handle the race between async cache shutdown and concurrent query cache access. Handling it might need a lock, which is pretty expensive because in-memory cache access is a high-frequency operation, and we would rather rely on the query system to prevent this.

Pull Request resolved: facebookincubator#10141

Reviewed By: zacw7

Differential Revision: D58432597

Pulled By: xiaoxmeng

fbshipit-source-id: 0bc811a7781de637a446f286d9a8541eaa9dd82b
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Jun 11, 2024
1 parent 9edecbb commit 9434bba
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions velox/common/caching/tests/AsyncDataCacheTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,19 @@ struct Request {
SsdPin ssdPin;
};

class AsyncDataCacheTest : public ::testing::TestWithParam<bool> {
// Parameter bundle for the value-parameterized AsyncDataCacheTest suite. The
// two flags are forwarded to the SSD cache configuration built in
// initializeCache().
//
// Both members default to false so that a default-constructed TestParam never
// carries indeterminate values. The type remains an aggregate, so
// brace-initialization such as TestParam{true, false} keeps working.
struct TestParam {
  // Passed to the SSD cache config as the checksum-enabled setting.
  bool checksumEnabled{false};
  // Passed to the SSD cache config as the checksum-verification setting.
  bool checksumVerificationEnabled{false};
};

class AsyncDataCacheTest : public ::testing::TestWithParam<TestParam> {
public:
static std::vector<TestParam> getTestParams() {
static std::vector<TestParam> testParams = {
{false, false}, {true, false}, {true, true}};
return testParams;
}

// Deterministically fills 'allocation' based on 'sequence'
static void initializeContents(int64_t sequence, memory::Allocation& alloc) {
for (int32_t i = 0; i < alloc.numRuns(); ++i) {
Expand Down Expand Up @@ -92,6 +103,12 @@ class AsyncDataCacheTest : public ::testing::TestWithParam<bool> {
fileIds().testingReset();
}

// Blocks the calling thread until 'numPendingLoads_' is no longer positive,
// re-checking the counter every 2ms. loadBatch() increments the counter before
// handing a coalesced load to the executor and decrements it when that task
// exits.
void waitForPendingLoads() {
  constexpr std::chrono::microseconds kPollInterval{2000};
  for (;;) {
    if (numPendingLoads_.load() <= 0) {
      return;
    }
    std::this_thread::sleep_for(kPollInterval); // NOLINT
  }
}

void initializeCache(
uint64_t maxBytes,
int64_t ssdBytes = 0,
Expand Down Expand Up @@ -119,8 +136,8 @@ class AsyncDataCacheTest : public ::testing::TestWithParam<bool> {
executor(),
checkpointIntervalBytes > 0 ? checkpointIntervalBytes : ssdBytes / 20,
false,
GetParam(),
GetParam());
GetParam().checksumEnabled,
GetParam().checksumVerificationEnabled);
ssdCache = std::make_unique<SsdCache>(config);
}

Expand Down Expand Up @@ -269,6 +286,7 @@ class AsyncDataCacheTest : public ::testing::TestWithParam<bool> {
std::vector<StringIdLease> filenames_;
std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
int32_t numLargeRetries_{0};
std::atomic_int64_t numPendingLoads_{0};
};

class TestingCoalescedLoad : public CoalescedLoad {
Expand Down Expand Up @@ -455,7 +473,11 @@ void AsyncDataCacheTest::loadBatch(
}
auto load = std::make_shared<TestingCoalescedLoad>(
std::move(keys), std::move(sizes), cache_, injectError);
executor()->add([load, semaphore]() {
++numPendingLoads_;
executor()->add([this, load, semaphore]() {
SCOPE_EXIT {
--numPendingLoads_;
};
try {
load->loadOrFuture(nullptr);
} catch (const std::exception& e) {
Expand Down Expand Up @@ -484,7 +506,11 @@ void AsyncDataCacheTest::loadBatch(
std::move(ssdPins),
cache_,
injectError);
executor()->add([load, semaphore]() {
++numPendingLoads_;
executor()->add([this, load, semaphore]() {
SCOPE_EXIT {
--numPendingLoads_;
};
try {
load->loadOrFuture(nullptr);
} catch (const std::exception& e) {
Expand Down Expand Up @@ -689,6 +715,7 @@ TEST_P(AsyncDataCacheTest, evictAccounting) {
pool->allocateContiguous(1200, large);
EXPECT_EQ(memory::AllocationTraits::pageBytes(2400), pool->usedBytes());
loadLoop(0, kMaxBytes * 1.1);
waitForPendingLoads();
pool->allocateNonContiguous(2400, allocation);
pool->allocateContiguous(2400, large);
EXPECT_EQ(memory::AllocationTraits::pageBytes(4800), pool->usedBytes());
Expand Down Expand Up @@ -1277,7 +1304,7 @@ TEST_P(AsyncDataCacheTest, makeEvictable) {
}
}

TEST_P(AsyncDataCacheTest, DISABLED_ssdWriteOptions) {
TEST_P(AsyncDataCacheTest, ssdWriteOptions) {
constexpr uint64_t kRamBytes = 16UL << 20; // 16 MB
constexpr uint64_t kSsdBytes = 64UL << 20; // 64 MB

Expand Down Expand Up @@ -1314,6 +1341,7 @@ TEST_P(AsyncDataCacheTest, DISABLED_ssdWriteOptions) {
testData.minSsdSavableBytes});
// Load data half of the in-memory capacity.
loadLoop(0, kRamBytes / 2);
waitForPendingLoads();
auto stats = cache_->refreshStats();
if (testData.expectedSaveToSsd) {
EXPECT_GE(stats.ssdStats->entriesWritten, 0);
Expand All @@ -1333,4 +1361,4 @@ TEST_P(AsyncDataCacheTest, DISABLED_ssdWriteOptions) {
INSTANTIATE_TEST_SUITE_P(
AsyncDataCacheTest,
AsyncDataCacheTest,
::testing::Values(false, true));
::testing::ValuesIn(AsyncDataCacheTest::getTestParams()));

0 comments on commit 9434bba

Please sign in to comment.