Skip to content

Commit

Permalink
chore: introduce back-pressure to tiered storage (#3243)
Browse files Browse the repository at this point in the history
* chore: introduce back-pressure to tiered storage

Also, some clean-up of the macOS daily build.


Enabled forgotten test.
Improve CI insights

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Jun 30, 2024
1 parent c34a789 commit 0e37529
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 36 deletions.
21 changes: 15 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ jobs:
run: |
python -m pip install pre-commit
python -m pip freeze --local
lsblk -l
echo "sda rotational = $(cat /sys/block/sda/queue/rotational)"
echo "sdb rotational = $(cat /sys/block/sdb/queue/rotational)"
- uses: actions/cache@v4
with:
path: ~/.cache/pre-commit
Expand All @@ -31,13 +34,9 @@ jobs:
run: pre-commit run --show-diff-on-failure --color=always --from-ref HEAD^ --to-ref HEAD
shell: bash
build:
# The CMake configure and build commands are platform agnostic and should work equally
# well on Windows or Mac. You can convert this to a matrix build if you need
# cross-platform coverage.
# See: https://docs.github.com/en/free-pro-team@latest/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix
runs-on: ubuntu-latest
strategy:
matrix:
runner: [ubuntu-latest]
# Test on each of these containers
container: ["ubuntu-dev:20", "alpine-dev:latest"]
build-type: [Debug, Release]
Expand All @@ -48,6 +47,12 @@ jobs:
build-type: Debug
compiler: { cxx: clang++, c: clang }
cxx_flags: ""
runner: ubuntu-latest
exclude:
- container: "alpine-dev:latest"
runner: Larger-CI-Arm

runs-on: ${{matrix.runner}}
timeout-minutes: 60
env:
SCCACHE_GHA_ENABLED: "true"
Expand All @@ -61,6 +66,7 @@ jobs:
options: --security-opt seccomp=unconfined --sysctl "net.ipv6.conf.all.disable_ipv6=0"
volumes:
- /:/hostroot
- /mnt:/mnt
credentials:
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -74,6 +80,7 @@ jobs:
uname -a
cmake --version
mkdir -p ${GITHUB_WORKSPACE}/build
mount
echo "===================Before freeing up space ============================================"
df -h
Expand All @@ -83,7 +90,8 @@ jobs:
rm -rf /hostroot/opt/ghc
echo "===================After freeing up space ============================================"
df -h
touch /mnt/foo
ls -la /mnt/foo
- name: Run sccache-cache
uses: mozilla-actions/sccache-action@v0.0.4

Expand Down Expand Up @@ -180,6 +188,7 @@ jobs:

lint-test-chart:
runs-on: ubuntu-latest
needs: [build]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/lint-test-chart
15 changes: 3 additions & 12 deletions .github/workflows/daily-builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
build-macos:
if: false
runs-on: macos-12
runs-on: macos-13
timeout-minutes: 45
steps:
- uses: actions/checkout@v4
Expand All @@ -87,13 +87,7 @@ jobs:
- name: Install dependencies
run: |
brew uninstall --formula node kotlin harfbuzz sbt selenium-server imagemagick \
gradle maven openjdk postgresql r ant mongodb-community@5.0 mongosh \
node@18 php composer
# Prevent updating these packages that bring in dependencies that sometimes break
# the update
brew pin azure-cli jpeg-xl aom lima pipx gcc
# Remove Python3 symlinks in /usr/local/bin as workaround to brew update issues
# https://github.com/actions/setup-python/issues/577
rm /usr/local/bin/2to3* || :
Expand All @@ -103,22 +97,19 @@ jobs:
brew update && brew install ninja boost openssl automake gcc zstd bison c-ares \
autoconf libtool automake
# brew info icu4c
mkdir -p $GITHUB_WORKSPACE/build
- name: Configure & Build
run: |
cd $GITHUB_WORKSPACE/build
export PATH=/usr/local/opt/bison/bin:$PATH
which gcc
which gcc-13
gcc-12 --version
bison --version
echo "*************************** START BUILDING **************************************"
CC=gcc-13 CXX=g++-13 cmake .. -DCMAKE_BUILD_TYPE=Debug -GNinja -DWITH_UNWIND=OFF \
CC=gcc-12 CXX=g++-12 cmake .. -DCMAKE_BUILD_TYPE=Debug -GNinja -DWITH_UNWIND=OFF \
-DCMAKE_C_COMPILER_LAUNCHER=sccache
ninja src/all
Expand Down
25 changes: 14 additions & 11 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,25 @@ struct LockTagOptions {
};

struct TieredStats {
size_t total_stashes = 0;
size_t total_fetches = 0;
size_t total_cancels = 0;
size_t total_deletes = 0;
size_t total_defrags = 0;
size_t total_registered_buf_allocs = 0;
size_t total_heap_buf_allocs = 0;
uint64_t total_stashes = 0;
uint64_t total_fetches = 0;
uint64_t total_cancels = 0;
uint64_t total_deletes = 0;
uint64_t total_defrags = 0;
uint64_t total_registered_buf_allocs = 0;
uint64_t total_heap_buf_allocs = 0;

// How many times the system did not perform Stash call (disjoint with total_stashes).
uint64_t total_stash_overflows = 0;

size_t allocated_bytes = 0;
size_t capacity_bytes = 0;

size_t pending_read_cnt = 0;
size_t pending_stash_cnt = 0;
uint32_t pending_read_cnt = 0;
uint32_t pending_stash_cnt = 0;

size_t small_bins_cnt = 0;
size_t small_bins_entries_cnt = 0;
uint64_t small_bins_cnt = 0;
uint64_t small_bins_entries_cnt = 0;
size_t small_bins_filling_bytes = 0;

TieredStats& operator+=(const TieredStats&);
Expand Down
1 change: 1 addition & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2175,6 +2175,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("tiered_total_fetches", m.tiered_stats.total_fetches);
append("tiered_total_cancels", m.tiered_stats.total_cancels);
append("tiered_total_deletes", m.tiered_stats.total_deletes);
append("tiered_total_stash_overflows", m.tiered_stats.total_stash_overflows);
append("tiered_heap_buf_allocations", m.tiered_stats.total_heap_buf_allocs);
append("tiered_registered_buf_allocations", m.tiered_stats.total_registered_buf_allocs);

Expand Down
2 changes: 1 addition & 1 deletion src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string
PrimeValue* pv) {
EngineShard* shard = op_args_.shard;

// Currently we always offload
// Currently we always try to offload, but Stash may ignore the request if disk I/O is overloaded.
if (auto* ts = shard->tiered_storage(); ts && ts->ShouldStash(*pv)) {
ts->Stash(op_args_.db_cntx.db_index, key, pv);
}
Expand Down
14 changes: 11 additions & 3 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
ABSL_FLAG(bool, tiered_storage_cache_fetched, true,
"WIP: Load results of offloaded reads to memory");

ABSL_FLAG(size_t, tiered_storage_write_depth, 50,
ABSL_FLAG(unsigned, tiered_storage_write_depth, 50,
"Maximum number of concurrent stash requests issued by background offload");

namespace dfly {
Expand Down Expand Up @@ -217,6 +217,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
// Constructs the storage: creates the ShardOpManager (given this object, db_slice and
// max_size) and the SmallBins helper, then caches the tiered_storage_write_depth flag.
TieredStorage::TieredStorage(DbSlice* db_slice, size_t max_size)
: op_manager_{make_unique<ShardOpManager>(this, db_slice, max_size)},
bins_{make_unique<tiering::SmallBins>()} {
// Snapshot the flag once so that Stash() and RunOffloading() compare against the member
// instead of re-reading the flag. NOTE(review): as far as this diff shows, a later change
// to the flag does not affect an already-constructed instance - confirm this is intended.
write_depth_limit_ = absl::GetFlag(FLAGS_tiered_storage_write_depth);
}

TieredStorage::~TieredStorage() {
Expand Down Expand Up @@ -273,6 +274,13 @@ template util::fb2::Future<size_t> TieredStorage::Modify(DbIndex dbid, std::stri
void TieredStorage::Stash(DbIndex dbid, string_view key, PrimeValue* value) {
DCHECK(!value->IsExternal() && !value->HasIoPending());

// TODO: When we are low on memory we should apply back-pressure here, to avoid OOMs
// while a lot of disk space is still underutilized.
if (op_manager_->GetStats().pending_stash_cnt >= write_depth_limit_) {
++stash_overflow_cnt_;
return;
}

string buf;
string_view value_sv = value->GetSlice(&buf);
value->SetIoPending(true);
Expand Down Expand Up @@ -342,6 +350,7 @@ TieredStats TieredStorage::GetStats() const {
stats.small_bins_filling_bytes = bins_stats.current_bin_bytes;
}

stats.total_stash_overflows = stash_overflow_cnt_;
return stats;
}

Expand All @@ -350,8 +359,7 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
return;

PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;
int stash_limit =
absl::GetFlag(FLAGS_tiered_storage_write_depth) - op_manager_->GetStats().pending_stash_cnt;
int stash_limit = write_depth_limit_ - op_manager_->GetStats().pending_stash_cnt;
if (stash_limit <= 0)
return;

Expand Down
2 changes: 2 additions & 0 deletions src/server/tiered_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class TieredStorage {

std::unique_ptr<ShardOpManager> op_manager_;
std::unique_ptr<tiering::SmallBins> bins_;
unsigned write_depth_limit_ = 10;
uint64_t stash_overflow_cnt_ = 0;
};

} // namespace dfly
Expand Down
2 changes: 2 additions & 0 deletions src/server/tiered_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ ABSL_DECLARE_FLAG(string, tiered_prefix);
ABSL_DECLARE_FLAG(bool, tiered_storage_cache_fetched);
ABSL_DECLARE_FLAG(bool, backing_file_direct);
ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);

namespace dfly {

Expand All @@ -41,6 +42,7 @@ class TieredStorageTest : public BaseFamilyTest {
exit(0);
}

absl::SetFlag(&FLAGS_tiered_storage_write_depth, 15000);
absl::SetFlag(&FLAGS_tiered_prefix, "/tmp/tiered_storage_test");
absl::SetFlag(&FLAGS_tiered_storage_cache_fetched, true);
absl::SetFlag(&FLAGS_backing_file_direct, true);
Expand Down
6 changes: 3 additions & 3 deletions tests/dragonfly/tiering_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ async def run(sub_ops):
n = 20
await asyncio.gather(*(run(ops[i::n]) for i in range(n)))

return # TODO(vlad): to make sure the tests below pass
info = await async_client.info("tiered")
assert info["tiered_entries"] > len(key_range) / 5
async for info, breaker in info_tick_timer(async_client, section="TIERED"):
with breaker:
assert info["tiered_entries"] > len(key_range) / 5

# Verify lengths
p = async_client.pipeline(transaction=False)
Expand Down

0 comments on commit 0e37529

Please sign in to comment.