feat: Conditionally allow to keep partition_by columns when using PARTITIONED BY (enhancement) #11107

Merged · 15 commits · Jun 28, 2024
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
@@ -303,6 +303,9 @@ config_namespace! {
/// statistics into the same file groups.
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false

/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false
}
}

@@ -1282,6 +1285,11 @@ impl TableOptions {
return ConfigField::set(self, key, value);
}

// Only used for hive.keep_partition_by_columns
if prefix == "hive" {
return Ok(());
}

let Some(e) = self.extensions.0.get_mut(prefix) else {
return _config_err!("Could not find config namespace \"{prefix}\"");
};
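
Because the new flag lives in the execution options, it can also be set once for a whole session rather than per statement. A minimal sketch, assuming the usual `datafusion.execution.*` key naming and the standard SessionConfig/SessionContext API (this snippet is not part of the diff):

    use datafusion::prelude::*;

    // Enable keep_partition_by_columns for every write in this session.
    // The key name follows the `datafusion.execution.*` convention (assumption).
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.keep_partition_by_columns", true);
    let ctx = SessionContext::new_with_config(config);
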
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -223,6 +223,7 @@ impl DataSink for ArrowFileSink {
part_col,
self.config.table_paths[0].clone(),
"arrow".into(),
self.config.keep_partition_by_columns,
);

let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -548,7 +548,9 @@ impl ParquetSink {
/// of hive style partitioning where some columns are removed from the
/// underlying files.
fn get_writer_schema(&self) -> Arc<Schema> {
if !self.config.table_partition_cols.is_empty() {
if !self.config.table_partition_cols.is_empty()
&& !self.config.keep_partition_by_columns
{
let schema = self.config.output_schema();
let partition_names: Vec<_> = self
.config
@@ -638,6 +640,7 @@ impl DataSink for ParquetSink {
part_col,
self.config.table_paths[0].clone(),
"parquet".into(),
self.config.keep_partition_by_columns,
);

let mut file_write_tasks: JoinSet<
@@ -1875,6 +1878,7 @@ mod tests {
output_schema: schema.clone(),
table_partition_cols: vec![],
overwrite: true,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
@@ -1969,6 +1973,7 @@ mod tests {
output_schema: schema.clone(),
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
overwrite: true,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
13 changes: 9 additions & 4 deletions datafusion/core/src/datasource/file_format/write/demux.rs
@@ -75,6 +75,7 @@ pub(crate) fn start_demuxer_task(
partition_by: Option<Vec<(String, DataType)>>,
base_output_path: ListingTableUrl,
file_extension: String,
keep_partition_by_columns: bool,
) -> (SpawnedTask<Result<()>>, DemuxedStreamReceiver) {
let (tx, rx) = mpsc::unbounded_channel();
let context = context.clone();
@@ -91,6 +92,7 @@
parts,
base_output_path,
file_extension,
keep_partition_by_columns,
)
.await
})
@@ -111,7 +113,7 @@
(task, rx)
}

/// Dynamically partitions input stream to acheive desired maximum rows per file
/// Dynamically partitions input stream to achieve desired maximum rows per file
async fn row_count_demuxer(
mut tx: UnboundedSender<(Path, Receiver<RecordBatch>)>,
mut input: SendableRecordBatchStream,
@@ -240,6 +242,7 @@ async fn hive_style_partitions_demuxer(
partition_by: Vec<(String, DataType)>,
base_output_path: ListingTableUrl,
file_extension: String,
keep_partition_by_columns: bool,
) -> Result<()> {
let write_id =
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
@@ -298,9 +301,11 @@
}
};

// remove partitions columns
let final_batch_to_send =
remove_partition_by_columns(&parted_batch, &partition_by)?;
let final_batch_to_send = if keep_partition_by_columns {
parted_batch
} else {
remove_partition_by_columns(&parted_batch, &partition_by)?
};

// Finally send the partial batch partitioned by distinct value!
part_tx.send(final_batch_to_send).await.map_err(|_| {
@@ -224,6 +224,7 @@ pub(crate) async fn stateless_multipart_put(
part_cols,
base_output_path.clone(),
file_extension,
config.keep_partition_by_columns,
);

let rb_buffer_size = &context
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -917,6 +917,8 @@ impl TableProvider for ListingTable {
.await?;

let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
let keep_partition_by_columns =
state.config().options().execution.keep_partition_by_columns;

// Sink related option, apart from format
let config = FileSinkConfig {
@@ -926,6 +928,7 @@
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
overwrite,
keep_partition_by_columns,
};

let unsorted: Vec<Vec<Expr>> = vec![];
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -85,6 +85,8 @@ pub struct FileSinkConfig {
pub table_partition_cols: Vec<(String, DataType)>,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool,
/// Controls whether partition columns are kept for the file
pub keep_partition_by_columns: bool,
}

impl FileSinkConfig {
11 changes: 11 additions & 0 deletions datafusion/core/src/physical_planner.rs
@@ -777,6 +777,16 @@ impl DefaultPhysicalPlanner {
.map(|s| (s.to_string(), arrow_schema::DataType::Null))
.collect::<Vec<_>>();

let keep_partition_by_columns = source_option_tuples
.get("hive.keep_partition_by_columns")
.map(|v| v.trim() == "true")
.unwrap_or(false)
|| session_state
.config()
.options()
.execution
.keep_partition_by_columns;

// Set file sink related options
let config = FileSinkConfig {
object_store_url,
@@ -785,6 +795,7 @@
output_schema: Arc::new(schema),
table_partition_cols,
overwrite: false,
keep_partition_by_columns,
};
let mut table_options = session_state.default_table_options();
let sink_format: Arc<dyn FileFormat> = match format_options {
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
@@ -279,9 +279,9 @@ impl LogicalPlanBuilder {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
partition_by,
format_options,
options,
partition_by,
})))
}

6 changes: 5 additions & 1 deletion datafusion/sql/src/parser.rs
@@ -1475,7 +1475,7 @@ mod tests {
fn copy_to_multi_options() -> Result<(), ParserError> {
// order of options is preserved
let sql =
"COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy)";
"COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy, 'keep_partition_by_columns' true)";

let expected_options = vec![
(
@@ -1486,6 +1486,10 @@
"format.compression".to_string(),
Value::SingleQuotedString("snappy".to_string()),
),
(
"keep_partition_by_columns".to_string(),
Value::SingleQuotedString("true".to_string()),
),
];

let mut statements = DFParser::parse_sql(sql).unwrap();
10 changes: 9 additions & 1 deletion datafusion/sql/src/statement.rs
@@ -888,7 +888,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
Some(v) => v,
};
if !(&key.contains('.')) {

if key.to_lowercase().contains("keep_partition_by_columns") {

Review thread on the line above:

Contributor: this special case is unfortunate, but I don't have a great idea of how to make it better

Contributor: After further consideration, IMO we should restrict this handling to format options. For other configurations, users must specify the prefix. Otherwise, this list will continue to grow longer, and using those prefixes would lose their meaning (which is why all these refactors were done to have a structured configuration).

Additionally, instead of using "hive", we need a more general term. Perhaps the "execution" prefix would be a better alternative.

@hveiga (author), Jun 26, 2024: Could you elaborate on what you mean by "we should restrict this handling to format options"? I am unsure if you are suggesting using a different struct for these non-"format." options, or continuing to use the HashMap with a mix of options and then extracting the logic that filters out the non-"format." options so they are not passed down through table_options.alter_with_string_hash_map.

I agree on changing the prefix to "execution.", which also aligns with ExecutionOptions.

@hveiga (author): I gave this a try in e91397a. Let me know if that's what you had in mind. Thanks for the feedback.

Contributor: That's exactly what I had in mind. Thank you for the collaboration. I would just like to mention two more points:

1. I guess you missed adding the execution prefix to the example in datafusion/sql/src/parser.rs:

       let keep_partition_by_columns = source_option_tuples
           .get("execution.keep_partition_by_columns")
           .map(|v| v.trim() == "true")
           .unwrap_or(...

2. If the user provides anything other than "true", it is interpreted as "false". It might be wiser to give an error if the value is neither "true" nor "false".

@hveiga (author), Jun 27, 2024: Good catch! I just fixed that test case and also slightly modified the handling of the value to account for what we discussed:

  - Give preference to what is explicitly provided.
  - If what is provided is invalid, throw a config error. Added a test in copy.slt for this.
  - If not provided, fall back to the ExecutionOptions value, false by default.

Hopefully e352203 is finally ready. Thanks.

@berkaysynnada, Jun 28, 2024: This PR is a good first step towards adding settings other than format. Thanks for your effort, @hveiga. I think the PR is ready to be merged once conflicts are resolved.


let renamed_key = if !&key.starts_with("hive.") {
format!("hive.{}", key)
} else {
key
};
options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
} else if !(&key.contains('.')) {
// If config does not belong to any namespace, assume it is
// a format option and apply the format prefix for backwards
// compatibility.
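
The stricter value handling described in the review thread above could look roughly like this sketch (names such as source_option_tuples and session_state are borrowed from the planner snippet earlier in the diff; the exact error text is illustrative, not the PR's final code):

    // Prefer the explicitly provided option; reject anything other than
    // "true"/"false"; otherwise fall back to the session-level ExecutionOptions.
    let keep_partition_by_columns = match source_option_tuples
        .get("execution.keep_partition_by_columns")
        .map(|v| v.trim())
    {
        Some("true") => true,
        Some("false") => false,
        Some(other) => {
            return _config_err!(
                "keep_partition_by_columns must be true or false, got: {other}"
            );
        }
        None => session_state
            .config()
            .options()
            .execution
            .keep_partition_by_columns,
    };
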
17 changes: 17 additions & 0 deletions datafusion/sqllogictest/test_files/copy.slt
@@ -166,6 +166,23 @@ physical_plan
01)DataSinkExec: sink=ParquetSink(file_groups=[])
02)--MemoryExec: partitions=1, partition_sizes=[1]

# Copy to directory as partitioned files with keep_partition_by_columns enabled
query TT
COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
OPTIONS (KEEP_PARTITION_BY_COLUMNS true);
----
3

# validate generated file contains tables
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet4 STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table4/column1=1/*.parquet';

query TT
select column1, column2 from validate_partitioned_parquet4 order by column1,column2;
----
1 a

# Copy more files to directory via query
query IT
COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table/' STORED AS PARQUET;
5 changes: 4 additions & 1 deletion docs/source/user-guide/sql/dml.md
@@ -39,7 +39,10 @@ TO 'file_name'
clause is not specified, it will be inferred from the file extension if possible.

`PARTITIONED BY` specifies the columns to use for partitioning the output files into
separate hive-style directories.
separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed
from the output format. If you want to keep the columns, you should provide the option
`KEEP_PARTITION_BY_COLUMNS true`. The `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled
through `ExecutionOptions` within `SessionConfig`.
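
For example, mirroring the copy.slt test added in this PR (the output path is illustrative, and this sketch assumes an async context with a SessionContext `ctx` in scope):

    // COPY with hive-style partitioning, keeping the partition column
    // inside the written parquet files as well.
    ctx.sql(
        "COPY (VALUES ('1', 'a'), ('2', 'b'), ('3', 'c')) \
         TO 'test_files/scratch/copy/partitioned_table4/' \
         STORED AS parquet PARTITIONED BY (column1) \
         OPTIONS (KEEP_PARTITION_BY_COLUMNS true)",
    )
    .await?
    .collect()
    .await?;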

The output format is determined by the first match of the following rules:

10 changes: 10 additions & 0 deletions docs/source/user-guide/sql/write_options.md
@@ -70,6 +70,16 @@ In this example, we write the entirety of `source_table` out to a folder of parq

## Available Options

### Hive Specific Options

The following options are available when writing hive-style partitioned data.

| Option | Description | Default Value |
| ------------------------- | ---------------------------------------------------------------------------------- | ------------- |
| KEEP_PARTITION_BY_COLUMNS | Flag to retain the columns in the output data when using `PARTITIONED BY` queries. | false |

Note: the `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.

### JSON Format Specific Options

The following options are available when writing JSON files. Note: If any unsupported option is specified, an error will be raised and the query will fail.