From dec2324adadac5d284c07d548a8bebf8f567bf7a Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 15 Aug 2024 09:57:00 -0700 Subject: [PATCH] Use tracked-consumers memory pool as the default. (#11949) * feat(11523): set the default memory pool to the tracked-consumer pool * test(11523): update tests for the OOM message including the top consumers * chore(11523): remove duplicate wording from OOM messages --- datafusion/core/tests/memory_limit/mod.rs | 35 +++++++------------ datafusion/execution/src/memory_pool/pool.rs | 16 ++++----- datafusion/execution/src/runtime_env.rs | 14 ++++++-- .../physical-plan/src/joins/cross_join.rs | 3 +- .../physical-plan/src/joins/hash_join.rs | 13 +++---- .../src/joins/nested_loop_join.rs | 3 +- 6 files changed, 38 insertions(+), 46 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 5c712af80192..e6a51eae1337 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -76,8 +76,7 @@ async fn group_by_none() { TestCase::new() .with_query("select median(request_bytes) from t") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "AggregateStream", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: AggregateStream" ]) .with_memory_limit(2_000) .run() @@ -89,8 +88,7 @@ async fn group_by_row_hash() { TestCase::new() .with_query("select count(*) from t GROUP BY response_bytes") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "GroupedHashAggregateStream", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream" ]) .with_memory_limit(2_000) .run() @@ -103,8 +101,7 @@ async fn group_by_hash() { // group by dict column .with_query("select count(*) from t GROUP BY service, host, pod, container") .with_expected_errors(vec![ - "Resources 
exhausted: Failed to allocate additional", - "GroupedHashAggregateStream", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream" ]) .with_memory_limit(1_000) .run() @@ -117,8 +114,7 @@ async fn join_by_key_multiple_partitions() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "HashJoinInput[0]", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[0]", ]) .with_memory_limit(1_000) .with_config(config) @@ -132,8 +128,7 @@ async fn join_by_key_single_partition() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "HashJoinInput", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput", ]) .with_memory_limit(1_000) .with_config(config) @@ -146,8 +141,7 @@ async fn join_by_expression() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "NestedLoopJoinLoad[0]", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]", ]) .with_memory_limit(1_000) .run() @@ -159,8 +153,7 @@ async fn cross_join() { TestCase::new() .with_query("select t1.* from t t1 CROSS JOIN t t2") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "CrossJoinExec", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec", ]) .with_memory_limit(1_000) .run() @@ -216,8 +209,7 @@ async fn symmetric_hash_join() { "select t1.* from t t1 JOIN t t2 ON t1.pod = 
t2.pod AND t1.time = t2.time", ) .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "SymmetricHashJoinStream", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SymmetricHashJoinStream", ]) .with_memory_limit(1_000) .with_scenario(Scenario::AccessLogStreaming) @@ -235,8 +227,7 @@ async fn sort_preserving_merge() { // so only a merge is needed .with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "SortPreservingMergeExec", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SortPreservingMergeExec", ]) // provide insufficient memory to merge .with_memory_limit(partition_size / 2) @@ -313,8 +304,7 @@ async fn sort_spill_reservation() { test.clone() .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "ExternalSorterMerge", // merging in sort fails + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge", ]) .with_config(config) .run() @@ -343,8 +333,7 @@ async fn oom_recursive_cte() { SELECT * FROM nodes;", ) .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "RecursiveQuery", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: RecursiveQuery", ]) .with_memory_limit(2_000) .run() @@ -396,7 +385,7 @@ async fn oom_with_tracked_consumer_pool() { .with_expected_errors(vec![ "Failed to allocate additional", "for ParquetSink(ArrowColumnWriter)", - "Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)" + "Additional allocation failed with top memory consumers (across reservations) as: ParquetSink(ArrowColumnWriter)" ]) .with_memory_pool(Arc::new( TrackConsumersPool::new( diff --git 
a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 4a41602bd961..d3cd93979baf 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -392,7 +392,7 @@ fn provide_top_memory_consumers_to_error_msg( error_msg: String, top_consumers: String, ) -> String { - format!("Resources exhausted with top memory consumers (across reservations) are: {}. Error: {}", top_consumers, error_msg) + format!("Additional allocation failed with top memory consumers (across reservations) as: {}. Error: {}", top_consumers, error_msg) } #[cfg(test)] @@ -501,7 +501,7 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool"; let res = r5.try_grow(150); assert!( matches!( @@ -524,7 +524,7 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. 
Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool"; let res = r0.try_grow(150); assert!( matches!( @@ -543,7 +543,7 @@ mod tests { let mut r1 = new_consumer_same_name.clone().register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. // a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool"; let res = r1.try_grow(150); assert!( matches!( @@ -555,7 +555,7 @@ mod tests { // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 30 bytes. 
Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; let res = r1.try_grow(150); assert!( matches!( @@ -570,7 +570,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; let res = r2.try_grow(150); assert!( matches!( @@ -590,7 +590,7 @@ mod tests { let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); - let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 20 bytes, r0 consumed 10 bytes. 
Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; let res = r0.try_grow(150); assert!( matches!( @@ -604,7 +604,7 @@ mod tests { // Test: unregister one // only the remaining one should be listed pool.unregister(&r1_consumer); - let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes"; + let expected_consumers = "Additional allocation failed with top memory consumers (across reservations) as: r0 consumed 10 bytes"; let res = r0.try_grow(150); assert!( matches!( diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 25573d915959..420246595558 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -20,16 +20,21 @@ use crate::{ disk_manager::{DiskManager, DiskManagerConfig}, - memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool}, + memory_pool::{ + GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool, + }, object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, }; use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; use datafusion_common::{DataFusionError, Result}; use object_store::ObjectStore; -use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::sync::Arc; +use std::{ + fmt::{Debug, Formatter}, + num::NonZeroUsize, +}; use url::Url; #[derive(Clone)] @@ -213,7 +218,10 @@ impl RuntimeConfig { /// Note DataFusion does not yet respect this limit in all cases. 
pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self { let pool_size = (max_memory as f64 * memory_fraction) as usize; - self.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) + self.with_memory_pool(Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(pool_size), + NonZeroUsize::new(5).unwrap(), + ))) } /// Use the specified path to create any needed temporary files diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 2840d3f62bf9..0868ee721665 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -693,9 +693,8 @@ mod tests { assert_contains!( err.to_string(), - "External error: Resources exhausted: Failed to allocate additional" + "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec" ); - assert_contains!(err.to_string(), "CrossJoinExec"); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 14835f717ea3..e40a07cf6220 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -3821,13 +3821,11 @@ mod tests { let stream = join.execute(0, task_ctx)?; let err = common::collect(stream).await.unwrap_err(); + // Asserting that operator-level reservation attempting to overallocate assert_contains!( err.to_string(), - "External error: Resources exhausted: Failed to allocate additional" + "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput" ); - - // Asserting that operator-level reservation attempting to overallocate - assert_contains!(err.to_string(), "HashJoinInput"); } Ok(()) @@ -3902,13 +3900,12 @@ mod tests { let stream = join.execute(1, task_ctx)?; let err = common::collect(stream).await.unwrap_err(); + // 
Asserting that stream-level reservation attempting to overallocate assert_contains!( err.to_string(), - "External error: Resources exhausted: Failed to allocate additional" - ); + "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]" - // Asserting that stream-level reservation attempting to overallocate - assert_contains!(err.to_string(), "HashJoinInput[1]"); + ); } Ok(()) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index d69d818331be..04a025c93288 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1039,9 +1039,8 @@ mod tests { assert_contains!( err.to_string(), - "External error: Resources exhausted: Failed to allocate additional" + "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]" ); - assert_contains!(err.to_string(), "NestedLoopJoinLoad[0]"); } Ok(())