
Dynamically Determine the Number of Output Files based on Configs #7767

Closed
devinjdangelo opened this issue Oct 7, 2023 · 1 comment · Fixed by #7791
Labels: enhancement (New feature or request)

Comments

@devinjdangelo (Contributor)

devinjdangelo commented Oct 7, 2023

Is your feature request related to a problem or challenge?

Currently, FileSinks (e.g. ParquetSink) output 1 file for each input partition. For example, the FileSink::write_all method accepts a Vec<RecordBatchStream> and independently serializes and writes each RecordBatchStream to an ObjectStore. This setup is easy to implement and efficient for parallelization (it is trivial to spawn a task to process each RecordBatchStream in parallel), but there are a few drawbacks:

  1. The number of output files is arbitrarily determined by how the plan is partitioned. Often you will see 1 output file for each vcore in your system, even if that means there is 1 file with data and 15 empty files. This is confusing and poor UX (see #5383: "The output of write_csv and write_json methods results in many empty output files, which is confusing").
  2. We provide an option single_file_output that forces only 1 output file, but there is no finer grained control than that unless you explicitly repartition the plan (e.g. add a RoundRobinPartition(4) to get 4 output files).
  3. It is also unclear in the current setup how FileSink can or should handle writes to hive style partitioned tables (see #7744: "Allow Inserts to Partitioned Listing Table"), since we cannot know the correct number of output files up front and thus cannot construct a Vec<RecordBatchStream>.
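To make drawback 1 concrete, the current scheme can be sketched roughly as below. This is a simplified illustration, not DataFusion's actual code: `RecordBatch` here is just a row count standing in for arrow's `RecordBatch`, and `write_all` returns the row count of each "file" instead of writing to an `ObjectStore`.

```rust
// Stand-in for arrow's RecordBatch: only the row count matters here.
struct RecordBatch {
    num_rows: usize,
}

/// Simplified model of the current behavior: one output file per input
/// partition, regardless of how many rows each partition actually holds.
/// Returns the row count written to each file.
fn write_all(partitions: Vec<Vec<RecordBatch>>) -> Vec<usize> {
    partitions
        .iter()
        .map(|stream| stream.iter().map(|b| b.num_rows).sum())
        .collect()
}
```

With 16 input partitions where only one carries data, this produces 16 files, 15 of them empty.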

Describe the solution you'd like

I would like to provide users with options such as the following which will determine the number of output files:

  1. Maximum rows per file
  2. Maximum file size bytes
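A rough sketch of how such options might look and be consulted follows. The names (`FileSinkOptions`, `should_roll_over`) are illustrative, not DataFusion's actual config keys:

```rust
/// Hypothetical writer options controlling output file boundaries.
#[derive(Clone, Copy)]
struct FileSinkOptions {
    max_rows_per_file: usize,
    max_file_size_bytes: usize,
}

/// Decide whether the current file should be closed before appending a batch
/// of `batch_rows` rows totalling `batch_bytes` bytes. An empty file always
/// accepts the batch, so a single oversized batch still produces one file.
fn should_roll_over(
    opts: FileSinkOptions,
    file_rows: usize,
    file_bytes: usize,
    batch_rows: usize,
    batch_bytes: usize,
) -> bool {
    file_rows > 0
        && (file_rows + batch_rows > opts.max_rows_per_file
            || file_bytes + batch_bytes > opts.max_file_size_bytes)
}
```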

To respect these settings, the execution plan will need to dynamically create new file writers as execution proceeds, rather than all up front. Enabling this is a challenge similar to the one discussed in #7744. Ultimately, the input signature of FileSinks will need to change. Perhaps an upstream execution plan (FileSinkRepartitionExec) could be responsible for dividing a single incoming RecordBatchStream into a dynamic number of output streams, i.e. a Stream<Item = RecordBatchStream>. FileSink would then consume each stream as it arrives, spawning a new task to write each file.
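The core of that repartitioning step can be sketched synchronously, ignoring the async stream machinery. This is a hypothetical sketch assuming only a max-rows-per-file limit; a new output group is started whenever appending the next batch would exceed the limit, and batches are never split:

```rust
// Stand-in for arrow's RecordBatch: only the row count matters here.
struct RecordBatch {
    num_rows: usize,
}

/// Hypothetical repartitioning logic: group an incoming sequence of batches
/// into output "files", starting a new file whenever the current one would
/// exceed `max_rows_per_file`. Returns the row count of each output file.
/// Note that no empty files are ever emitted, and a single batch larger than
/// the limit still lands in its own file (batches are not split).
fn split_by_max_rows(batches: Vec<RecordBatch>, max_rows_per_file: usize) -> Vec<usize> {
    let mut files: Vec<usize> = Vec::new();
    for batch in batches {
        match files.last_mut() {
            // Current file still has room: append to it.
            Some(current) if *current + batch.num_rows <= max_rows_per_file => {
                *current += batch.num_rows;
            }
            // No current file, or it is full: start a new one.
            _ => files.push(batch.num_rows),
        }
    }
    files
}
```

In the real async design, each new group would become a fresh RecordBatchStream handed to FileSink, which spawns a task to write it.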

FileSinkRepartitionExec could also have specialized logic for handling writes to hive style partitioned tables.
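For the hive-partitioned case, that specialized logic amounts to demultiplexing by partition value, creating output streams on demand since the number of output files is not known up front. A minimal sketch, with the function name and the `(partition value, row count)` input shape invented for illustration:

```rust
use std::collections::HashMap;

/// Hypothetical demultiplexer for hive-style partitioned writes: route each
/// chunk of rows to an output keyed by its partition value (e.g. "year=2023"),
/// creating a new output the first time a partition value is seen.
/// Returns the total row count routed to each partition's file.
fn route_by_partition(chunks: Vec<(&str, usize)>) -> HashMap<String, usize> {
    let mut outputs: HashMap<String, usize> = HashMap::new();
    for (partition_value, num_rows) in chunks {
        // `entry` creates the output on first sight of this partition value.
        *outputs.entry(partition_value.to_string()).or_insert(0) += num_rows;
    }
    outputs
}
```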

Describe alternatives you've considered

FileSink could also be reworked to accept a single RecordBatchStream and handle repartitioning logic within its own execution plan, rather than creating a new upstream plan.

Additional context

The proposed changes will likely reduce write performance somewhat. The efforts to parallelize individual file writing will help offset this performance impact, and ultimately, the improved UX is worth a slight performance regression in my opinion.

@alamb (Contributor)

alamb commented Oct 8, 2023

> I would like to provide users with options such as the following which will determine the number of output files:
> 1. Maximum rows per file
> 2. Maximum file size bytes

I agree this makes a lot of sense

> FileSinkRepartitionExec could also have specialized logic for handling writes to hive style partitioned tables.

I think this is what makes the most sense to me. Maybe we could combine some of the same logic to avoid writing files unless they actually have data.

> FileSink could also be reworked to accept a single RecordBatchStream and handle repartitioning logic within its own execution plan, rather than creating a new upstream plan.

I remember @tustvold, @metesynnada, @ozankabak, and I discussed the various tradeoffs between where the write partitioning would be determined (in the plan or in the writer), and I believe the conclusion was "it depends".
