Skip to content

Commit

Permalink
enable spill manager
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Feb 11, 2025
1 parent c90f170 commit 01d9538
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 0 deletions.
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Common/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ int64_t QueryContext::initializeQuery(const String & task_id)
query_context->thread_group->memory_tracker.setHardLimit(memory_limit + config.extra_memory_hard_limit);
int64_t id = reinterpret_cast<int64_t>(query_context->thread_group.get());
query_map_.insert(id, query_context);
query_context->query_context->setSetting("enable_adaptive_memory_spill_scheduler", true);
return id;
}

Expand Down
48 changes: 48 additions & 0 deletions cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ GraceAggregatingTransform::GraceAggregatingTransform(
if (enable_spill_test)
buckets.emplace(1, BufferFileStream());
current_data_variants = std::make_shared<DB::AggregatedDataVariants>();

// IProcessor::spillable, MemorySpillScheduler will trigger the spill by enable this flag.
spillable = true;
}

GraceAggregatingTransform::~GraceAggregatingTransform()
Expand Down Expand Up @@ -476,6 +479,7 @@ void GraceAggregatingTransform::mergeOneBlock(const DB::Block & block, bool is_o
{
rehashDataVariants();
}
force_spill = false;

LOG_DEBUG(
logger,
Expand Down Expand Up @@ -534,6 +538,13 @@ void GraceAggregatingTransform::mergeOneBlock(const DB::Block & block, bool is_o

bool GraceAggregatingTransform::isMemoryOverflow()
{
if (force_spill)
{
auto stats = getMemoryStats();
if (stats.spillable_memory_bytes > force_spill_on_bytes * 0.8)
return true;
}

/// More greedy memory usage strategy.
if (!current_data_variants)
return false;
Expand Down Expand Up @@ -580,4 +591,41 @@ bool GraceAggregatingTransform::isMemoryOverflow()
return false;
}

DB::ProcessorMemoryStats GraceAggregatingTransform::getMemoryStats()
{
DB::ProcessorMemoryStats stats;
if (!current_data_variants)
return stats;

stats.need_reserved_memory_bytes = current_data_variants->aggregates_pool->allocatedBytes();
for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
{
auto & file_stream = buckets[i];
stats.spillable_memory_bytes += file_stream.pending_bytes;
}

if (per_key_memory_usage > 0)
{
auto current_result_rows = current_data_variants->size();
stats.need_reserved_memory_bytes += current_result_rows * per_key_memory_usage;
stats.spillable_memory_bytes += current_result_rows * per_key_memory_usage;
}
else
{
// This is a rough estimation, we don't know the exact memory usage for each key.
stats.spillable_memory_bytes += current_data_variants->aggregates_pool->allocatedBytes();
}
return stats;
}

bool GraceAggregatingTransform::spillOnSize(size_t bytes)
{
auto stats = getMemoryStats();
if (stats.spillable_memory_bytes < bytes * 0.8)
return false;
force_spill = true;
force_spill_on_bytes = bytes;
return true;
}

}
7 changes: 7 additions & 0 deletions cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Processors/IProcessor.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Poco/Logger.h>
#include <Common/MemorySpillScheduler.h>
#include <Common/AggregateUtil.h>


Expand Down Expand Up @@ -106,7 +107,13 @@ class GraceAggregatingTransform : public DB::IProcessor
void checkAndSetupCurrentDataVariants();
/// Merge one block into current_data_variants.
void mergeOneBlock(const DB::Block & block, bool is_original_block);

// spill control
bool isMemoryOverflow();
DB::ProcessorMemoryStats getMemoryStats() override;
bool spillOnSize(size_t bytes) override;
bool force_spill = false; // a force flag to trigger spill
bool force_spill_on_bytes = 0;

bool input_finished = false;
bool has_input = false;
Expand Down

0 comments on commit 01d9538

Please sign in to comment.