Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support InsertInto Sorted ListingTable #7743

Merged
merged 4 commits into from
Oct 8, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
address feedback
devinjdangelo committed Oct 7, 2023
commit 5a65f91d37752b172607cfb25eddb80f473b8624
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
@@ -263,7 +263,7 @@ impl FileFormat for CsvFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for CSV");
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
@@ -174,7 +174,7 @@ impl FileFormat for JsonFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for Json");
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
_input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_conf: FileSinkConfig,
_order_requirements: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
_order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
@@ -229,7 +229,7 @@ impl FileFormat for ParquetFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for Parquet");
18 changes: 10 additions & 8 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
@@ -907,17 +907,19 @@ impl TableProvider for ListingTable {
"Cannot insert into a sorted ListingTable with mode append!".into(),
));
}
// Multiple sort orders in outer vec are equivalent, so we pass only the first one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let ordering = self
.try_create_output_ordering()?
.get(0)
.ok_or(DataFusionError::Internal(
"Expected ListingTable to have a sort order, but none found!".into(),
))?
.clone();
// Converts Vec<Vec<SortExpr>> into type required by execution plan to specify its required input ordering
Some(
self.try_create_output_ordering()?
ordering
.into_iter()
.map(|v| {
Some(
v.into_iter()
.map(PhysicalSortRequirement::from)
.collect::<Vec<_>>(),
)
})
.map(PhysicalSortRequirement::from)
.collect::<Vec<_>>(),
)
} else {
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ pub struct FileSinkExec {
/// Schema describing the structure of the output data.
count_schema: SchemaRef,
/// Optional required sort order for output data.
sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
sort_order: Option<Vec<PhysicalSortRequirement>>,
}

impl fmt::Debug for FileSinkExec {
@@ -89,7 +89,7 @@ impl FileSinkExec {
input: Arc<dyn ExecutionPlan>,
sink: Arc<dyn DataSink>,
sink_schema: SchemaRef,
sort_order: Option<Vec<Option<Vec<PhysicalSortRequirement>>>>,
sort_order: Option<Vec<PhysicalSortRequirement>>,
) -> Self {
Self {
input,
@@ -204,7 +204,7 @@ impl ExecutionPlan for FileSinkExec {
// More rationale:
// https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
match &self.sort_order {
Some(requirements) => requirements.clone(),
Some(requirements) => vec![Some(requirements.clone())],
None => vec![self
.input
.output_ordering()
13 changes: 13 additions & 0 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
Original file line number Diff line number Diff line change
@@ -56,6 +56,19 @@ create_local_path 'true',
insert_mode 'append_new_files',
);

query TT
EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
----
logical_plan
Dml: op=[Insert Into] table=[ordered_insert_test]
--Projection: column1 AS a, column2 AS b
----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))...
physical_plan
InsertExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[])
--SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
------ValuesExec

query II
INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5);
----