From a038d6f0cb7c77af4d4908da8529410b369eabb9 Mon Sep 17 00:00:00 2001
From: Héctor Veiga Ortiz
Date: Wed, 19 Jun 2024 18:02:33 -0700
Subject: [PATCH 01/12] feat: conditionally allow keeping partition_by columns

---
 datafusion/common/src/config.rs                    |  3 +++
 .../core/src/datasource/file_format/write/demux.rs | 10 +++++++---
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 47da14574c5d..d251f568fc27 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -303,6 +303,9 @@ config_namespace! {
         /// statistics into the same file groups.
         /// Currently experimental
         pub split_file_groups_by_statistics: bool, default = false
+
+        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
+        pub keep_partition_by_columns: bool, default = false
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs
index d82c2471c596..ed290e739eea 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -246,6 +246,7 @@ async fn hive_style_partitions_demuxer(
     let exec_options = &context.session_config().options().execution;
     let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
+    let keep_partition_by_columns = exec_options.keep_partition_by_columns;
 
     // To support non string partition col types, cast the type to &str first
     let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
@@ -298,9 +299,12 @@ async fn hive_style_partitions_demuxer(
             }
         };
 
-        // remove partitions columns
-        let final_batch_to_send =
-            remove_partition_by_columns(&parted_batch, &partition_by)?;
+        let final_batch_to_send = if keep_partition_by_columns {
+            parted_batch
+        } else {
+            // remove partitions columns
+            remove_partition_by_columns(&parted_batch, &partition_by)?
+        };
 
         // Finally send the partial batch partitioned by distinct value!
         part_tx.send(final_batch_to_send).await.map_err(|_| {

From 44be0745ceda1f87729a42339357864c006ef95b Mon Sep 17 00:00:00 2001
From: Héctor Veiga Ortiz
Date: Mon, 24 Jun 2024 10:30:37 -0700
Subject: [PATCH 02/12] feat: add flag to file sink config, add tests

---
 datafusion/common/src/config.rs                    |  3 ---
 .../core/src/datasource/file_format/arrow.rs       |  1 +
 .../src/datasource/file_format/parquet.rs          |  5 ++++-
 .../src/datasource/file_format/write/demux.rs      |  7 ++++---
 .../file_format/write/orchestration.rs             |  1 +
 .../core/src/datasource/listing/table.rs           |  1 +
 .../core/src/datasource/physical_plan/mod.rs       |  2 ++
 datafusion/core/src/physical_planner.rs            | 19 ++++++++++++++++---
 datafusion/sql/src/parser.rs                       |  6 +++++-
 datafusion/sqllogictest/test_files/copy.slt        | 17 +++++++++++++++++
 docs/source/user-guide/sql/dml.md                  |  4 +++-
 11 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index d251f568fc27..47da14574c5d 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -303,9 +303,6 @@ config_namespace! {
         /// statistics into the same file groups.
         /// Currently experimental
         pub split_file_groups_by_statistics: bool, default = false
-
-        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
-        pub keep_partition_by_columns: bool, default = false
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 8c6790541597..ff24f2427aa6 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -223,6 +223,7 @@ impl DataSink for ArrowFileSink {
             part_col,
             self.config.table_paths[0].clone(),
             "arrow".into(),
+            self.config.keep_partition_by_columns,
         );
 
         let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 4204593eba96..2bc794d81846 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -548,7 +548,7 @@ impl ParquetSink {
     /// of hive style partitioning where some columns are removed from the
     /// underlying files.
     fn get_writer_schema(&self) -> Arc<Schema> {
-        if !self.config.table_partition_cols.is_empty() {
+        if !self.config.table_partition_cols.is_empty() && !self.config.keep_partition_by_columns {
             let schema = self.config.output_schema();
             let partition_names: Vec<_> = self
                 .config
@@ -638,6 +638,7 @@ impl DataSink for ParquetSink {
             part_col,
             self.config.table_paths[0].clone(),
             "parquet".into(),
+            self.config.keep_partition_by_columns,
         );
 
         let mut file_write_tasks: JoinSet<
@@ -1875,6 +1876,7 @@ mod tests {
             output_schema: schema.clone(),
             table_partition_cols: vec![],
             overwrite: true,
+            keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1969,6 +1971,7 @@ mod tests {
             output_schema: schema.clone(),
             table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
             overwrite: true,
+            keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
diff --git a/datafusion/core/src/datasource/file_format/write/demux.rs b/datafusion/core/src/datasource/file_format/write/demux.rs
index ed290e739eea..e29c877442cf 100644
--- a/datafusion/core/src/datasource/file_format/write/demux.rs
+++ b/datafusion/core/src/datasource/file_format/write/demux.rs
@@ -75,6 +75,7 @@ pub(crate) fn start_demuxer_task(
     partition_by: Option<Vec<(String, DataType)>>,
     base_output_path: ListingTableUrl,
     file_extension: String,
+    keep_partition_by_columns: bool,
 ) -> (SpawnedTask<Result<()>>, DemuxedStreamReceiver) {
     let (tx, rx) = mpsc::unbounded_channel();
     let context = context.clone();
@@ -91,6 +92,7 @@ pub(crate) fn start_demuxer_task(
                 parts,
                 base_output_path,
                 file_extension,
+                keep_partition_by_columns,
             )
             .await
         })
@@ -111,7 +113,7 @@ pub(crate) fn start_demuxer_task(
     (task, rx)
 }
 
-/// Dynamically partitions input stream to acheive desired maximum rows per file
+/// Dynamically partitions input stream to achieve desired maximum rows per file
 async fn row_count_demuxer(
     mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
     mut input: SendableRecordBatchStream,
@@ -240,13 +242,13 @@ async fn hive_style_partitions_demuxer(
     partition_by: Vec<(String, DataType)>,
     base_output_path: ListingTableUrl,
     file_extension: String,
+    keep_partition_by_columns: bool,
 ) -> Result<()> {
     let write_id =
         rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
 
     let exec_options = &context.session_config().options().execution;
     let max_buffered_recordbatches =
         exec_options.max_buffered_batches_per_output_file;
-    let keep_partition_by_columns = exec_options.keep_partition_by_columns;
 
     // To support non string partition col types, cast the type to &str first
     let mut value_map: HashMap<Vec<String>, Sender<RecordBatch>> = HashMap::new();
@@ -302,7 +304,6 @@ async fn hive_style_partitions_demuxer(
         let final_batch_to_send = if keep_partition_by_columns {
             parted_batch
         } else {
-            // remove partitions columns
             remove_partition_by_columns(&parted_batch, &partition_by)?
         };
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 3ae2122de827..a62b5715aeb3 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -224,6 +224,7 @@ pub(crate) async fn stateless_multipart_put(
         part_cols,
         base_output_path.clone(),
         file_extension,
+        config.keep_partition_by_columns,
     );
 
     let rb_buffer_size = &context
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 7f5e80c4988a..0c8238a615c2 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -926,6 +926,7 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             overwrite,
+            keep_partition_by_columns: false,
         };
 
         let unsorted: Vec<Vec<Expr>> = vec![];
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 720e29e35582..a897895246e3 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -85,6 +85,8 @@ pub struct FileSinkConfig {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
+    /// Controls whether partition columns are kept for the file
+    pub keep_partition_by_columns: bool,
 }
 
 impl FileSinkConfig {
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 52d976c45cf3..d1005f2e8433 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -777,6 +777,18 @@ impl DefaultPhysicalPlanner {
                     .map(|s| (s.to_string(), arrow_schema::DataType::Null))
                     .collect::<Vec<_>>();
 
+                let keep_partition_by_columns = source_option_tuples
+                    .get("format.keep_partition_by_columns")
+                    .map(|v| v.trim() == "true")
+                    .unwrap_or(false);
+
+                let mut updated_source_options_tuples = HashMap::new();
+                for (k, v) in source_option_tuples {
+                    if k != "format.keep_partition_by_columns" {
+                        updated_source_options_tuples.insert(k.clone(), v.clone());
+                    }
+                }
+
                 // Set file sink related options
                 let config = FileSinkConfig {
                     object_store_url,
@@ -785,26 +797,27 @@ impl DefaultPhysicalPlanner {
                     output_schema: Arc::new(schema),
                     table_partition_cols,
                     overwrite: false,
+                    keep_partition_by_columns,
                 };
                 let mut table_options = session_state.default_table_options();
                 let sink_format: Arc<dyn FileFormat> = match format_options {
                     FormatOptions::CSV(options) => {
                         table_options.csv = options.clone();
                         table_options.set_file_format(FileType::CSV);
-                        table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
                         Arc::new(CsvFormat::default().with_options(table_options.csv))
                     }
                     FormatOptions::JSON(options) => {
                         table_options.json = options.clone();
                         table_options.set_file_format(FileType::JSON);
-                        table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
                         Arc::new(JsonFormat::default().with_options(table_options.json))
                     }
                     #[cfg(feature = "parquet")]
                     FormatOptions::PARQUET(options) => {
                         table_options.parquet = options.clone();
                         table_options.set_file_format(FileType::PARQUET);
-                        table_options.alter_with_string_hash_map(source_option_tuples)?;
+                        table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
                         Arc::new(
                             ParquetFormat::default().with_options(table_options.parquet),
                         )
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index d2f3f508a316..2d7c0688dbbd 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -1475,7 +1475,7 @@ mod tests {
     fn copy_to_multi_options() -> Result<(), ParserError> {
         // order of options is preserved
         let sql =
-            "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy)";
+            "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy, 'keep_partition_by_columns' true)";
 
         let expected_options = vec![
             (
@@ -1486,6 +1486,10 @@ mod tests {
                 "format.compression".to_string(),
                 Value::SingleQuotedString("snappy".to_string()),
             ),
+            (
+                "keep_partition_by_columns".to_string(),
+                Value::SingleQuotedString("true".to_string()),
+            ),
         ];
 
         let mut statements = DFParser::parse_sql(sql).unwrap();
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 00bcea7ec154..1f012d6e8850 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -166,6 +166,23 @@ physical_plan
 01)DataSinkExec: sink=ParquetSink(file_groups=[])
 02)--MemoryExec: partitions=1, partition_sizes=[1]
 
+# Copy to directory as partitioned files with keep_partition_by_columns enabled
+query TT
+COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
+OPTIONS ('keep_partition_by_columns' true);
+----
+3
+
+# validate generated file contains tables
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet4 STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table4/column1=1/*.parquet';
+
+query TT
+select column1, column2 from validate_partitioned_parquet4 order by column1,column2;
+----
+1 a
+
 # Copy more files to directory via query
 query IT
 COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table/' STORED AS PARQUET;
diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md
index 42e0c8054c9b..c1c609261898 100644
--- a/docs/source/user-guide/sql/dml.md
+++ b/docs/source/user-guide/sql/dml.md
@@ -39,7 +39,9 @@ TO 'file_name'
 clause is not specified, it will be inferred from the file extension if possible.
 
 `PARTITIONED BY` specifies the columns to use for partitioning the output files into
-separate hive-style directories.
+separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed
+from the output format. If you want to keep the columns, you should provide the option
+`keep_partition_by_columns true`.
 
 The output format is determined by the first match of the following rules:

From 43b931c3ebd504c1ad560a84f710d1a237bf65f3 Mon Sep 17 00:00:00 2001
From: Héctor Veiga Ortiz
Date: Tue, 25 Jun 2024 09:39:17 -0700
Subject: [PATCH 03/12] this commit contains:

- separate options by prefix 'hive.'
- add hive_options to CopyTo struct
- add more documentation
- add session execution flag to enable feature, false by default
---
 datafusion/common/src/config.rs               |  3 +++
 datafusion/core/src/dataframe/mod.rs          |  2 ++
 datafusion/core/src/dataframe/parquet.rs      |  1 +
 .../core/src/datasource/listing/table.rs      |  3 ++-
 datafusion/core/src/physical_planner.rs       | 20 +++++++------------
 datafusion/expr/src/logical_plan/builder.rs   |  4 +++-
 datafusion/expr/src/logical_plan/display.rs   |  9 ++++++++-
 datafusion/expr/src/logical_plan/dml.rs       |  2 ++
 datafusion/expr/src/logical_plan/plan.rs      |  2 ++
 datafusion/expr/src/logical_plan/tree_node.rs |  2 ++
 datafusion/sql/src/statement.rs               | 12 ++++++++++-
 datafusion/sqllogictest/test_files/copy.slt   |  2 +-
 docs/source/user-guide/sql/dml.md             |  3 ++-
 docs/source/user-guide/sql/write_options.md   | 10 ++++++++++
 14 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 47da14574c5d..d251f568fc27 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -303,6 +303,9 @@ config_namespace! {
         /// statistics into the same file groups.
         /// Currently experimental
         pub split_file_groups_by_statistics: bool, default = false
+
+        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
+        pub keep_partition_by_columns: bool, default = false
     }
 }
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index e1fc8273e6ff..88fd0cedf483 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1338,6 +1338,7 @@ impl DataFrame {
             FormatOptions::CSV(props),
             HashMap::new(),
             options.partition_by,
+            Default::default(),
         )?
         .build()?;
@@ -1393,6 +1394,7 @@ impl DataFrame {
             FormatOptions::JSON(props),
             Default::default(),
             options.partition_by,
+            Default::default(),
         )?
         .build()?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 0ec46df0ae5d..0299ddecfd4c 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -66,6 +66,7 @@ impl DataFrame {
             FormatOptions::PARQUET(props),
             Default::default(),
             options.partition_by,
+            Default::default(),
         )?
         .build()?;
 
         DataFrame {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 0c8238a615c2..26bb7740e9de 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -917,6 +917,7 @@ impl TableProvider for ListingTable {
             .await?;
 
         let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
+        let keep_partition_by_columns = state.config().options().execution.keep_partition_by_columns;
 
         // Sink related option, apart from format
         let config = FileSinkConfig {
@@ -926,7 +927,7 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             overwrite,
-            keep_partition_by_columns: false,
+            keep_partition_by_columns,
         };
 
         let unsorted: Vec<Vec<Expr>> = vec![];
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index d1005f2e8433..ff99e31d0b85 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -762,6 +762,7 @@ impl DefaultPhysicalPlanner {
                 format_options,
                 partition_by,
                 options: source_option_tuples,
+                hive_options
             }) => {
                 let input_exec = children.one()?;
                 let parsed_url = ListingTableUrl::parse(output_url)?;
@@ -777,17 +778,10 @@ impl DefaultPhysicalPlanner {
                     .map(|s| (s.to_string(), arrow_schema::DataType::Null))
                     .collect::<Vec<_>>();
 
-                let keep_partition_by_columns = source_option_tuples
-                    .get("format.keep_partition_by_columns")
+                let keep_partition_by_columns = hive_options
+                    .get("hive.keep_partition_by_columns")
                     .map(|v| v.trim() == "true")
-                    .unwrap_or(false);
-
-                let mut updated_source_options_tuples = HashMap::new();
-                for (k, v) in source_option_tuples {
-                    if k != "format.keep_partition_by_columns" {
-                        updated_source_options_tuples.insert(k.clone(), v.clone());
-                    }
-                }
+                    .unwrap_or(false) || session_state.config().options().execution.keep_partition_by_columns;
 
                 // Set file sink related options
                 let config = FileSinkConfig {
@@ -804,20 +798,20 @@ impl DefaultPhysicalPlanner {
                     FormatOptions::CSV(options) => {
                         table_options.csv = options.clone();
                         table_options.set_file_format(FileType::CSV);
-                        table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
+                        table_options.alter_with_string_hash_map(source_option_tuples)?;
                         Arc::new(CsvFormat::default().with_options(table_options.csv))
                     }
                     FormatOptions::JSON(options) => {
                         table_options.json = options.clone();
                         table_options.set_file_format(FileType::JSON);
-                        table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
+                        table_options.alter_with_string_hash_map(source_option_tuples)?;
                         Arc::new(JsonFormat::default().with_options(table_options.json))
                     }
                     #[cfg(feature = "parquet")]
                     FormatOptions::PARQUET(options) => {
                         table_options.parquet = options.clone();
                         table_options.set_file_format(FileType::PARQUET);
-                        table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
+                        table_options.alter_with_string_hash_map(source_option_tuples)?;
                         Arc::new(
                             ParquetFormat::default().with_options(table_options.parquet),
                         )
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 434f4dace1de..d94b071a3127 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -274,13 +274,15 @@ impl LogicalPlanBuilder {
         format_options: FormatOptions,
         options: HashMap<String, String>,
         partition_by: Vec<String>,
+        hive_options: HashMap<String, String>,
     ) -> Result<Self> {
         Ok(Self::from(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
             output_url,
+            partition_by,
             format_options,
             options,
-            partition_by,
+            hive_options,
         })))
     }
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 707cff8ab5f1..308ca1e0da20 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -428,17 +428,24 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                 format_options,
                 partition_by: _,
                 options,
+                hive_options,
             }) => {
                 let op_str = options
                     .iter()
                     .map(|(k, v)| format!("{}={}", k, v))
                     .collect::<Vec<String>>()
                     .join(", ");
+                let hive_op_str = hive_options
+                    .iter()
+                    .map(|(k, v)| format!("{}={}", k, v))
+                    .collect::<Vec<String>>()
+                    .join(", ");
 
                 json!({
                     "Node Type": "CopyTo",
                     "Output URL": output_url,
                     "Format Options": format!("{}", format_options),
-                    "Options": op_str
+                    "Options": op_str,
+                    "Hive Options": hive_op_str,
                 })
             }
             LogicalPlan::Ddl(ddl) => {
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 13f3759ab8c0..2eb085087484 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -39,6 +39,8 @@ pub struct CopyTo {
     pub format_options: FormatOptions,
     /// SQL Options that can affect the formats
     pub options: HashMap<String, String>,
+    /// Hive Options that can affect hive-style partitioning
+    pub hive_options: HashMap<String, String>,
 }
 
 // Implement PartialEq manually
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 6e7efaf39e3e..fd445bedabe3 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -860,12 +860,14 @@ impl LogicalPlan {
                 format_options,
                 options,
                 partition_by,
+                hive_options,
             }) => Ok(LogicalPlan::Copy(CopyTo {
                 input: Arc::new(inputs.swap_remove(0)),
                 output_url: output_url.clone(),
                 format_options: format_options.clone(),
                 options: options.clone(),
                 partition_by: partition_by.clone(),
+                hive_options: hive_options.clone(),
             })),
             LogicalPlan::Values(Values { schema, .. }) => {
                 Ok(LogicalPlan::Values(Values {
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 86c0cffd80a1..1d6d0fe07760 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -258,6 +258,7 @@ impl TreeNode for LogicalPlan {
                 partition_by,
                 format_options,
                 options,
+                hive_options,
             }) => rewrite_arc(input, f)?.update_data(|input| {
                 LogicalPlan::Copy(CopyTo {
                     input,
@@ -265,6 +266,7 @@ impl TreeNode for LogicalPlan {
                     partition_by,
                     format_options,
                     options,
+                    hive_options,
                 })
             }),
             LogicalPlan::Ddl(ddl) => {
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index cb492b390c76..f062f313ad10 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -881,6 +881,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         };
 
         let mut options = HashMap::new();
+        let mut hive_options = HashMap::new();
         for (key, value) in statement.options {
             let value_string = match value_to_string(&value) {
                 None => {
@@ -888,7 +889,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 }
                 Some(v) => v,
             };
-            if !(&key.contains('.')) {
+
+            if key.to_lowercase().contains("keep_partition_by_columns") {
+                let renamed_key = if !&key.starts_with("hive.") {
+                    format!("hive.{}", key)
+                } else {
+                    key
+                };
+                hive_options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
+            } else if !(&key.contains('.')) {
                 // If config does not belong to any namespace, assume it is
                 // a format option and apply the format prefix for backwards
                 // compatibility.
@@ -941,6 +950,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             format_options: file_type.into(),
             partition_by,
             options,
+            hive_options,
         }))
     }
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 1f012d6e8850..8ed3b88016cf 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -169,7 +169,7 @@ physical_plan
 # Copy to directory as partitioned files with keep_partition_by_columns enabled
 query TT
 COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
-OPTIONS ('keep_partition_by_columns' true);
+OPTIONS (KEEP_PARTITION_BY_COLUMNS true);
 ----
 3
diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md
index c1c609261898..61d63bf20e80 100644
--- a/docs/source/user-guide/sql/dml.md
+++ b/docs/source/user-guide/sql/dml.md
@@ -41,7 +41,8 @@ clause is not specified, it will be inferred from the file extension if possible
 `PARTITIONED BY` specifies the columns to use for partitioning the output files into
 separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed
 from the output format. If you want to keep the columns, you should provide the option
-`keep_partition_by_columns true`.
+`KEEP_PARTITION_BY_COLUMNS true`. `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled
+through `ExecutionOptions` within `SessionConfig`.
 
 The output format is determined by the first match of the following rules:
diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md
index 3c4790dd0255..89e6d08efee1 100644
--- a/docs/source/user-guide/sql/write_options.md
+++ b/docs/source/user-guide/sql/write_options.md
@@ -70,6 +70,16 @@ In this example, we write the entirety of `source_table` out to a folder of parq
 
 ## Available Options
 
+### Hive Specific Options
+
+The following options are available when writing hive-style partitioned data.
+
+| Option                    | Description                                                                         | Default Value |
+|---------------------------|-------------------------------------------------------------------------------------|---------------|
+| KEEP_PARTITION_BY_COLUMNS | Flag to retain the columns in the output data when using `PARTITIONED BY` queries.  | false         |
+
+Note: `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.
+
 ### JSON Format Specific Options
 
 The following options are available when writing JSON files. Note: If any unsupported option is specified, an error will be raised and the query will fail.

From 15782738efe4fefedb6b868f86204f03fd8751e4 Mon Sep 17 00:00:00 2001
From: Héctor Veiga Ortiz
Date: Tue, 25 Jun 2024 15:35:00 -0700
Subject: [PATCH 04/12] do not add hive_options to CopyTo

---
 datafusion/common/src/config.rs               | 5 +++++
 datafusion/core/src/dataframe/mod.rs          | 2 --
 datafusion/core/src/dataframe/parquet.rs      | 1 -
 datafusion/core/src/physical_planner.rs       | 3 +--
 datafusion/expr/src/logical_plan/builder.rs   | 2 --
 datafusion/expr/src/logical_plan/display.rs   | 9 +--------
 datafusion/expr/src/logical_plan/dml.rs       | 2 --
 datafusion/expr/src/logical_plan/plan.rs      | 2 --
 datafusion/expr/src/logical_plan/tree_node.rs | 2 --
 datafusion/sql/src/statement.rs               | 4 +---
 10 files changed, 8 insertions(+), 24 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index d251f568fc27..77fd535219a5 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1285,6 +1285,11 @@ impl TableOptions {
             return ConfigField::set(self, key, value);
         }
 
+        // Only used for hive.keep_partition_by_columns
+        if prefix == "hive" {
+            return Ok(());
+        }
+
         let Some(e) = self.extensions.0.get_mut(prefix) else {
             return _config_err!("Could not find config namespace \"{prefix}\"");
         };
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 88fd0cedf483..e1fc8273e6ff 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1338,7 +1338,6 @@ impl DataFrame {
             FormatOptions::CSV(props),
             HashMap::new(),
             options.partition_by,
-            Default::default(),
         )?
         .build()?;
@@ -1394,7 +1393,6 @@ impl DataFrame {
             FormatOptions::JSON(props),
             Default::default(),
             options.partition_by,
-            Default::default(),
         )?
         .build()?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 0299ddecfd4c..0ec46df0ae5d 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -66,7 +66,6 @@ impl DataFrame {
             FormatOptions::PARQUET(props),
             Default::default(),
             options.partition_by,
-            Default::default(),
         )?
         .build()?;
 
         DataFrame {
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index ff99e31d0b85..5effe0ead7a3 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -762,7 +762,6 @@ impl DefaultPhysicalPlanner {
                 format_options,
                 partition_by,
                 options: source_option_tuples,
-                hive_options
             }) => {
                 let input_exec = children.one()?;
                 let parsed_url = ListingTableUrl::parse(output_url)?;
@@ -778,7 +777,7 @@ impl DefaultPhysicalPlanner {
                     .map(|s| (s.to_string(), arrow_schema::DataType::Null))
                     .collect::<Vec<_>>();
 
-                let keep_partition_by_columns = hive_options
+                let keep_partition_by_columns = source_option_tuples
                     .get("hive.keep_partition_by_columns")
                     .map(|v| v.trim() == "true")
                     .unwrap_or(false) || session_state.config().options().execution.keep_partition_by_columns;
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index d94b071a3127..2761be52501f 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -274,7 +274,6 @@ impl LogicalPlanBuilder {
         format_options: FormatOptions,
         options: HashMap<String, String>,
         partition_by: Vec<String>,
-        hive_options: HashMap<String, String>,
     ) -> Result<Self> {
         Ok(Self::from(LogicalPlan::Copy(CopyTo {
             input: Arc::new(input),
@@ -282,7 +281,6 @@ impl LogicalPlanBuilder {
             partition_by,
             format_options,
             options,
-            hive_options,
         })))
     }
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 308ca1e0da20..707cff8ab5f1 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -428,24 +428,17 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                 format_options,
                 partition_by: _,
                 options,
-                hive_options,
             }) => {
                 let op_str = options
                     .iter()
                     .map(|(k, v)| format!("{}={}", k, v))
                     .collect::<Vec<String>>()
                     .join(", ");
-                let hive_op_str = hive_options
-                    .iter()
-                    .map(|(k, v)| format!("{}={}", k, v))
-                    .collect::<Vec<String>>()
-                    .join(", ");
 
                 json!({
                     "Node Type": "CopyTo",
                     "Output URL": output_url,
                     "Format Options": format!("{}", format_options),
-                    "Options": op_str,
-                    "Hive Options": hive_op_str,
+                    "Options": op_str
                 })
             }
             LogicalPlan::Ddl(ddl) => {
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 2eb085087484..13f3759ab8c0 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -39,8 +39,6 @@ pub struct CopyTo {
     pub format_options: FormatOptions,
     /// SQL Options that can affect the formats
     pub options: HashMap<String, String>,
-    /// Hive Options that can affect hive-style partitioning
-    pub hive_options: HashMap<String, String>,
 }
 
 // Implement PartialEq manually
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index fd445bedabe3..6e7efaf39e3e 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -860,14 +860,12 @@ impl LogicalPlan {
                 format_options,
                 options,
                 partition_by,
-                hive_options,
             }) => Ok(LogicalPlan::Copy(CopyTo {
                 input: Arc::new(inputs.swap_remove(0)),
                 output_url: output_url.clone(),
                 format_options: format_options.clone(),
                 options: options.clone(),
                 partition_by: partition_by.clone(),
-                hive_options: hive_options.clone(),
             })),
             LogicalPlan::Values(Values { schema, .. }) => {
                 Ok(LogicalPlan::Values(Values {
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 1d6d0fe07760..86c0cffd80a1 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -258,7 +258,6 @@ impl TreeNode for LogicalPlan {
                 partition_by,
                 format_options,
                 options,
-                hive_options,
             }) => rewrite_arc(input, f)?.update_data(|input| {
                 LogicalPlan::Copy(CopyTo {
                     input,
@@ -266,7 +265,6 @@ impl TreeNode for LogicalPlan {
                     partition_by,
                     format_options,
                     options,
-                    hive_options,
                 })
             }),
             LogicalPlan::Ddl(ddl) => {
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index f062f313ad10..5303e575d65e 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -881,7 +881,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         };
 
         let mut options = HashMap::new();
-        let mut hive_options = HashMap::new();
         for (key, value) in statement.options {
             let value_string = match value_to_string(&value) {
                 None => {
@@ -896,7 +895,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 } else {
                     key
                 };
-                hive_options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
+                options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
             } else if !(&key.contains('.')) {
                 // If config does not belong to any namespace, assume it is
                 // a format option and apply the format prefix for backwards
                 // compatibility.
@@ -950,7 +949,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             format_options: file_type.into(),
             partition_by,
             options,
-            hive_options,
         }))
     }

From 20197a17d6bacf996237d93a6690a12d1e63cca8 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 25 Jun 2024 19:23:02 -0400
Subject: [PATCH 05/12] npx prettier

---
 docs/source/user-guide/sql/dml.md           | 6 +++---
 docs/source/user-guide/sql/write_options.md | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md
index 61d63bf20e80..201a02bbb43c 100644
--- a/docs/source/user-guide/sql/dml.md
+++ b/docs/source/user-guide/sql/dml.md
@@ -39,9 +39,9 @@ TO 'file_name'
 clause is not specified, it will be inferred from the file extension if possible.
 
 `PARTITIONED BY` specifies the columns to use for partitioning the output files into
-separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed 
-from the output format. If you want to keep the columns, you should provide the option 
-`KEEP_PARTITION_BY_COLUMNS true`. `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled 
+separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed
+from the output format. If you want to keep the columns, you should provide the option
+`KEEP_PARTITION_BY_COLUMNS true`. `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled
 through `ExecutionOptions` within `SessionConfig`.
 
 The output format is determined by the first match of the following rules:
diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md
index 89e6d08efee1..cf54bb21c773 100644
--- a/docs/source/user-guide/sql/write_options.md
+++ b/docs/source/user-guide/sql/write_options.md
@@ -75,7 +75,7 @@ In this example, we write the entirety of `source_table` out to a folder of parq
 The following options are available when writing hive-style partitioned data.
 
 | Option                    | Description                                                                         | Default Value |
-|---------------------------|-------------------------------------------------------------------------------------|---------------|
+| ------------------------- | ----------------------------------------------------------------------------------- | ------------- |
 | KEEP_PARTITION_BY_COLUMNS | Flag to retain the columns in the output data when using `PARTITIONED BY` queries.  | false         |
 
 Note: `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.

From 206b274f480a56559d251fc159b04e571c6ad17a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 25 Jun 2024 19:23:07 -0400
Subject: [PATCH 06/12] fmt

---
 datafusion/core/src/datasource/file_format/parquet.rs | 4 +++-
 datafusion/core/src/datasource/listing/table.rs       | 3 ++-
 datafusion/core/src/physical_planner.rs               | 7 ++++++-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 2bc794d81846..7ed98bcee9eb 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -548,7 +548,9 @@ impl ParquetSink {
     /// of hive style partitioning where some columns are removed from the
     /// underlying files.
     fn get_writer_schema(&self) -> Arc<Schema> {
-        if !self.config.table_partition_cols.is_empty() && !self.config.keep_partition_by_columns {
+        if !self.config.table_partition_cols.is_empty()
+            && !self.config.keep_partition_by_columns
+        {
             let schema = self.config.output_schema();
             let partition_names: Vec<_> = self
                 .config
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 26bb7740e9de..c40a7343ad60 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -917,7 +917,8 @@ impl TableProvider for ListingTable {
             .await?;
 
         let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
-        let keep_partition_by_columns = state.config().options().execution.keep_partition_by_columns;
+        let keep_partition_by_columns =
+            state.config().options().execution.keep_partition_by_columns;
 
         // Sink related option, apart from format
         let config = FileSinkConfig {
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 5effe0ead7a3..10f75fe3b7d8 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -780,7 +780,12 @@ impl DefaultPhysicalPlanner {
                 let keep_partition_by_columns = source_option_tuples
                     .get("hive.keep_partition_by_columns")
                     .map(|v| v.trim() == "true")
-                    .unwrap_or(false) || session_state.config().options().execution.keep_partition_by_columns;
+                    .unwrap_or(false)
+                    || session_state
+                        .config()
+                        .options()
+                        .execution
+                        .keep_partition_by_columns;

From e91397aa4837066807f9599f9cded99649cdfc5b Mon Sep 17 00:00:00 2001
From: Héctor Veiga Ortiz
Date: Wed, 26 Jun 2024 11:39:34 -0700
Subject: [PATCH 07/12] change prefix to 'execution.', update override order
 for condition
---
 datafusion/common/src/config.rs             |  3 +--
 datafusion/core/src/physical_planner.rs     | 15 ++++++++-------
 datafusion/sql/src/statement.rs             |  9 +--------
 datafusion/sqllogictest/test_files/copy.slt |  2 +-
 docs/source/user-guide/sql/dml.md           |  4 ++--
 docs/source/user-guide/sql/write_options.md | 10 +++++-----
 6 files changed, 18 insertions(+), 25 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 77fd535219a5..c30eb56e0023 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1285,8 +1285,7 @@ impl TableOptions {
             return ConfigField::set(self, key, value);
         }
 
-        // Only used for hive.keep_partition_by_columns
-        if prefix == "hive" {
+        if prefix == "execution" {
             return Ok(());
         }
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 10f75fe3b7d8..4c753d7d745c 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -778,14 +778,15 @@ impl DefaultPhysicalPlanner {
                 let keep_partition_by_columns = source_option_tuples
-                    .get("hive.keep_partition_by_columns")
+                    .get("execution.keep_partition_by_columns")
                     .map(|v| v.trim() == "true")
-                    .unwrap_or(false)
-                    || session_state
-                        .config()
-                        .options()
-                        .execution
-                        .keep_partition_by_columns;
+                    .unwrap_or(
+                        session_state
+                            .config()
+                            .options()
+                            .execution
+                            .keep_partition_by_columns,
+                    );
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 5303e575d65e..d5d17a05fda9 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -889,14 +889,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 Some(v) => v,
             };
 
-            if key.to_lowercase().contains("keep_partition_by_columns") {
-                let renamed_key = if !&key.starts_with("hive.") {
-                    format!("hive.{}", key)
-                } else {
-                    key
-                };
-                options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
-            } else if !(&key.contains('.')) {
+            if !(&key.contains('.')) {
                 // If config does not belong to any namespace, assume it is
                 // a format option and apply the format prefix for backwards
                 // compatibility.
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 8ed3b88016cf..c7398878f9a6 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -169,7 +169,7 @@ physical_plan
 # Copy to directory as partitioned files with keep_partition_by_columns enabled
 query TT
 COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
-OPTIONS (KEEP_PARTITION_BY_COLUMNS true);
+OPTIONS (execution.keep_partition_by_columns true);
 ----
 3
diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md
index 201a02bbb43c..acee247dee9f 100644
--- a/docs/source/user-guide/sql/dml.md
+++ b/docs/source/user-guide/sql/dml.md
@@ -41,8 +41,8 @@ clause is not specified, it will be inferred from the file extension if possible
 `PARTITIONED BY` specifies the columns to use for partitioning the output files into
 separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed
 from the output format. If you want to keep the columns, you should provide the option
-`KEEP_PARTITION_BY_COLUMNS true`. `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled
-through `ExecutionOptions` within `SessionConfig`.
+`execution.keep_partition_by_columns true`. `execution.keep_partition_by_columns` flag can also
+be enabled through `ExecutionOptions` within `SessionConfig`.
 
 The output format is determined by the first match of the following rules:
diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md
index cf54bb21c773..8f40a47535b3 100644
--- a/docs/source/user-guide/sql/write_options.md
+++ b/docs/source/user-guide/sql/write_options.md
@@ -70,15 +70,15 @@ In this example, we write the entirety of `source_table` out to a folder of parq
 
 ## Available Options
 
-### Hive Specific Options
+### Execution Specific Options
 
 The following options are available when writing hive-style partitioned data.
 
-| Option                    | Description                                                                         | Default Value |
-| ------------------------- | ----------------------------------------------------------------------------------- | ------------- |
-| KEEP_PARTITION_BY_COLUMNS | Flag to retain the columns in the output data when using `PARTITIONED BY` queries.  | false         |
+| Option                              | Description                                                                         | Default Value |
+|-------------------------------------| ----------------------------------------------------------------------------------- | ------------- |
+| execution.keep_partition_by_columns | Flag to retain the columns in the output data when using `PARTITIONED BY` queries.  | false         |
 
-Note: `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.
+Note: `execution.keep_partition_by_columns` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.
 
 ### JSON Format Specific Options

From e35220337c2e0034482617d5f8c7d9e9247b0d87 Mon Sep 17 00:00:00 2001
From: Héctor Veiga Ortiz
Date: Thu, 27 Jun 2024 10:21:15 -0700
Subject: [PATCH 08/12] improve handling of flag, added test for config error

---
 datafusion/core/src/physical_planner.rs     | 17 ++++++++---------
 datafusion/sql/src/parser.rs                |  4 ++--
 datafusion/sqllogictest/test_files/copy.slt |  4 ++++
 3 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 4c753d7d745c..36aad9ce4eb5 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -777,16 +777,15 @@ impl DefaultPhysicalPlanner {
                     .map(|s| (s.to_string(), arrow_schema::DataType::Null))
                     .collect::<Vec<_>>();
 
-                let keep_partition_by_columns = source_option_tuples
+                let keep_partition_by_columns = match source_option_tuples
                     .get("execution.keep_partition_by_columns")
-                    .map(|v| v.trim() == "true")
-                    .unwrap_or(
-                        session_state
-                            .config()
-                            .options()
-                            .execution
-                            .keep_partition_by_columns,
-                    );
+                    .map(|v| v.trim()) {
+                    None => session_state.config().options().execution.keep_partition_by_columns,
+                    Some("true") => true,
+                    Some("false") => false,
+                    Some(value) =>
+                        return Err(DataFusionError::Configuration(format!("provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"", value))),
+                };
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index 2d7c0688dbbd..5da7f7176509 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -1475,7 +1475,7 @@ mod tests {
     fn copy_to_multi_options() -> Result<(), ParserError> {
         // order of options is preserved
         let sql =
-            "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy, 'keep_partition_by_columns' true)";
+            "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy, 'execution.keep_partition_by_columns' true)";
 
         let expected_options = vec![
             (
@@ -1487,7 +1487,7 @@ mod tests {
                 "format.compression".to_string(),
                 Value::SingleQuotedString("snappy".to_string()),
             ),
             (
-                "keep_partition_by_columns".to_string(),
+                "execution.keep_partition_by_columns".to_string(),
                 Value::SingleQuotedString("true".to_string()),
             ),
         ];
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index c7398878f9a6..21c34bc25cee 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -606,3 +606,7 @@ COPY (select col2, sum(col1) from source_table
 # Copy from table with non literal
 query error DataFusion error: SQL error: ParserError\("Unexpected token \("\)
 COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102);
+
+# Copy using execution.keep_partition_by_columns with an invalid value
+query error DataFusion error: Invalid or Unsupported Configuration: provided value for 'execution.keep_partition_by_columns' was not recognized: "invalid_value"
+COPY source_table to '/tmp/table.parquet' OPTIONS (execution.keep_partition_by_columns invalid_value);
\ No newline at end of file

From 41328277bbdec9555fb77f9e064f6d9be006e799 Mon Sep 17 00:00:00 2001
From: Héctor Veiga Ortiz
Date: Thu, 27 Jun 2024 15:07:27 -0700
Subject: [PATCH 09/12] trying to make CI happier

---
 datafusion/proto/proto/datafusion.proto        |  1 +
 datafusion/proto/src/generated/pbjson.rs       | 18 ++++++++++++++++++
 datafusion/proto/src/generated/prost.rs        |  2 ++
 .../proto/src/physical_plan/from_proto.rs      |  1 +
 datafusion/proto/src/physical_plan/to_proto.rs |  1 +
 docs/source/user-guide/sql/dml.md              |  2 +-
 docs/source/user-guide/sql/write_options.md    |  2 +-
 7 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 2e7005a4cb13..8aeddd8643df 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -760,6 +760,7 @@ message FileSinkConfig {
   datafusion_common.Schema output_schema = 4;
   repeated PartitionColumn table_partition_cols = 5;
   bool overwrite = 8;
+  bool keep_partition_by_columns = 9;
 }
 
 message JsonSink {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 8fdc5d2e4db2..37722d22d61b 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -5615,6 +5615,9 @@ impl serde::Serialize for FileSinkConfig {
         if self.overwrite {
             len += 1;
         }
+        if self.keep_partition_by_columns {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.FileSinkConfig", len)?;
         if !self.object_store_url.is_empty() {
             struct_ser.serialize_field("objectStoreUrl", &self.object_store_url)?;
@@ -5634,6 +5637,9 @@ impl serde::Serialize for FileSinkConfig {
         if self.overwrite {
             struct_ser.serialize_field("overwrite", &self.overwrite)?;
         }
+        if self.keep_partition_by_columns {
+            struct_ser.serialize_field("keepPartitionByColumns", &self.keep_partition_by_columns)?;
+        }
         struct_ser.end()
     }
 }
@@ -5655,6 +5661,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             "table_partition_cols",
             "tablePartitionCols",
             "overwrite",
+            "keep_partition_by_columns",
+            "keepPartitionByColumns",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -5665,6 +5673,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
            OutputSchema,
            TablePartitionCols,
            Overwrite,
+           KeepPartitionByColumns,
        }
        impl<'de> serde::Deserialize<'de> for GeneratedField {
            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -5692,6 +5701,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                            "outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema),
                            "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols),
                            "overwrite" => Ok(GeneratedField::Overwrite),
+                           "keepPartitionByColumns" | "keep_partition_by_columns" => Ok(GeneratedField::KeepPartitionByColumns),
                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                        }
                    }
@@ -5717,6 +5727,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                let mut output_schema__ = None;
                let mut table_partition_cols__ = None;
                let mut overwrite__ = None;
+               let mut keep_partition_by_columns__ = None;
                while let Some(k) = map_.next_key()? {
                    match k {
                        GeneratedField::ObjectStoreUrl => {
@@ -5755,6 +5766,12 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                            }
                            overwrite__ = Some(map_.next_value()?);
                        }
+                       GeneratedField::KeepPartitionByColumns => {
+                           if keep_partition_by_columns__.is_some() {
+                               return Err(serde::de::Error::duplicate_field("keepPartitionByColumns"));
+                           }
+                           keep_partition_by_columns__ = Some(map_.next_value()?);
+                       }
                    }
                }
                Ok(FileSinkConfig {
@@ -5764,6 +5781,7 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                    output_schema: output_schema__,
                    table_partition_cols: table_partition_cols__.unwrap_or_default(),
                    overwrite: overwrite__.unwrap_or_default(),
+                   keep_partition_by_columns: keep_partition_by_columns__.unwrap_or_default(),
                })
            }
        }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 036b7cff9b03..7ccfce72f2e9 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1145,6 +1145,8 @@ pub struct FileSinkConfig {
     pub table_partition_cols: ::prost::alloc::vec::Vec<PartitionColumn>,
     #[prost(bool, tag = "8")]
     pub overwrite: bool,
+    #[prost(bool, tag = "9")]
+    pub keep_partition_by_columns: bool,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index b636c77641c7..1a772f402c69 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -650,6 +650,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
             overwrite: conf.overwrite,
+            keep_partition_by_columns: conf.keep_partition_by_columns,
         })
     }
 }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index c02b59d06230..47f41e57096a 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -725,6 +725,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
             output_schema: Some(conf.output_schema.as_ref().try_into()?),
             table_partition_cols,
             overwrite: conf.overwrite,
+            keep_partition_by_columns: conf.keep_partition_by_columns,
         })
     }
 }
diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md
index acee247dee9f..dd016cabbfb7 100644
--- a/docs/source/user-guide/sql/dml.md
+++ b/docs/source/user-guide/sql/dml.md
@@ -41,7 +41,7 @@ clause is not specified, it will be inferred from the file extension if possible
 `PARTITIONED BY` specifies the columns to use for partitioning the output files into
 separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed
 from the output format. If you want to keep the columns, you should provide the option
-`execution.keep_partition_by_columns true`. `execution.keep_partition_by_columns` flag can also 
+`execution.keep_partition_by_columns true`. `execution.keep_partition_by_columns` flag can also
 be enabled through `ExecutionOptions` within `SessionConfig`.
diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md
index 8f40a47535b3..fcdf6f1e6aab 100644
--- a/docs/source/user-guide/sql/write_options.md
+++ b/docs/source/user-guide/sql/write_options.md
@@ -72,7 +72,7 @@ In this example, we write the entirety of `source_table` out to a folder of parq
 
 ### Execution Specific Options
 
-The following options are available when writing hive-style partitioned data.
+The following options are available when executing a `COPY` query.
 
 | Option                              | Description                                                                         | Default Value |
 |-------------------------------------| ----------------------------------------------------------------------------------- | ------------- |
 | execution.keep_partition_by_columns | Flag to retain the columns in the output data when using `PARTITIONED BY` queries.  | false         |
 
 Note: `execution.keep_partition_by_columns` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.

From f4e2158952d705766d3b476a6f1c6be126cbad10 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 28 Jun 2024 10:43:33 -0400
Subject: [PATCH 10/12] prettier

---
 docs/source/user-guide/sql/write_options.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md
index fcdf6f1e6aab..6fb4ef215ff1 100644
--- a/docs/source/user-guide/sql/write_options.md
+++ b/docs/source/user-guide/sql/write_options.md
@@ -75,7 +75,7 @@ In this example, we write the entirety of `source_table` out to a folder of parq
 The following options are available when executing a `COPY` query.
 
 | Option                              | Description                                                                         | Default Value |
-|-------------------------------------| ----------------------------------------------------------------------------------- | ------------- |
+| ----------------------------------- | ----------------------------------------------------------------------------------- | ------------- |
 | execution.keep_partition_by_columns | Flag to retain the columns in the output data when using `PARTITIONED BY` queries.  | false         |
 
 Note: `execution.keep_partition_by_columns` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.
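At this point in the series the option surface is final, so it is worth seeing the session-wide path the docs above mention (`ExecutionOptions` within `SessionConfig`) from code. A minimal Rust sketch, assuming the `datafusion.execution.keep_partition_by_columns` key shown by the `information_schema` test in the next patch and the standard `SessionConfig::set_bool` / `SessionContext::new_with_config` APIs; the output path is illustrative and not part of the patches:

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Enable the flag for every write in this session, instead of
    // repeating the OPTIONS clause on each COPY statement.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.keep_partition_by_columns", true);
    let ctx = SessionContext::new_with_config(config);

    // Partitioned writes now retain the partition column inside the
    // files written under each column1=<value>/ directory.
    ctx.sql(
        "COPY (VALUES ('1', 'a'), ('2', 'b')) \
         TO 'out/partitioned/' STORED AS PARQUET PARTITIONED BY (column1)",
    )
    .await?
    .collect()
    .await?;
    Ok(())
}
```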
From 5f02278b683066b37bf251add8a2ebbec529b653 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 28 Jun 2024 11:14:09 -0400
Subject: [PATCH 11/12] Update test

---
 datafusion/sqllogictest/test_files/information_schema.slt | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 3cc837aa8ee9..ee64f772917c 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -173,6 +173,7 @@ datafusion.execution.batch_size 8192
 datafusion.execution.coalesce_batches true
 datafusion.execution.collect_statistics false
 datafusion.execution.enable_recursive_ctes true
+datafusion.execution.keep_partition_by_columns false
 datafusion.execution.listing_table_ignore_subdirectory true
 datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.meta_fetch_concurrency 32
@@ -255,6 +256,7 @@ datafusion.execution.batch_size 8192 Default batch size while creating new batch
 datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
 datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
 datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
+datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
 datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
 datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics

From fdbca78923bbdfb2ff47af98cfb791915cf6a9f5 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 28 Jun 2024 14:05:37 -0400
Subject: [PATCH 12/12] update doc

---
 docs/source/user-guide/configs.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index c5f22725e0a3..0f0aa8460448 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -86,6 +86,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). |
 | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
 | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental |
+| datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
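To close the loop on the per-statement form that the `copy.slt` tests above exercise, a hedged sketch of issuing the option through SQL from Rust; the helper name, the `ctx` argument, and the output path are illustrative, not part of the series:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

/// Write partitioned Parquet while keeping `column1` in the file schema,
/// using the per-statement option added by this series.
async fn copy_keeping_partition_columns(ctx: &SessionContext) -> Result<()> {
    ctx.sql(
        "COPY (VALUES ('1', 'a'), ('2', 'b'), ('3', 'c')) \
         TO 'out/partitioned_table/' STORED AS PARQUET PARTITIONED BY (column1) \
         OPTIONS (execution.keep_partition_by_columns true)",
    )
    .await?
    .collect()
    .await?;
    Ok(())
}
```

Without the option (or with the session flag left at its default of `false`), the same statement strips `column1` from the written files, and readers reconstruct it from the `column1=<value>/` directory names instead.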