diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index c3295042b5e3..4c625b7ed742 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -29,7 +29,7 @@ use arrow::{self, datatypes::SchemaRef};
 use arrow_array::RecordBatch;
 use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
@@ -263,6 +263,7 @@ impl FileFormat for CsvFormat {
         input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
         conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if conf.overwrite {
             return not_impl_err!("Overwrites are not implemented yet for CSV");
@@ -275,7 +276,12 @@ impl FileFormat for CsvFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(conf));
 
-        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
     }
 
     fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 96fd4daa2da6..6c260b9802ec 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -24,6 +24,7 @@ use datafusion_common::not_impl_err;
 use datafusion_common::DataFusionError;
 use datafusion_common::FileType;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortRequirement;
 use rand::distributions::Alphanumeric;
 use rand::distributions::DistString;
 use std::fmt;
@@ -173,6 +174,7 @@ impl FileFormat for JsonFormat {
         input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
         conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if conf.overwrite {
             return not_impl_err!("Overwrites are not implemented yet for Json");
@@ -184,7 +186,12 @@ impl FileFormat for JsonFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
 
-        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
     }
 
     fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 86f265ab9492..293f062d86a9 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -41,7 +41,7 @@ use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
 use datafusion_common::{not_impl_err, DataFusionError, FileType};
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 
 use async_trait::async_trait;
 use object_store::{ObjectMeta, ObjectStore};
@@ -99,6 +99,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         _input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
         _conf: FileSinkConfig,
+        _order_requirements: Option<Vec<PhysicalSortRequirement>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         not_impl_err!("Writer not implemented for this format")
     }
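Note: the new trait parameter is an `Option<Vec<PhysicalSortRequirement>>`. A `PhysicalSortRequirement` is the requirement-side counterpart of a `PhysicalSortExpr`: it names the expression and, optionally, the sort options an operator demands of its input. A minimal sketch of building one by hand, using only public DataFusion APIs of this era (the schema and column name are illustrative, not part of the patch):

    use arrow_schema::{DataType, Field, Schema, SortOptions};
    use datafusion_common::Result;
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

    // Build the requirement "a ASC NULLS LAST" from a concrete sort expression,
    // via the same From conversion that ListingTable applies further down.
    fn example_requirement() -> Result<Vec<PhysicalSortRequirement>> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
        let sort_expr = PhysicalSortExpr {
            expr: col("a", &schema)?,
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        };
        Ok(vec![PhysicalSortRequirement::from(sort_expr)])
    }

The remaining `FileFormat` implementations get the same signature change and simply forward the requirement into the sink: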
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 16050d66db5d..2bb6d3194936 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -36,7 +36,7 @@ use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
 use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
@@ -229,6 +229,7 @@ impl FileFormat for ParquetFormat {
         input: Arc<dyn ExecutionPlan>,
         _state: &SessionState,
         conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if conf.overwrite {
             return not_impl_err!("Overwrites are not implemented yet for Parquet");
@@ -237,7 +238,12 @@ impl FileFormat for ParquetFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(ParquetSink::new(conf));
 
-        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
     }
 
     fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 5b1710d344ee..d47f456a554b 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -57,7 +57,9 @@ use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
 use datafusion_expr::expr::Sort;
 use datafusion_optimizer::utils::conjunction;
-use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
+use datafusion_physical_expr::{
+    create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
+};
 
 use async_trait::async_trait;
 use futures::{future, stream, StreamExt, TryStreamExt};
@@ -826,19 +828,6 @@ impl TableProvider for ListingTable {
             );
         }
 
-        // TODO support inserts to sorted tables which preserve sort_order
-        // Inserts currently make no effort to preserve sort_order. This could lead to
-        // incorrect query results on the table after inserting incorrectly sorted data.
-        let unsorted: Vec<Vec<Expr>> = vec![];
-        if self.options.file_sort_order != unsorted {
-            return Err(
-                DataFusionError::NotImplemented(
-                    "Writing to a sorted listing table via insert into is not supported yet. \
-                    To write to this table in the meantime, register an equivalent table with \
-                    file_sort_order = vec![]".into())
-            );
-        }
-
         let table_path = &self.table_paths()[0];
         // Get the object store for the table path.
         let store = state.runtime_env().object_store(table_path)?;
@@ -908,9 +897,38 @@ impl TableProvider for ListingTable {
             file_type_writer_options,
         };
 
+        let unsorted: Vec<Vec<Expr>> = vec![];
+        let order_requirements = if self.options().file_sort_order != unsorted {
+            if matches!(
+                self.options().insert_mode,
+                ListingTableInsertMode::AppendToFile
+            ) {
+                return Err(DataFusionError::Plan(
+                    "Cannot insert into a sorted ListingTable with mode append!".into(),
+                ));
+            }
+            // Multiple sort orders in outer vec are equivalent, so we pass only the first one
+            let ordering = self
+                .try_create_output_ordering()?
+                .get(0)
+                .ok_or(DataFusionError::Internal(
+                    "Expected ListingTable to have a sort order, but none found!".into(),
+                ))?
+                .clone();
+            // Converts Vec<Vec<Expr>> into type required by execution plan to specify its required input ordering
+            Some(
+                ordering
+                    .into_iter()
+                    .map(PhysicalSortRequirement::from)
+                    .collect::<Vec<_>>(),
+            )
+        } else {
+            None
+        };
+
         self.options()
             .format
-            .create_writer_physical_plan(input, state, config)
+            .create_writer_physical_plan(input, state, config, order_requirements)
             .await
     }
 }
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 337a8cabc269..6231bd2c2f92 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -223,6 +223,7 @@ impl TableProvider for MemTable {
             input,
             sink,
             self.schema.clone(),
+            None,
         )))
     }
 }
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 84b5b9afa7e7..35119f374fa3 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -604,7 +604,7 @@ impl DefaultPhysicalPlanner {
                     FileType::ARROW => Arc::new(ArrowFormat {}),
                 };
 
-                sink_format.create_writer_physical_plan(input_exec, session_state, config).await
+                sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await
             }
             LogicalPlan::Dml(DmlStatement {
                 table_name,
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index 8b467461ddad..a7b0d32c8eb8 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -73,6 +73,8 @@ pub struct FileSinkExec {
     sink_schema: SchemaRef,
     /// Schema describing the structure of the output data.
     count_schema: SchemaRef,
+    /// Optional required sort order for output data.
+    sort_order: Option<Vec<PhysicalSortRequirement>>,
 }
 
 impl fmt::Debug for FileSinkExec {
@@ -87,12 +89,14 @@ impl FileSinkExec {
         input: Arc<dyn ExecutionPlan>,
         sink: Arc<dyn DataSink>,
         sink_schema: SchemaRef,
+        sort_order: Option<Vec<PhysicalSortRequirement>>,
     ) -> Self {
         Self {
             input,
             sink,
             sink_schema,
             count_schema: make_count_schema(),
+            sort_order,
         }
     }
@@ -192,16 +196,20 @@ impl ExecutionPlan for FileSinkExec {
     }
 
     fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
-        // Require that the InsertExec gets the data in the order the
+        // The input order is either explicitly set (such as by a ListingTable),
+        // or require that the [FileSinkExec] gets the data in the order the
         // input produced it (otherwise the optimizer may choose to reorder
         // the input which could result in unintended / poor UX)
         //
         // More rationale:
         // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
-        vec![self
-            .input
-            .output_ordering()
-            .map(PhysicalSortRequirement::from_sort_exprs)]
+        match &self.sort_order {
+            Some(requirements) => vec![Some(requirements.clone())],
+            None => vec![self
+                .input
+                .output_ordering()
+                .map(PhysicalSortRequirement::from_sort_exprs)],
+        }
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
@@ -221,6 +229,7 @@ impl ExecutionPlan for FileSinkExec {
             sink: self.sink.clone(),
             sink_schema: self.sink_schema.clone(),
             count_schema: self.count_schema.clone(),
+            sort_order: self.sort_order.clone(),
         }))
     }
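End to end, the change means an `INSERT INTO` a sorted listing table now plans a `SortExec` in front of the sink instead of returning `NotImplemented`. A hedged sketch of exercising this from Rust (table name, path, and option values are illustrative; the sqllogictest below covers the same flow):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Sorted external table; insert_mode must be append_new_files,
        // since appending to an existing file cannot preserve the order.
        ctx.sql(
            "CREATE EXTERNAL TABLE t(a BIGINT, b BIGINT) STORED AS CSV \
             LOCATION '/tmp/ordered_t/' WITH ORDER (a ASC, b DESC) \
             OPTIONS (create_local_path 'true', insert_mode 'append_new_files')",
        )
        .await?;
        // The optimizer sorts the rows by (a ASC, b DESC) before the sink runs.
        ctx.sql("INSERT INTO t VALUES (5, 1), (4, 2), (1, 5)")
            .await?
            .collect()
            .await?;
        Ok(())
    }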
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index a29c230a466e..d1b73204e379 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -45,6 +45,48 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
 statement ok
 set datafusion.execution.target_partitions = 8;
 
+statement ok
+CREATE EXTERNAL TABLE
+ordered_insert_test(a bigint, b bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/'
+WITH ORDER (a ASC, B DESC)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
+----
+logical_plan
+Dml: op=[Insert Into] table=[ordered_insert_test]
+--Projection: column1 AS a, column2 AS b
+----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))...
+physical_plan
+InsertExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[])
+--SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
+----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
+------ValuesExec
+
+query I
+INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
+----
+9
+
+query II
+SELECT * from ordered_insert_test;
+----
+1 5
+2 4
+3 3
+4 2
+5 1
+7 10
+7 9
+7 8
+7 7
+
 statement ok
 CREATE EXTERNAL TABLE
 single_file_test(a bigint, b bigint)
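For completeness, the same plan can be inspected from Rust rather than sqllogictest. Assuming the `ctx` and table `t` from the sketch above, the physical plan should show a `SortExec` feeding the sink, matching the EXPLAIN expectation in the test:

    // Prints the logical and physical plans for an ordered insert.
    async fn show_insert_plan(
        ctx: &datafusion::prelude::SessionContext,
    ) -> datafusion::error::Result<()> {
        ctx.sql("EXPLAIN INSERT INTO t VALUES (1, 2)")
            .await?
            .show()
            .await
    }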