Skip to content

Commit

Permalink
[task #8203] Port tests in joins.rs to sqllogictest (#8996)
Browse files Browse the repository at this point in the history
Signed-off-by: tangruilin <tang.ruilin@foxmail.com>
  • Loading branch information
Tangruilin authored Jan 25, 2024
1 parent eb6d63f commit b97daf7
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 132 deletions.
59 changes: 0 additions & 59 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,65 +20,6 @@ use datafusion::test_util::register_unbounded_file_with_ordering;

use super::*;

#[tokio::test]
#[ignore]
/// TODO: need to repair. Wrong Test: ambiguous column name: a
///
/// Runs an inner join of two single-row subqueries wrapped in an aliased
/// derived table and compares the formatted output with a fixed table.
async fn nestedjoin_with_alias() -> Result<()> {
    // Repro case for https://github.com/apache/arrow-datafusion/issues/2867
    let ctx = SessionContext::new();
    let query = "select * from ((select 1 as a, 2 as b) c INNER JOIN (select 1 as a, 3 as d) e on c.a = e.a) f;";
    let batches = execute_to_batches(&ctx, query).await;

    // Both join inputs expose a column named `a`, hence the repeated header.
    let expected_rows = [
        "+---+---+---+---+",
        "| a | b | a | d |",
        "+---+---+---+---+",
        "| 1 | 2 | 1 | 3 |",
        "+---+---+---+---+",
    ];
    assert_batches_eq!(expected_rows, &batches);

    Ok(())
}

/// Self-joins the `test` table on `c1` and checks the total number of rows
/// produced across all returned batches.
#[tokio::test]
async fn join_partitioned() -> Result<()> {
    // Self join on partition id; the subquery renames `c1` to `id1` as a
    // workaround for duplicate column names.
    let sql = "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1";
    // NOTE(review): the `4` is presumably the partition count -- confirm
    // against `execute_with_partition`.
    let batches = execute_with_partition(sql, 4).await?;

    let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(total_rows, 4 * 10 * 10);

    Ok(())
}

/// Runs a `NOT IN` subquery between `t1` and `t2` (both of which contain a
/// NULL id) with join repartitioning switched on and then off.
#[tokio::test]
#[ignore = "Test ignored, will be enabled after fixing the NAAJ bug"]
// https://github.com/apache/arrow-datafusion/issues/4211
async fn null_aware_left_anti_join() -> Result<()> {
    let query = "SELECT t1_id, t1_name FROM t1 WHERE t1_id NOT IN (SELECT t2_id FROM t2) ORDER BY t1_id";

    for repartition_joins in [true, false] {
        let ctx = create_left_semi_anti_join_context_with_null_ids(
            "t1_id",
            "t2_id",
            repartition_joins,
        )
        .unwrap();

        let batches = execute_to_batches(&ctx, query).await;
        // NOTE(review): "++" / "++" is presumably how an empty result is
        // rendered -- confirm against the pretty printer.
        let expected_rows = ["++", "++"];
        assert_batches_eq!(expected_rows, &batches);
    }

    Ok(())
}

#[tokio::test]
async fn join_change_in_planner() -> Result<()> {
let config = SessionConfig::new().with_target_partitions(8);
Expand Down
73 changes: 0 additions & 73 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,79 +76,6 @@ pub mod repartition;
pub mod select;
mod sql_api;

/// Builds a [`SessionContext`] with two in-memory tables, `t1` and `t2`,
/// whose id columns each contain a NULL entry.
///
/// * `column_left` - name given to the id column of `t1`.
/// * `column_right` - name given to the id column of `t2`.
/// * `repartition_joins` - forwarded to `SessionConfig::with_repartition_joins`.
///
/// The context is configured with 2 target partitions and a batch size of
/// 4096. Every field in both schemas is declared nullable.
///
/// # Errors
///
/// Propagates any error from building a record batch or registering it.
fn create_left_semi_anti_join_context_with_null_ids(
    column_left: &str,
    column_right: &str,
    repartition_joins: bool,
) -> Result<SessionContext> {
    let config = SessionConfig::new()
        .with_repartition_joins(repartition_joins)
        .with_target_partitions(2)
        .with_batch_size(4096);
    let ctx = SessionContext::new_with_config(config);

    // Left table `t1`: six rows, the last one with a NULL id.
    let left_schema = Arc::new(Schema::new(vec![
        Field::new(column_left, DataType::UInt32, true),
        Field::new("t1_name", DataType::Utf8, true),
        Field::new("t1_int", DataType::UInt32, true),
    ]));
    let left_ids =
        UInt32Array::from(vec![Some(11), Some(11), Some(22), Some(33), Some(44), None]);
    let left_names = StringArray::from(vec![
        Some("a"),
        Some("a"),
        Some("b"),
        Some("c"),
        Some("d"),
        Some("e"),
    ]);
    let left_ints = UInt32Array::from(vec![1, 1, 2, 3, 4, 0]);
    let left_batch = RecordBatch::try_new(
        left_schema,
        vec![
            Arc::new(left_ids),
            Arc::new(left_names),
            Arc::new(left_ints),
        ],
    )?;
    ctx.register_batch("t1", left_batch)?;

    // Right table `t2`: six rows, the last one with a NULL id.
    let right_schema = Arc::new(Schema::new(vec![
        Field::new(column_right, DataType::UInt32, true),
        Field::new("t2_name", DataType::Utf8, true),
        Field::new("t2_int", DataType::UInt32, true),
    ]));
    let right_ids =
        UInt32Array::from(vec![Some(11), Some(11), Some(22), Some(44), Some(55), None]);
    let right_names = StringArray::from(vec![
        Some("z"),
        Some("z"),
        Some("y"),
        Some("x"),
        Some("w"),
        Some("v"),
    ]);
    let right_ints = UInt32Array::from(vec![3, 3, 1, 3, 3, 0]);
    let right_batch = RecordBatch::try_new(
        right_schema,
        vec![
            Arc::new(right_ids),
            Arc::new(right_names),
            Arc::new(right_ints),
        ],
    )?;
    ctx.register_batch("t2", right_batch)?;

    Ok(ctx)
}

async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
let testdata = datafusion::test_util::arrow_test_data();

Expand Down
70 changes: 70 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3523,3 +3523,73 @@ set datafusion.optimizer.prefer_existing_sort = false;

statement ok
drop table annotated_data;

####
# nestedjoin_with_alias_test
#
# Inner-joins two aliased single-row subqueries (`c` and `e`) inside a derived
# table aliased `f`, then selects every column of the result.
# NOTE(review): the Rust test this was ported from (repro for
# https://github.com/apache/arrow-datafusion/issues/2867) named the first
# column of BOTH subqueries `a` and was ignored with
# "ambiguous column name: a". Here the second subquery's column is named `c`,
# so the duplicate-column-name case is no longer exercised -- confirm that
# this is intended.
####

query IIII
select * from ((select 1 as a, 2 as b) c INNER JOIN (select 1 as c, 3 as d) e on c.a = e.c) f;
----
1 2 1 3

####
# create_left_semi_anti_join_context_with_null_ids_table_test
#
# Creates two six-row tables whose id columns each contain one NULL, then runs
# a NOT IN subquery from the left table against the right table.
# NOTE(review): the Rust helper this replaces populated the right table with
# ids 11, 11, 22, 44, 55, NULL (names z, z, y, x, w, v). The rows below use
# 33 and 44 where the helper had 44 and 55 -- confirm the change is
# intentional.
# NOTE(review): the original Rust test expected an empty result and was
# ignored pending https://github.com/apache/arrow-datafusion/issues/4211
# ("NAAJ bug"). The `NULL e` row recorded below differs from that expectation,
# so it presumably captures current engine output rather than the intended
# result; revisit once that issue is fixed.
####

statement ok
CREATE TABLE join_test_left(t1_id INT UNSIGNED, t1_name VARCHAR, t1_int INT UNSIGNED)
AS VALUES
(11, 'a', 1),
(11, 'a', 1),
(22, 'b', 2),
(33, 'c', 3),
(44, 'd', 4),
(NULL, 'e', 0);

statement ok
CREATE TABLE join_test_right(t2_id INT UNSIGNED, t2_name VARCHAR, t2_int INT UNSIGNED)
AS VALUES
(11, 'z', 3),
(11, 'z', 3),
(22, 'y', 1),
(33, 'x', 3),
(44, 'w', 3),
(NULL, 'v', 0);

query IT
SELECT t1_id, t1_name FROM join_test_left WHERE t1_id NOT IN (SELECT t2_id FROM join_test_right) ORDER BY t1_id;
----
NULL e

####
# join_partitioned_test
#
# Self join on c1; the subquery renames c1 to id1 as a workaround for
# duplicate column names.
# All four rows of the table share c1 = 4, so every row matches every row and
# the join emits 4 * 4 = 16 rows, each projecting the literal 1.
# NOTE(review): the Rust test this replaces queried a `test` table through
# execute_with_partition(..., 4) and expected 4 * 10 * 10 rows. This version
# uses a small in-memory table instead, so partitioned input is presumably no
# longer exercised -- confirm that is acceptable.
####

statement ok
CREATE TABLE join_partitioned_table(c1 INT UNSIGNED, c2 INT UNSIGNED, c3 BOOLEAN)
AS VALUES
(4, 1, true),
(4, 2, false),
(4, 3, true),
(4, 4, false);

query I
SELECT 1 FROM join_partitioned_table JOIN (SELECT c1 AS id1 FROM join_partitioned_table) AS a ON c1=id1;
----
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1

0 comments on commit b97daf7

Please sign in to comment.