feat(frontend): support iceberg predicate pushdown #19228

Merged · 42 commits · Nov 14, 2024

Commits
a644ce9
match rw predicates
kwannoel Oct 28, 2024
8783214
mark place to add filter
kwannoel Oct 29, 2024
fe92b99
pass in schema fields as a parameter
kwannoel Nov 1, 2024
19645b7
convert input ref to reference, datum to iceberg datum
kwannoel Nov 1, 2024
425f801
convert rw expressions into iceberg predicates
kwannoel Nov 2, 2024
bc197f4
add support for more literals
kwannoel Nov 2, 2024
f084464
add predicate proto
kwannoel Nov 4, 2024
2400e4a
interim commit: use iceberg proto
kwannoel Nov 4, 2024
a28e79b
change predicate_pushdown return
kwannoel Nov 4, 2024
43ba97d
derive eq, hash for iceberg predicate
kwannoel Nov 4, 2024
09268d6
interim commit: add iceberg_predicate to batch
kwannoel Nov 4, 2024
6b7cf2d
add fetch_parameters
kwannoel Nov 5, 2024
e4154ec
Revert "derive eq, hash for iceberg predicate"
kwannoel Nov 6, 2024
a1be44d
Revert "change predicate_pushdown return"
kwannoel Nov 6, 2024
08b3676
Revert "interim commit: use iceberg proto"
kwannoel Nov 6, 2024
31fa17f
Revert "add predicate proto"
kwannoel Nov 6, 2024
d9e4a9e
Revert "interim commit: add iceberg_predicate to batch"
kwannoel Nov 6, 2024
9580498
use iceberg predicate in logical_iceberg_scan fields
kwannoel Nov 6, 2024
631c28f
add to batch
kwannoel Nov 6, 2024
77342c0
build with predicate
kwannoel Nov 6, 2024
06d8f8a
clean
kwannoel Nov 6, 2024
081dfa6
implement distill
kwannoel Nov 7, 2024
e7a3609
fix warn
kwannoel Nov 7, 2024
9090456
add tests
kwannoel Nov 7, 2024
9c1c876
no verbose for wget
kwannoel Nov 7, 2024
99f8874
more tests
kwannoel Nov 7, 2024
a4e03c1
check results
kwannoel Nov 7, 2024
aede6a9
fix bugs
kwannoel Nov 7, 2024
1a8b370
explain source plan, maybe schema malformed
kwannoel Nov 7, 2024
0f561ac
fix tests
kwannoel Nov 8, 2024
7685a97
fmt
kwannoel Nov 8, 2024
ddb7d7b
increase timeout
kwannoel Nov 8, 2024
4007509
no need double assert
kwannoel Nov 8, 2024
1051e09
docs
kwannoel Nov 8, 2024
e9642ec
fmt
kwannoel Nov 8, 2024
1ba69ea
increase timeout
kwannoel Nov 8, 2024
a860136
use rule based predicate push down
kwannoel Nov 8, 2024
69cad02
prune BatchFilter if predicate always true
kwannoel Nov 8, 2024
311f3d0
test mix filter and predicate
kwannoel Nov 8, 2024
5df282d
handle case where iceberg split is empty
kwannoel Nov 14, 2024
b4187d3
panic if doing unsupported ops
kwannoel Nov 14, 2024
f98b5fe
fix metadata
kwannoel Nov 14, 2024
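The commits above center on translating RisingWave filter expressions (input refs and datums) into `iceberg-rust` predicates that the scan can use to prune files. Below is a minimal sketch of the kind of predicate the converter produces, built directly with the public `iceberg` crate's `Reference` and `Datum` builders; the converter itself lives in the frontend and is not part of this diff, and the exact method names (`greater_than`, `less_than_or_equal_to`, `and`) are an assumption about the iceberg 0.x API rather than code from this PR.

use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

fn main() {
    // A RisingWave condition like `i1 > 500 AND i1 <= 590` becomes a
    // conjunction of binary predicates: each column reference turns into
    // a `Reference` and each literal into an iceberg `Datum`.
    let gt: Predicate = Reference::new("i1").greater_than(Datum::int(500));
    let le: Predicate = Reference::new("i1").less_than_or_equal_to(Datum::int(590));
    let pushed = gt.and(le);

    // Printed via `Display`, mirroring the `predicate: ...` field shown
    // in the `explain` output of the new slt tests below.
    println!("{pushed}");
}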
1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -49,6 +49,7 @@ poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml
 poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml
 poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
 poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml
+poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml
 
 
 echo "--- Kill cluster"
3 changes: 1 addition & 2 deletions ci/workflows/main-cron.yml
@@ -414,14 +414,13 @@ steps:
     depends_on:
       - "build"
       - "build-other"
-
     plugins:
       - docker-compose#v5.1.0:
           run: rw-build-env
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
       - ./ci/plugins/upload-failure-logs
-    timeout_in_minutes: 7
+    timeout_in_minutes: 9
     retry: *auto-retry
 
   - label: "end-to-end iceberg sink v2 test (release)"
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -270,7 +270,7 @@ steps:
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
       - ./ci/plugins/upload-failure-logs
-    timeout_in_minutes: 15
+    timeout_in_minutes: 17
     retry: *auto-retry
 
   - label: "end-to-end iceberg cdc test"
2 changes: 1 addition & 1 deletion e2e_test/iceberg/start_spark_connect_server.sh
@@ -11,7 +11,7 @@ PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION"
 SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
 
 if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop3" ];then
-  wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE
+  wget --no-verbose https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE
   tar -xzf $SPARK_FILE --no-same-owner
 fi
 
143 changes: 143 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt
@@ -0,0 +1,143 @@
statement ok
set sink_decouple = false;

statement ok
set streaming_parallelism=4;

statement ok
drop table if exists s1 cascade;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
insert into s1 select x, 'some str', 'another str' from generate_series(1, 500) t(x);

statement ok
insert into s1 select x, null as y, null as z from generate_series(501, 1000) t(x);

statement ok
flush;

statement ok
CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1;

statement ok
CREATE SINK sink1 AS select * from mv1 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo_db',
table.name = 't1',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
commit_checkpoint_interval = 1,
create_table_if_not_exists = 'true'
);

statement ok
drop source if exists iceberg_t1_source;

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.path.style.access = 'true',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
table.name = 't1',
);

statement ok
flush;

query I
select * from iceberg_t1_source order by i1 limit 1;
----
1 some str another str

query I
select count(*) from iceberg_t1_source;
----
1000

query I
select * from iceberg_t1_source where i1 > 990 order by i1;
----
991 NULL NULL
992 NULL NULL
993 NULL NULL
994 NULL NULL
995 NULL NULL
996 NULL NULL
997 NULL NULL
998 NULL NULL
999 NULL NULL
1000 NULL NULL

query I
explain select * from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580;
----
BatchExchange { order: [], dist: Single }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: (((((i1 = 580) AND (i1 > 500)) AND (i1 < 600)) AND (i1 >= 550)) AND (i1 <= 590)) AND (i1 != 570) }

query I
select i1 from iceberg_t1_source where i1 > 500 and i1 < 600 and i1 >= 550 and i1 <= 590 and i1 != 570 and i1 = 580;
----
580

query I
explain select * from iceberg_t1_source where i1 in (1, 2, 3, 4, 5);
----
BatchExchange { order: [], dist: Single }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 IN (5, 4, 1, 3, 2) }

query I
select i1 from iceberg_t1_source where i1 in (1, 2, 3, 4, 5) order by i1;
----
1
2
3
4
5

query I
select count(*), i2, i3 from iceberg_t1_source where i2 = 'some str' and i3 = 'another str' group by i2, i3;
----
500 some str another str

query I
explain select i1 from iceberg_t1_source where i1 > 500 and i2 = i3;
----
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [i1] }
└─BatchFilter { predicate: (i2 = i3) }
└─BatchIcebergScan { source: iceberg_t1_source, columns: [i1, i2, i3], predicate: i1 > 500 }

query I
select i1 from iceberg_t1_source where i1 > 500 and i2 = i3;
----

# Empty splits should not panic
query I
select i1 from iceberg_t1_source where i1 > 1001;
----

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_predicate_pushdown.toml
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.t1',
]

slt = 'test_case/iceberg_predicate_pushdown.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.t1',
'DROP SCHEMA IF EXISTS demo_db',
]
42 changes: 32 additions & 10 deletions src/connector/src/source/iceberg/mod.rs
@@ -19,6 +19,7 @@ use std::collections::HashMap;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use futures_async_stream::for_await;
+use iceberg::expr::Predicate as IcebergPredicate;
 use iceberg::scan::FileScanTask;
 use iceberg::spec::TableMetadata;
 use iceberg::table::Table;
@@ -137,6 +138,19 @@ pub struct IcebergSplit {
     pub position_delete_files: Vec<IcebergFileScanTaskJsonStr>,
 }
 
+impl IcebergSplit {
+    pub fn empty(table_meta: TableMetadataJsonStr) -> Self {
+        Self {
+            split_id: 0,
+            snapshot_id: 0,
+            table_meta,
+            files: vec![],
+            equality_delete_files: vec![],
+            position_delete_files: vec![],
+        }
+    }
+}
+
 impl SplitMetaData for IcebergSplit {
     fn id(&self) -> SplitId {
         self.split_id.to_string().into()
@@ -189,6 +203,7 @@ impl IcebergSplitEnumerator {
         schema: Schema,
         time_traval_info: Option<IcebergTimeTravelInfo>,
         batch_parallelism: usize,
+        predicate: IcebergPredicate,
     ) -> ConnectorResult<Vec<IcebergSplit>> {
         if batch_parallelism == 0 {
             bail!("Batch parallelism is 0. Cannot split the iceberg files.");
@@ -199,14 +214,9 @@
         let current_snapshot = table.metadata().current_snapshot();
         if current_snapshot.is_none() {
             // If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
-            return Ok(vec![IcebergSplit {
-                split_id: 0,
-                snapshot_id: 0,
-                table_meta: TableMetadataJsonStr::serialize(table.metadata()),
-                files: vec![],
-                equality_delete_files: vec![],
-                position_delete_files: vec![],
-            }]);
+            return Ok(vec![IcebergSplit::empty(TableMetadataJsonStr::serialize(
+                table.metadata(),
+            ))]);
         }
 
         let snapshot_id = match time_traval_info {
@@ -246,11 +256,15 @@ impl IcebergSplitEnumerator {
 
         let require_names = Self::get_require_field_names(&table, snapshot_id, &schema).await?;
 
+        let table_schema = table.metadata().current_schema();
+        tracing::debug!("iceberg_table_schema: {:?}", table_schema);
+
         let mut position_delete_files = vec![];
         let mut data_files = vec![];
         let mut equality_delete_files = vec![];
         let scan = table
             .scan()
+            .with_filter(predicate)
             .snapshot_id(snapshot_id)
             .select(require_names)
             .build()
@@ -302,10 +316,18 @@
                 .files
                 .push(data_files[split_num * split_size + i].clone());
         }
-        Ok(splits
+        let splits = splits
             .into_iter()
             .filter(|split| !split.files.is_empty())
-            .collect_vec())
+            .collect_vec();
+
+        if splits.is_empty() {
+            return Ok(vec![IcebergSplit::empty(TableMetadataJsonStr::serialize(
+                table.metadata(),
+            ))]);
+        }
+
+        Ok(splits)
     }
 
     /// The required field names are the intersection of the output schema and the equality delete columns.
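With predicate pushdown, a filter can prune every data file, leaving zero non-empty splits; the guard above then returns a single empty split so the batch executor still has something to schedule (this is the "handle case where iceberg split is empty" commit, exercised by the `i1 > 1001` query in the new slt test). A self-contained model of that chunk-then-guard behavior, with a toy `Split` standing in for the real `IcebergSplit` and a simplified ceiling-division file assignment:

#[derive(Debug)]
struct Split {
    split_id: i64,
    files: Vec<String>,
}

impl Split {
    fn empty() -> Self {
        Split { split_id: 0, files: vec![] }
    }
}

fn assign_splits(data_files: Vec<String>, batch_parallelism: usize) -> Vec<Split> {
    assert!(batch_parallelism > 0, "Batch parallelism is 0. Cannot split the iceberg files.");
    // Ceiling division so every file lands in some split.
    let chunk = data_files.len().div_ceil(batch_parallelism).max(1);
    let splits: Vec<Split> = data_files
        .chunks(chunk)
        .enumerate()
        .map(|(i, files)| Split { split_id: i as i64, files: files.to_vec() })
        .collect();
    // The guard from this PR: if no split ended up with files,
    // return one empty split instead of an empty vector.
    if splits.is_empty() {
        return vec![Split::empty()];
    }
    splits
}

fn main() {
    // The predicate pruned everything: no panic, one empty split.
    let splits = assign_splits(vec![], 4);
    assert_eq!(splits.len(), 1);
    assert!(splits[0].files.is_empty());
}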
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/mod.rs
@@ -402,6 +402,14 @@ impl PlanRoot {
             ApplyOrder::BottomUp,
         ));
 
+        // For iceberg scan, we do iceberg predicate pushdown
+        // BatchFilter -> BatchIcebergScan
+        let plan = plan.optimize_by_rules(&OptimizationStage::new(
+            "Iceberg Predicate Pushdown",
+            vec![BatchIcebergPredicatePushDownRule::create()],
+            ApplyOrder::BottomUp,
+        ));
+
         assert_eq!(plan.convention(), Convention::Batch);
         Ok(plan)
     }
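The `BatchIcebergPredicatePushDownRule` itself is not included in this excerpt. Schematically, it matches a `BatchFilter` whose input is a `BatchIcebergScan`, converts each conjunct of the filter condition into an iceberg predicate where possible, and leaves the rest in the filter (rebuilt via the new `clone_with_predicate` helper in the next file); if nothing remains, the `BatchFilter` is pruned entirely, per the "prune BatchFilter if predicate always true" commit. A rough sketch of that conjunct split, where `Conv` and `push_down` are hypothetical stand-ins and only the `iceberg` crate types are real:

use iceberg::expr::{Predicate as IcebergPredicate, Reference};
use iceberg::spec::Datum;

/// Hypothetical stand-in: the result of trying to convert one conjunct
/// of the RisingWave filter condition.
enum Conv {
    Pushed(IcebergPredicate), // convertible, e.g. `i1 > 500`
    Residual(String),         // not convertible, e.g. `i2 = i3`
}

fn push_down(conjuncts: Vec<Conv>) -> (IcebergPredicate, Vec<String>) {
    let mut pushed = IcebergPredicate::AlwaysTrue;
    let mut residual = vec![];
    for c in conjuncts {
        match c {
            Conv::Pushed(p) => pushed = pushed.and(p),
            Conv::Residual(e) => residual.push(e),
        }
    }
    // An empty `residual` means the BatchFilter can be dropped.
    (pushed, residual)
}

fn main() {
    // Mirrors the slt plan: `i1 > 500` moves into BatchIcebergScan while
    // the column-to-column comparison `i2 = i3` stays in BatchFilter.
    let (pushed, residual) = push_down(vec![
        Conv::Pushed(Reference::new("i1").greater_than(Datum::int(500))),
        Conv::Residual("i2 = i3".into()),
    ]);
    println!("scan predicate: {pushed}, residual: {residual:?}");
}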
6 changes: 6 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_filter.rs
@@ -45,6 +45,12 @@ impl BatchFilter {
     pub fn predicate(&self) -> &Condition {
         &self.core.predicate
     }
+
+    pub fn clone_with_predicate(&self, predicate: Condition) -> Self {
+        let mut core = self.core.clone();
+        core.predicate = predicate;
+        Self::new(core)
+    }
 }
 impl_distill_by_unit!(BatchFilter, core, "BatchFilter");