Test: Validate memory limit for sort queries to extended test #14142
Conversation
```rust
    query: &str,
    baseline_query: &str,
) {
    if std::env::var("DATAFUSION_TEST_MEM_LIMIT_VALIDATION").is_err() {
```
I am aware of the test_with crate, which can let certain test cases run only when an env var is set, but I can't get it working when a test is run through a command.
Thank you for working on this @2010YOUY01 -- very exciting.
I would like to request we move these tests to the "extended" suite that runs on commits to main here: https://github.com/apache/datafusion/blob/main/.github/workflows/extended.yml
I worry that if we add these tests to every local and CI test run, it will significantly slow down development (as I think these tests force a recompile)
I manually tested by running:

```shell
cargo test --test core_integration -- memory_limit
```
I noticed that the submodule references to parquet-testing and testing (arrow-testing) are updated. I think that is fine but wanted to point it out.
```rust
/// Runner that executes each test in a separate process with the required environment
/// variable set. Memory limit validation tasks need to measure memory resident set
/// size (RSS), so they must run in a separate process.
#[test]
```
This test takes more than 60 seconds on my laptop (which is longer than any other test). Is there any way we can speed it up?

```
SLOW [> 60.000s] datafusion::core_integration memory_limit::memory_limit_validation::sort_mem_validation::test_runner
PASS [ 64.625s] datafusion::core_integration memory_limit::memory_limit_validation::sort_mem_validation::test_runner
```

I think it is because the subprocess is calling cargo test again (which causes a recompile).
I found that a recompile happens for the initial test compilation, but later test runs won't recompile. I can't find a way to avoid it 🤦🏼
```rust
    let mut handles = vec![];

    // Run tests in parallel, each test in a separate process
```
I suggest we break these into their own tests (each calling a helper function) and leave the threading to the test runner (`cargo test` or `cargo nextest`). That makes:
- The reporting better (the test runner prints out what tests are running)
- Thread control better (the user can control the runner)
Great point, updated in abd5d4e
To add it to the extended suite I suggest gating the tests with an environment variable (so normal invocations of `cargo test` do not run them). Perhaps like:

```shell
DATAFUSION_EXTENDED_TEST=1 cargo test --test core_integration
```

Or something to that effect.
```rust
    // Spawn a monitoring task
    let monitor_handle = SpawnedTask::spawn(async move {
        let mut sys = System::new_all();
        let mut interval = interval(Duration::from_millis(20));
```
20 milliseconds seems quite long -- I would recommend a 1ms delay.
I tried 1ms; the tests take way longer to run. 7ms seems to be the smallest interval that won't affect execution speed (so we have to make sure the profiled queries take much longer than the interval to run; all current tests satisfy this).
```rust
    let (_, max_rss) = measure_max_rss(|| async { df.collect().await.unwrap() }).await;

    println!(
```
this is quite clever, FWIW
Thank you for the review @alamb
I used features instead; this approach seems more common.
Looks good to me -- thanks @2010YOUY01 -- I think we should merge it in and give it a try.
If it turns out that this is hard to maintain / fails intermittently we can reassess.

I looked at the CI run after this merged to main and it ran well: https://github.com/apache/datafusion/actions/runs/12850882520/job/35830990299 I did file a small follow-on to make the job naming clearer.
🎉
Which issue does this PR close?
Part of #13431
Rationale for this change
DataFusion supports memory-limited queries: this is implemented by tracking internal memory consumption to limit total memory usage.
This feature needs to be verified externally: the profiled memory usage should be consistent with the specified limit.
Idea
Here is an example: compile and run datafusion-cli with memory limit, and profile the physical memory consumption:
The source relation in the query should in theory consume 800M of memory (int64 * 100M rows), which can be checked with the same query without `order by`.
The ideal implementation of sorting uses O(N) space, so the query without a memory limit should ideally use 800M plus a small amount for other internal data structures. If given a 400M memory limit, this query should run within around 400M of physical memory.
This test module implements this kind of validation. (It also found that the memory consumption of sorting is not ideal: it consumes 2X-3X memory or worse; I plan to investigate that later.)
Implementation
Implementing such a test is a bit tricky: the utility functions for measuring memory RSS can only get the current process's RSS, so each test case has to run in a separate process, but Rust runs tests in the same module within one process on different threads.
This PR uses the following workaround: if a test case is invoked directly from `cargo test`, the tests won't actually run. Instead, a runner test serves as the actual entry point for all related tests and spawns each one in a separate process.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?