From 1de5dea6dc9369d1151780e6d82ff8b21496bab2 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 4 Oct 2023 21:46:47 -0400 Subject: [PATCH 1/4] enable insert into sorted listing table --- .../core/src/datasource/file_format/csv.rs | 10 ++++- .../core/src/datasource/file_format/json.rs | 9 ++++- .../core/src/datasource/file_format/mod.rs | 3 +- .../src/datasource/file_format/parquet.rs | 10 ++++- .../core/src/datasource/listing/table.rs | 37 +++++++++++-------- datafusion/core/src/datasource/memory.rs | 1 + datafusion/core/src/physical_planner.rs | 2 +- datafusion/physical-plan/src/insert.rs | 19 +++++++--- .../test_files/insert_to_external.slt | 26 +++++++++++++ 9 files changed, 90 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index c3295042b5e3..e160ae11c83a 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -29,7 +29,7 @@ use arrow::{self, datatypes::SchemaRef}; use arrow_array::RecordBatch; use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use async_trait::async_trait; use bytes::{Buf, Bytes}; @@ -263,6 +263,7 @@ impl FileFormat for CsvFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, + order_requirements: Option>>>, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for CSV"); @@ -275,7 +276,12 @@ impl FileFormat for CsvFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(CsvSink::new(conf)); - Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) + Ok(Arc::new(FileSinkExec::new( + input, + sink, + sink_schema, + order_requirements, + )) as _) } fn file_type(&self) -> FileType { diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 96fd4daa2da6..02bc702d6a93 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -24,6 +24,7 @@ use datafusion_common::not_impl_err; use datafusion_common::DataFusionError; use datafusion_common::FileType; use datafusion_execution::TaskContext; +use datafusion_physical_expr::PhysicalSortRequirement; use rand::distributions::Alphanumeric; use rand::distributions::DistString; use std::fmt; @@ -173,6 +174,7 @@ impl FileFormat for JsonFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, + order_requirements: Option>>>, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for Json"); @@ -184,7 +186,12 @@ impl FileFormat for JsonFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(JsonSink::new(conf, self.file_compression_type)); - Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) + Ok(Arc::new(FileSinkExec::new( + input, + sink, + sink_schema, + order_requirements, + )) as _) } fn file_type(&self) -> FileType { diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 86f265ab9492..3244c8078d76 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -41,7 +41,7 @@ use crate::execution::context::SessionState; use crate::physical_plan::{ExecutionPlan, Statistics}; use datafusion_common::{not_impl_err, DataFusionError, FileType}; -use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use async_trait::async_trait; use object_store::{ObjectMeta, ObjectStore}; @@ -99,6 +99,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { _input: Arc, _state: &SessionState, _conf: FileSinkConfig, + _order_requirements: Option>>>, ) -> Result> { not_impl_err!("Writer not implemented for this format") } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 16050d66db5d..1368de38a7a3 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -36,7 +36,7 @@ use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; use object_store::{ObjectMeta, ObjectStore}; @@ -229,6 +229,7 @@ impl FileFormat for ParquetFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, + order_requirements: Option>>>, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for Parquet"); @@ -237,7 +238,12 @@ impl FileFormat for ParquetFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(ParquetSink::new(conf)); - Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _) + Ok(Arc::new(FileSinkExec::new( + input, + sink, + sink_schema, + order_requirements, + )) as _) } fn file_type(&self) -> FileType { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 5b1710d344ee..b7cc02f754e1 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -57,7 +57,9 @@ use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; use datafusion_expr::expr::Sort; use datafusion_optimizer::utils::conjunction; -use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr}; +use datafusion_physical_expr::{ + create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement, +}; use async_trait::async_trait; use futures::{future, stream, StreamExt, TryStreamExt}; @@ -826,19 +828,6 @@ impl TableProvider for ListingTable { ); } - // TODO support inserts to sorted tables which preserve sort_order - // Inserts currently make no effort to preserve sort_order. This could lead to - // incorrect query results on the table after inserting incorrectly sorted data. - let unsorted: Vec> = vec![]; - if self.options.file_sort_order != unsorted { - return Err( - DataFusionError::NotImplemented( - "Writing to a sorted listing table via insert into is not supported yet. \ - To write to this table in the meantime, register an equivalent table with \ - file_sort_order = vec![]".into()) - ); - } - let table_path = &self.table_paths()[0]; // Get the object store for the table path. let store = state.runtime_env().object_store(table_path)?; @@ -908,9 +897,27 @@ impl TableProvider for ListingTable { file_type_writer_options, }; + let unsorted: Vec> = vec![]; + let order_requirements = if self.options().file_sort_order != unsorted { + Some( + self.try_create_output_ordering()? + .into_iter() + .map(|v| { + Some( + v.into_iter() + .map(PhysicalSortRequirement::from) + .collect::>(), + ) + }) + .collect::>(), + ) + } else { + None + }; + self.options() .format - .create_writer_physical_plan(input, state, config) + .create_writer_physical_plan(input, state, config, order_requirements) .await } } diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 337a8cabc269..6231bd2c2f92 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -223,6 +223,7 @@ impl TableProvider for MemTable { input, sink, self.schema.clone(), + None, ))) } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 84b5b9afa7e7..35119f374fa3 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -604,7 +604,7 @@ impl DefaultPhysicalPlanner { FileType::ARROW => Arc::new(ArrowFormat {}), }; - sink_format.create_writer_physical_plan(input_exec, session_state, config).await + sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await } LogicalPlan::Dml(DmlStatement { table_name, diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 8b467461ddad..0d7b2856655c 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -73,6 +73,8 @@ pub struct FileSinkExec { sink_schema: SchemaRef, /// Schema describing the structure of the output data. count_schema: SchemaRef, + /// Optional required sort order for output data. + sort_order: Option>>>, } impl fmt::Debug for FileSinkExec { @@ -87,12 +89,14 @@ impl FileSinkExec { input: Arc, sink: Arc, sink_schema: SchemaRef, + sort_order: Option>>>, ) -> Self { Self { input, sink, sink_schema, count_schema: make_count_schema(), + sort_order, } } @@ -192,16 +196,20 @@ impl ExecutionPlan for FileSinkExec { } fn required_input_ordering(&self) -> Vec>> { - // Require that the InsertExec gets the data in the order the + // The input order is either exlicitly set (such as by a ListingTable), + // or require that the [FileSinkExec] gets the data in the order the // input produced it (otherwise the optimizer may chose to reorder // the input which could result in unintended / poor UX) // // More rationale: // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 - vec![self - .input - .output_ordering() - .map(PhysicalSortRequirement::from_sort_exprs)] + match &self.sort_order { + Some(requirements) => requirements.clone(), + None => vec![self + .input + .output_ordering() + .map(PhysicalSortRequirement::from_sort_exprs)], + } } fn maintains_input_order(&self) -> Vec { @@ -221,6 +229,7 @@ impl ExecutionPlan for FileSinkExec { sink: self.sink.clone(), sink_schema: self.sink_schema.clone(), count_schema: self.count_schema.clone(), + sort_order: self.sort_order.clone(), })) } diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index a29c230a466e..6acf1c2c13ec 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -45,6 +45,32 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv' statement ok set datafusion.execution.target_partitions = 8; +statement ok +CREATE EXTERNAL TABLE +ordered_insert_test(a bigint, b bigint) +STORED AS csv +LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/' +WITH ORDER (a ASC, B DESC) +OPTIONS( +create_local_path 'true', +insert_mode 'append_new_files', +); + +query II +INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (3, 3), (2, 4), (1, 5); +---- +6 + +query II +SELECT * from ordered_insert_test; +---- +1 5 +2 4 +3 3 +4 2 +5 1 +7 7 + statement ok CREATE EXTERNAL TABLE single_file_test(a bigint, b bigint) From f7b69fce856d40665fa6af51a1d4a6207410feaa Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 5 Oct 2023 06:48:27 -0400 Subject: [PATCH 2/4] add check for single file table --- datafusion/core/src/datasource/listing/table.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index b7cc02f754e1..a603a7349722 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -899,6 +899,15 @@ impl TableProvider for ListingTable { let unsorted: Vec> = vec![]; let order_requirements = if self.options().file_sort_order != unsorted { + if matches!( + self.options().insert_mode, + ListingTableInsertMode::AppendToFile + ) { + return Err(DataFusionError::Plan( + "Cannot insert into a sorted ListingTable with mode append!".into(), + )); + } + // Converts Vec> into type required by execution plan to specify its required input ordering Some( self.try_create_output_ordering()? .into_iter() From 0d53d196edd8996e2e8083afe1379ef5224c5bbc Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 5 Oct 2023 06:54:49 -0400 Subject: [PATCH 3/4] improve test to verify both order conditions --- datafusion/sqllogictest/test_files/insert_to_external.slt | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 6acf1c2c13ec..8f185f37eb0b 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -57,9 +57,9 @@ insert_mode 'append_new_files', ); query II -INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (3, 3), (2, 4), (1, 5); +INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5); ---- -6 +9 query II SELECT * from ordered_insert_test; @@ -69,6 +69,9 @@ SELECT * from ordered_insert_test; 3 3 4 2 5 1 +7 10 +7 9 +7 8 7 7 statement ok From 5a65f91d37752b172607cfb25eddb80f473b8624 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Sat, 7 Oct 2023 08:42:20 -0400 Subject: [PATCH 4/4] address feedback --- .../core/src/datasource/file_format/csv.rs | 2 +- .../core/src/datasource/file_format/json.rs | 2 +- .../core/src/datasource/file_format/mod.rs | 2 +- .../core/src/datasource/file_format/parquet.rs | 2 +- .../core/src/datasource/listing/table.rs | 18 ++++++++++-------- datafusion/physical-plan/src/insert.rs | 6 +++--- .../test_files/insert_to_external.slt | 13 +++++++++++++ 7 files changed, 30 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index e160ae11c83a..4c625b7ed742 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -263,7 +263,7 @@ impl FileFormat for CsvFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, - order_requirements: Option>>>, + order_requirements: Option>, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for CSV"); diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 02bc702d6a93..6c260b9802ec 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -174,7 +174,7 @@ impl FileFormat for JsonFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, - order_requirements: Option>>>, + order_requirements: Option>, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for Json"); diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 3244c8078d76..293f062d86a9 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -99,7 +99,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { _input: Arc, _state: &SessionState, _conf: FileSinkConfig, - _order_requirements: Option>>>, + _order_requirements: Option>, ) -> Result> { not_impl_err!("Writer not implemented for this format") } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 1368de38a7a3..2bb6d3194936 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -229,7 +229,7 @@ impl FileFormat for ParquetFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, - order_requirements: Option>>>, + order_requirements: Option>, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for Parquet"); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index a603a7349722..d47f456a554b 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -907,17 +907,19 @@ impl TableProvider for ListingTable { "Cannot insert into a sorted ListingTable with mode append!".into(), )); } + // Multiple sort orders in outer vec are equivalent, so we pass only the first one + let ordering = self + .try_create_output_ordering()? + .get(0) + .ok_or(DataFusionError::Internal( + "Expected ListingTable to have a sort order, but none found!".into(), + ))? + .clone(); // Converts Vec> into type required by execution plan to specify its required input ordering Some( - self.try_create_output_ordering()? + ordering .into_iter() - .map(|v| { - Some( - v.into_iter() - .map(PhysicalSortRequirement::from) - .collect::>(), - ) - }) + .map(PhysicalSortRequirement::from) .collect::>(), ) } else { diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 0d7b2856655c..a7b0d32c8eb8 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -74,7 +74,7 @@ pub struct FileSinkExec { /// Schema describing the structure of the output data. count_schema: SchemaRef, /// Optional required sort order for output data. - sort_order: Option>>>, + sort_order: Option>, } impl fmt::Debug for FileSinkExec { @@ -89,7 +89,7 @@ impl FileSinkExec { input: Arc, sink: Arc, sink_schema: SchemaRef, - sort_order: Option>>>, + sort_order: Option>, ) -> Self { Self { input, @@ -204,7 +204,7 @@ impl ExecutionPlan for FileSinkExec { // More rationale: // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 match &self.sort_order { - Some(requirements) => requirements.clone(), + Some(requirements) => vec![Some(requirements.clone())], None => vec![self .input .output_ordering() diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 8f185f37eb0b..d1b73204e379 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -56,6 +56,19 @@ create_local_path 'true', insert_mode 'append_new_files', ); +query TT +EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5); +---- +logical_plan +Dml: op=[Insert Into] table=[ordered_insert_test] +--Projection: column1 AS a, column2 AS b +----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))... +physical_plan +InsertExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[]) +--SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] +----ProjectionExec: expr=[column1@0 as a, column2@1 as b] +------ValuesExec + query II INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5); ----