feat: build physical plan for FK table-table joins #7517

Merged
merged 7 commits into from
May 14, 2021
Member

@mjsax mjsax commented May 12, 2021

Description

Add step to add FK-join to the physical plan.

Testing done

Added some unit tests. We need to extend the QTT tests later, once this code is actually used (it is currently "dead code").

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@mjsax mjsax requested a review from a team as a code owner May 12, 2021 21:53
@@ -28,15 +28,13 @@
.valueColumn(ColumnName.of("R_K"), SqlTypes.STRING) // Copy of key in value
.build();

private JoinParams joinParams;
Member Author

side cleanup to inline variable.

@@ -193,7 +193,12 @@ public void shouldReturnCorrectSchema() {
// Then:
assertThat(
result.getSchema(),
is(JoinParamsFactory.create(R_KEY, LEFT_SCHEMA, RIGHT_SCHEMA).getSchema())
Member Author

Side cleanup: We should not use JoinParamsFactory to compute the expected result, because JoinParamsFactory is used to compute the actual result within join.build(...). Otherwise we have a cyclic test "dependency"...

Same below.
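
To make the circularity concrete, here is a minimal sketch. `joinSchema` and the column lists are hypothetical stand-ins, not the actual JoinParamsFactory API; the point is only that the expected value should be written out by hand rather than produced by the code under test.

```java
import java.util.ArrayList;
import java.util.List;

final class CyclicTestSketch {

  // Hypothetical production code under test: concatenates left and right columns.
  static List<String> joinSchema(final List<String> left, final List<String> right) {
    final List<String> out = new ArrayList<>(left);
    out.addAll(right);
    return out;
  }

  public static void main(final String[] args) {
    final List<String> left = List.of("L_K", "L_V");
    final List<String> right = List.of("R_K", "R_V");

    // Circular: the expected value comes from the same code that produced the
    // actual value, so a bug in joinSchema can never make this check fail.
    final boolean circular = joinSchema(left, right).equals(joinSchema(left, right));

    // Independent: the expected value is spelled out explicitly.
    final boolean independent =
        joinSchema(left, right).equals(List.of("L_K", "L_V", "R_K", "R_V"));

    System.out.println("circular=" + circular + ", independent=" + independent);
  }
}
```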

@mjsax mjsax changed the title from "Add fk joins" to "feat: build physical plan for FK table-table joins" May 12, 2021
@mjsax mjsax mentioned this pull request May 12, 2021
Comment on lines 36 to 42
public ForeignKeyTableTableJoin(
final ExecutionStepPropertiesV1 props,
final JoinType joinType,
final ColumnName leftJoinColumnName,
final ExecutionStep<KTableHolder<KLeftT>> leftSource,
final ExecutionStep<KTableHolder<KRightT>> rightSource
) {
Member

I'm not sure if we should have two classes, one for each type of join. They have repeated code except for the extra left column name.

Also, I'm not sure if the TableTableJoin info is written in the plan. I see that class has a @JsonCreator and it is also referenced in PlanSummary.java as .put(TableTableJoin.class, "JOIN"). Did you verify this part?

The TableTableJoin accepts an optional keyColumn. Perhaps we can add another parameter to accept an optional nonKeyColumn too?

Contributor

> I'm not sure if we should have two classes, one for each type of join. They have repeated code except for the extra left column name.

I think it makes sense to have a separate execution step for FK joins since the implementation is different. Execution steps are meant to be fine-grained so we can evolve them separately as needed.

> Also, I'm not sure if the TableTableJoin info is written in the plan. I see that class has a @JsonCreator and it is also referenced in PlanSummary.java as .put(TableTableJoin.class, "JOIN"). Did you verify this part?

PlanSummary.java, ExecutionStep.java, and StepSchemaResolver.java all need to be updated to include the new execution step. Let's add these changes into this PR so we don't hit confusing failures down the line when wiring up the functionality.

> The TableTableJoin accepts an optional keyColumn. Perhaps we can add another parameter to accept an optional nonKeyColumn too?

Unless I'm mistaken, the optional keyColumn in TableTableJoin specifies the name of the output key column. That is not necessary for FK joins because the output key always matches the left source key.
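
As a rough illustration of that last point, the sketch below builds an FK-join output schema whose key is taken from the left source only. `SimpleSchema` and `fkJoinSchema` are hypothetical simplifications, not ksqlDB's LogicalSchema or the factory added in this PR, and concatenating the value columns of both sides is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class FkJoinSchemaSketch {

  /** Hypothetical, simplified schema: key column names plus value column names. */
  static final class SimpleSchema {
    final List<String> key;
    final List<String> value;

    SimpleSchema(final List<String> key, final List<String> value) {
      this.key = List.copyOf(key);
      this.value = List.copyOf(value);
    }
  }

  /** The output key is always the left key, so no output key name is needed. */
  static SimpleSchema fkJoinSchema(final SimpleSchema left, final SimpleSchema right) {
    // Assumption: value columns of both sides are simply concatenated.
    final List<String> value = new ArrayList<>(left.value);
    value.addAll(right.value);
    return new SimpleSchema(left.key, value);
  }

  public static void main(final String[] args) {
    final SimpleSchema left = new SimpleSchema(List.of("L_K"), List.of("L_FK", "L_V"));
    final SimpleSchema right = new SimpleSchema(List.of("R_K"), List.of("R_V"));
    final SimpleSchema out = fkJoinSchema(left, right);
    System.out.println("key=" + out.key + ", value=" + out.value);
  }
}
```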

public static <KLeftT, KRightT> ForeignKeyTableTableJoin<KLeftT, KRightT>
foreignKeyTableTableJoin(final QueryContext.Stacker stacker,
final JoinType joinType,
final ColumnName leftJoinColumnName,
Member

I think we could reuse the current tableTableJoin method by passing the new leftJoinColumnName as optional, don't you think?

Contributor

If we're going to have a separate execution step, we should have a separate method for creating the execution step. (See above comment.)

Comment on lines +29 to +33
public static <KLeftT, KRightT> KTableHolder<KLeftT> build(
final KTableHolder<KLeftT> left,
final KTableHolder<KRightT> right,
final ForeignKeyTableTableJoin<KLeftT, KRightT> join
) {
Member

What if we add info to the current TableTableJoin so that we could check whether the join is on a foreign key? Maybe we can reuse the TableTableJoinBuilder and just change the implementation to call one builder for foreign keys and another that has the current code.

Member Author

We could, but it seems cleaner to have a dedicated node for FK-joins in the physical plan, as mentioned by Victoria above.

Contributor

@vcrfxia vcrfxia left a comment

Thanks @mjsax. LGTM minus the inline comments.

public static LogicalSchema createSchema(
final LogicalSchema leftSchema,
final LogicalSchema rightSchema
) {
Contributor

We need to validate that the join expression types match. If we follow the pattern of the existing JoinParamsFactory, we can do that here.

JoinParamsFactory also validates that the join expressions represent the complete key, which is a requirement for the right side of FK joins as well. I think we don't need this extra validation here in light of the recent updates to LogicalPlanner to check this condition up front, but it may make sense to have a duplicate check as a second guard in the physical layer here as well.
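
A minimal sketch of what such a type check could look like for FK joins, assuming the left join column (a value column) must have the same type as the right table's key. The `ColType` enum, the method name, and the use of `IllegalArgumentException` are all hypothetical; this is not the body of JoinParamsFactory.throwOnKeyMismatch.

```java
final class FkJoinTypeCheckSketch {

  /** Hypothetical stand-in for a SQL column type. */
  enum ColType { STRING, INTEGER, BIGINT }

  /**
   * Rejects the join if the left join column type differs from the right key type.
   * Both parameter names are illustrative only.
   */
  static void throwOnTypeMismatch(final ColType leftJoinColumnType, final ColType rightKeyType) {
    if (leftJoinColumnType != rightKeyType) {
      throw new IllegalArgumentException(
          "Invalid join. Left join column type " + leftJoinColumnType
              + " does not match right key type " + rightKeyType);
    }
  }

  public static void main(final String[] args) {
    throwOnTypeMismatch(ColType.STRING, ColType.STRING); // passes silently
    try {
      throwOnTypeMismatch(ColType.BIGINT, ColType.STRING);
    } catch (final IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```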

Member Author

> We need to validate that the join expression types match. If we follow the pattern of the existing JoinParamsFactory, we can do that here.

Yes, but this should be done in the logical planner IMHO.

Happy to add a second guard.

Contributor

Where are we currently validating that the expression types match? I don't see the check in either the logical or the physical layer right now.

Member Author

For table-table joins, it happens in JoinParamsFactory.throwOnKeyMismatch (but I think it's wrong to do this check when building the physical plan; it should be done earlier).

Thus, I did not add it to ForeignKeyJoinParamsFactory, and I think @spena's current WIP PR should add this check. (And we should move the existing check for PK-joins into the logical planner in a follow-up PR.)

Contributor

JoinParamsFactory.throwOnKeyMismatch() is called from JoinParamsFactory.createSchema(), which is called from JoinNode.buildJoinSchema(), which is called as part of attempting to build the join from the logical planner. In other words, this check is already happening in the logical planner (for table-table joins today). Where are you proposing that the check should happen instead?

Member Author

That is quite convoluted code then, because JoinParamsFactory is in package io.confluent.ksql.execution.streams -- the logical planner should not use helpers from the physical planner IMHO. Maybe the solution is to move the whole factory class into the logical planner package?

Contributor

The method is also called from the physical layer when executing the execution step. Ideally this method would be in a separate package clearly meant to be shared between the two, rather than entirely in one or the other.

Member Author

Interesting. I think we don't need to resolve this in this PR. We only need to make sure that @spena's PR adds the type check.

}

@Test
public void shouldDoLeftJoinOnKey() {
Contributor

What is the purpose of this test? Isn't this a primary key join, not a foreign key join?

Member Author

I guess you are right -- seems I overshot on corner-case testing. Even so, we could compile a PK-PK-join as an FK-join and it should still work -- it would be less efficient though, I guess.

This actually raises a question: if we have two tables with different partition counts and/or different key formats, could we implement a PK-PK-join without repartitioning one input table (or even rejecting the join due to the missing co-partitioning), by compiling it down as an FK-join?

Thus, I am wondering if we should just keep this test, because from a physical plan point of view, this should work, even if we don't use it when translating the logical plan into a physical plan atm -- but maybe we would do this in the future?

Contributor

> if we have two tables with different partition counts and/or different key formats, could we implement a PK-PK-join without repartitioning one input table (or even rejecting the join due to the missing co-partitioning), by compiling it down as an FK-join?

I suppose we could, but it seems odd to do this by default, as it allows users to inadvertently write queries with poorer performance than expected. I'm not convinced we need these unit tests until (and if) we decide to do this one day, but I don't feel strongly. If we're going to leave them in, can we at least add a comment explaining that these are actually primary key joins which won't reach this code today, but that we've left them in for the future? Otherwise future readers may be similarly confused.

Member Author

Agreed. I'm also not sure if we should do it, but from a unit-test point of view, it seems ok to keep the tests, as unit tests for the physical plan should not know anything about the logical plan layer.

Will add a comment to clarify.

}

@Test
public void shouldDoInnerJoinOnKey() {
Contributor

Same question as above: What is the purpose of this test? Isn't this a primary key join, not a foreign key join?

);
}

// not sure what "legacy schema" is and if we need to support it?
Contributor

I don't think this is necessary. LEGACY_KEY_COL refers to the old default output key column name for joins. For FK joins, the output key columns always match the left source key columns.

@@ -0,0 +1,110 @@
/*
* Copyright 2019 Confluent Inc.
Contributor

nit: update year (and similarly for all the other new files in this PR)

final LogicalSchema rightSchema
) {
final Builder builder = LogicalSchema.builder()
.keyColumns(leftSchema.key())
Contributor

It occurs to me now that these column names might be aliased, which we probably don't want. Can we verify? Can also defer to a later PR when we wire everything together but my current suspicion is that this will be a problem.

Member Author

Not 100% sure atm -- leftSchema.key() returns Column objects that only have a ColumnName but no "alias".

But yes, we need to keep aliases in mind... I think it might be best to defer to later.

Contributor

@guozhangwang guozhangwang left a comment

Made a pass over the non-testing part.

final LogicalSchema leftSchema,
final ColumnName leftJoinColumnName) {

final Optional<Column> leftJoinColumn = leftSchema.findValueColumn(leftJoinColumnName);
Contributor

Just for my understanding, is it possible that due to alias we would not find a match here? I.e. if we do

... SELECT A.a AS a1, ... WHERE A.a = B.b

would we try to find a in a schema with a1 then?
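
The failure mode being asked about can be sketched as follows. This is only an illustration under the assumption that the lookup runs against a schema that already carries the aliased name; whether that actually happens in ksqlDB is not settled in this thread. `findValueColumn` here is a hypothetical helper on a plain list of names, not LogicalSchema.findValueColumn.

```java
import java.util.List;
import java.util.Optional;

final class AliasLookupSketch {

  /** Hypothetical lookup by exact column name in a list of value column names. */
  static Optional<String> findValueColumn(final List<String> valueColumns, final String name) {
    return valueColumns.stream().filter(name::equals).findFirst();
  }

  public static void main(final String[] args) {
    // Assumption: the schema seen by the join already uses the alias A1 instead of A.
    final List<String> aliasedSchema = List.of("A1", "OTHER");

    // Looking up the original name finds nothing, while the alias is found.
    System.out.println(findValueColumn(aliasedSchema, "A").isPresent());
    System.out.println(findValueColumn(aliasedSchema, "A1").isPresent());
  }
}
```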

Member Author

throw new IllegalStateException("invalid join type");
}

return KTableHolder.unmaterialized(
Contributor

Is it intentional to not materialize the join results?

Member Author

It was kinda intentional, because TableTableJoinBuilder() does the same thing. Not sure what the status of materializing table-table join results is. \cc @cprasad1

Contributor

We skip materializing the Table-Table join results immediately. We materialize at the TableSelectBuilder which comes after the TableTableJoinBuilder(). It makes sense to follow the same pattern and skip materializing here.

Contributor

Specifically, we materialize here:

Contributor

Got it, thanks @cprasad1 for confirming!

);
break;
default:
throw new IllegalStateException("invalid join type");
Contributor

nit: also include the type enum?
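
For example, the default branch could append the offending value to the message. The enum and the `describe` method below are hypothetical; only the exception type and the base message text come from the snippet under review.

```java
final class JoinTypeMessageSketch {

  /** Hypothetical join type enum used only for this illustration. */
  enum JoinType { INNER, LEFT, OUTER }

  static String describe(final JoinType joinType) {
    switch (joinType) {
      case INNER:
        return "inner FK join";
      case LEFT:
        return "left FK join";
      default:
        // Including the enum value makes the failure self-explanatory in logs.
        throw new IllegalStateException("invalid join type: " + joinType);
    }
  }

  public static void main(final String[] args) {
    System.out.println(describe(JoinType.LEFT));
    try {
      describe(JoinType.OUTER);
    } catch (final IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```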

import java.util.Objects;
import java.util.function.Function;

public final class KsqlKeyExtractor<KRightT> implements Function<GenericRow, KRightT> {
Contributor

Just a question here: I saw different Extractor classes in the repo, some with a Ksql prefix in their names and some without. Is there a naming convention at the moment? cc @agavra @vcrfxia

Contributor

🔥 ⚠️ - I really don't like the Ksql prefix. I don't think we have a convention, so really, just do what you like.

Contributor

@vcrfxia vcrfxia left a comment

LGTM besides the couple minor comments I just added. Thanks @mjsax !

@mjsax mjsax merged commit 744fa36 into confluentinc:master May 14, 2021
@mjsax mjsax deleted the add-fk-joins branch May 14, 2021 20:42
6 participants