feat: Add unbounded memory pool (#1386)
## Which issue does this PR close?

## Rationale for this change

DataFusion provides an unbounded memory pool (`UnboundedMemoryPool`), which I found useful for experiments.

## What changes are included in this PR?

Added an `unbounded` option to `spark.comet.exec.memoryPool` that selects DataFusion's `UnboundedMemoryPool`.
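As a rough illustration of the change, the sketch below shows how a pool-type string could map to a configuration value, including the new `unbounded` name. It is a simplified, std-only stand-in: the enum is trimmed to four variants, the error type is a plain `String` instead of `CometError`, and the struct fields `pool_type` and `pool_size` are assumed names, not taken from the Comet source.

```rust
// Simplified stand-in for the pool-type parsing touched by this commit.
// Names mirror the diff where possible; the struct fields and the String
// error type are assumptions made for this illustration.
#[derive(Debug, PartialEq)]
enum MemoryPoolType {
    Greedy,
    FairSpill,
    GreedyGlobal,
    Unbounded,
}

#[derive(Debug, PartialEq)]
struct MemoryPoolConfig {
    pool_type: MemoryPoolType,
    pool_size: usize,
}

fn parse_memory_pool_config(
    name: &str,
    pool_size: usize,
    pool_size_per_task: usize,
) -> Result<MemoryPoolConfig, String> {
    let (pool_type, size) = match name {
        "greedy_global" => (MemoryPoolType::GreedyGlobal, pool_size),
        "fair_spill" => (MemoryPoolType::FairSpill, pool_size_per_task),
        "greedy" => (MemoryPoolType::Greedy, pool_size_per_task),
        // The diff passes 0 as the size for the unbounded pool.
        "unbounded" => (MemoryPoolType::Unbounded, 0),
        other => return Err(format!("Unsupported memory pool type: {}", other)),
    };
    Ok(MemoryPoolConfig { pool_type, pool_size: size })
}
```

With this sketch, `parse_memory_pool_config("unbounded", 1024, 256)` yields a config whose size is 0 regardless of the sizes passed in, while an unknown name returns an error.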

## How are these changes tested?

Existing tests.
kazuyukitanimura authored Feb 12, 2025
1 parent f099e6e commit 4fe4f57
Showing 3 changed files with 7 additions and 4 deletions.
4 changes: 2 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -504,8 +504,8 @@ object CometConf extends ShimCometConf {
.doc(
"The type of memory pool to be used for Comet native execution. " +
"Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " +
-"'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. For off-heap " +
-"types are 'unified' and `fair_unified`.")
+"'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. " +
+"For off-heap types are 'unified' and `fair_unified`.")
.stringConf
.createWithDefault("greedy_task_shared")

2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -48,7 +48,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | false |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
-| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. For off-heap types are 'unified' and `fair_unified`. | greedy_task_shared |
+| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. For off-heap types are 'unified' and `fair_unified`. | greedy_task_shared |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
5 changes: 4 additions & 1 deletion native/core/src/execution/jni_api.rs
@@ -26,7 +26,7 @@ use datafusion::{
prelude::{SessionConfig, SessionContext},
};
use datafusion_execution::memory_pool::{
-FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
+FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
};
use futures::poll;
use jni::{
@@ -118,6 +118,7 @@ enum MemoryPoolType {
FairSpillTaskShared,
GreedyGlobal,
FairSpillGlobal,
+Unbounded,
}

struct MemoryPoolConfig {
@@ -319,6 +320,7 @@ fn parse_memory_pool_config(
"greedy_global" => MemoryPoolConfig::new(MemoryPoolType::GreedyGlobal, pool_size),
"fair_spill" => MemoryPoolConfig::new(MemoryPoolType::FairSpill, pool_size_per_task),
"greedy" => MemoryPoolConfig::new(MemoryPoolType::Greedy, pool_size_per_task),
+"unbounded" => MemoryPoolConfig::new(MemoryPoolType::Unbounded, 0),
_ => {
return Err(CometError::Config(format!(
"Unsupported memory pool type: {}",
@@ -397,6 +399,7 @@ fn create_memory_pool(
per_task_memory_pool.num_plans += 1;
Arc::clone(&per_task_memory_pool.memory_pool)
}
+MemoryPoolType::Unbounded => Arc::new(UnboundedMemoryPool::default()),
}
}
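
To illustrate what the new arm changes at runtime, here is a minimal std-only model of the difference between an unbounded pool and a size-limited greedy pool. DataFusion's `UnboundedMemoryPool` tracks usage but never rejects a reservation, whereas `GreedyMemoryPool` fails once its limit would be exceeded. The `Pool` trait, `UnboundedModel`, and `GreedyModel` below are hypothetical names for this sketch and do not use the DataFusion API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical trait for this sketch; not DataFusion's MemoryPool trait.
trait Pool {
    fn try_grow(&self, additional: usize) -> Result<(), String>;
    fn reserved(&self) -> usize;
}

// Tracks usage but never rejects a reservation.
#[derive(Default)]
struct UnboundedModel {
    used: AtomicUsize,
}

impl Pool for UnboundedModel {
    fn try_grow(&self, additional: usize) -> Result<(), String> {
        self.used.fetch_add(additional, Ordering::Relaxed);
        Ok(())
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}

// Rejects any reservation that would push usage past `limit`.
struct GreedyModel {
    limit: usize,
    used: AtomicUsize,
}

impl Pool for GreedyModel {
    fn try_grow(&self, additional: usize) -> Result<(), String> {
        self.used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                // Only commit the new total if it stays within the limit.
                let new_used = used.checked_add(additional)?;
                (new_used <= self.limit).then_some(new_used)
            })
            .map(|_| ())
            .map_err(|used| {
                format!(
                    "cannot reserve {} bytes: {} of {} already used",
                    additional, used, self.limit
                )
            })
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}
```

In this model, a `GreedyModel` with a 100-byte limit accepts a 60-byte reservation and rejects a second one, while `UnboundedModel` accepts both. That lack of a limit also means an unbounded pool never signals memory pressure to operators, which fits the experimental use stated in the rationale.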
