Skip to content

Commit

Permalink
enable spill manager
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Feb 11, 2025
1 parent c90f170 commit be95884
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
49 changes: 49 additions & 0 deletions cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <Common/GlutenConfig.h>
#include <Common/QueryContext.h>
#include <Common/formatReadable.h>
#include "Processors/IProcessor.h"

namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -60,6 +61,9 @@ GraceAggregatingTransform::GraceAggregatingTransform(
if (enable_spill_test)
buckets.emplace(1, BufferFileStream());
current_data_variants = std::make_shared<DB::AggregatedDataVariants>();

// Setting IProcessor::spillable lets MemorySpillScheduler trigger a spill on this processor.
spillable = true;
}

GraceAggregatingTransform::~GraceAggregatingTransform()
Expand Down Expand Up @@ -476,6 +480,7 @@ void GraceAggregatingTransform::mergeOneBlock(const DB::Block & block, bool is_o
{
rehashDataVariants();
}
force_spill = false;

LOG_DEBUG(
logger,
Expand Down Expand Up @@ -534,6 +539,13 @@ void GraceAggregatingTransform::mergeOneBlock(const DB::Block & block, bool is_o

bool GraceAggregatingTransform::isMemoryOverflow()
{
if (force_spill)
{
auto stats = getMemoryStats();
if (stats.spillable_memory_bytes > force_spill_on_bytes * 0.8)
return true;
}

/// More greedy memory usage strategy.
if (!current_data_variants)
return false;
Expand Down Expand Up @@ -580,4 +592,41 @@ bool GraceAggregatingTransform::isMemoryOverflow()
return false;
}

/// Builds the memory statistics reported for this processor.
///
/// When there is no current_data_variants, default-constructed stats are returned.
/// Otherwise:
///  - need_reserved_memory_bytes starts from the bytes allocated by the aggregates pool;
///  - spillable_memory_bytes accumulates pending_bytes of every bucket whose index is
///    greater than current_bucket_index;
///  - if per_key_memory_usage is positive, (rows in current_data_variants * per_key_memory_usage)
///    is added to both counters; otherwise the pool's allocated bytes are added to
///    spillable_memory_bytes as a rough estimate.
DB::ProcessorMemoryStats GraceAggregatingTransform::getMemoryStats()
{
    DB::ProcessorMemoryStats stats;
    if (current_data_variants)
    {
        const auto & pool = current_data_variants->aggregates_pool;
        stats.need_reserved_memory_bytes = pool->allocatedBytes();

        // Only buckets after the one currently being processed are counted.
        for (size_t bucket_index = current_bucket_index + 1; bucket_index < getBucketsNum(); ++bucket_index)
            stats.spillable_memory_bytes += buckets[bucket_index].pending_bytes;

        if (per_key_memory_usage > 0)
        {
            const auto keys_memory_bytes = current_data_variants->size() * per_key_memory_usage;
            stats.need_reserved_memory_bytes += keys_memory_bytes;
            stats.spillable_memory_bytes += keys_memory_bytes;
        }
        else
        {
            // No per-key estimate is available; fall back to the pool size as a rough figure.
            stats.spillable_memory_bytes += pool->allocatedBytes();
        }
    }
    return stats;
}

/// Asks this transform to spill about `bytes` bytes.
///
/// The request is accepted only if the currently spillable memory (as reported by
/// getMemoryStats()) is at least 80% of `bytes`. On acceptance, force_spill is raised and
/// the requested size is stored in force_spill_on_bytes.
///
/// @param bytes  amount of memory the caller wants released.
/// @return true if the request was accepted, false otherwise.
bool GraceAggregatingTransform::spillOnSize(size_t bytes)
{
    const auto reclaimable_bytes = getMemoryStats().spillable_memory_bytes;
    const bool accepted = !(reclaimable_bytes < bytes * 0.8);
    if (accepted)
    {
        force_spill = true;
        force_spill_on_bytes = bytes;
    }
    return accepted;
}

}
7 changes: 7 additions & 0 deletions cpp-ch/local-engine/Operator/GraceAggregatingTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Processors/IProcessor.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Poco/Logger.h>
#include "Common/MemorySpillScheduler.h"
#include <Common/AggregateUtil.h>


Expand Down Expand Up @@ -106,7 +107,13 @@ class GraceAggregatingTransform : public DB::IProcessor
void checkAndSetupCurrentDataVariants();
/// Merge one block into current_data_variants.
void mergeOneBlock(const DB::Block & block, bool is_original_block);

// spill control
bool isMemoryOverflow();
DB::ProcessorMemoryStats getMemoryStats() override;
bool spillOnSize(size_t bytes) override;
// Set by spillOnSize() when a spill request is accepted; cleared in mergeOneBlock().
bool force_spill = false; // a force flag to trigger spill
// Byte count passed to the accepted spillOnSize() request; isMemoryOverflow() compares
// spillable memory against 80% of it. Must be an integer byte count, not a flag.
size_t force_spill_on_bytes = 0;

bool input_finished = false;
bool has_input = false;
Expand Down

0 comments on commit be95884

Please sign in to comment.