[Spark] Add tests for implicit cast in INSERT to delta table #3605
Conversation
 * Each takes a unique path through analysis. The abstractions below capture these different
 * inserts to allow more easily running tests with all or a subset of them.
 */
trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQLCommandTest {
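To give an idea of the shape of these abstractions, here is a minimal sketch, assuming a SparkSession is available; the names `SQLAppend`, `DataFrameAppend`, and `runInsert` are illustrative rather than the actual test API:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: each insert flavor implements a common trait so the test runner can
// iterate over all of them or a subset. Names here are hypothetical.
trait Insert {
  def name: String
  def runInsert(spark: SparkSession, target: String, source: String): Unit
}

object SQLAppend extends Insert {
  override val name = "INSERT INTO (SQL)"
  override def runInsert(spark: SparkSession, target: String, source: String): Unit =
    spark.sql(s"INSERT INTO $target SELECT * FROM $source")
}

object DataFrameAppend extends Insert {
  override val name = "df.write.insertInto"
  // insertInto resolves columns by position, unlike saveAsTable which is by name.
  override def runInsert(spark: SparkSession, target: String, source: String): Unit =
    spark.table(source).write.insertInto(target)
}

// A runner can then take all insert types or a subset:
// val allInsertTypes: Seq[Insert] = Seq(SQLAppend, DataFrameAppend /* , ... */)
```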
This is moved from TypeWideningInsertSchemaEvolutionSuite. StreamingInsert was added in the process to cover streaming writes, and the test runner method testInserts was updated to make it slightly more expressive.
Tests covering streaming inserts assume #3443 is merged; they will fail until that change actually lands and will succeed afterwards.
}

/** df.writeStream.toTable() */
object StreamingInsert extends Insert { self: Insert =>
This is Structured Streaming. Not sure whether we also want to test DStreams (Spark Streaming), and whether it makes a difference, since Structured Streaming should be built on top of DStreams.
This is using the default trigger. Do we want to test the behaviour with multiple batches in which only one leads to type widening?
I didn't know about DStreams; I'm not sure whether they can be used directly to write to a Delta table, or whether that's something we think should be supported. I would leave it out of this PR.
For multiple batches: this PR isn't about type widening but implicit casting.
I have a PR open for type widening in the Delta sink which includes a test where we write multiple batches manually: https://github.com/delta-io/delta/pull/3626/files#diff-82a2720fce9d77504dc9f8e0365dc106e2113b4933d18fec17293b0244504460R197
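For reference, a rough sketch of how a Structured Streaming insert into a Delta table can be exercised with the default trigger, assuming a running SparkSession; `MemoryStream` is a Spark test utility, and the table and checkpoint names are placeholders:

```scala
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

def streamingInsert(spark: SparkSession, target: String, checkpoint: String): Unit = {
  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  val source = MemoryStream[(Int, Long)]
  source.addData((1, 1L), (2, 2L))

  // Default trigger: pending data is processed as micro-batches.
  val query = source.toDF().toDF("a", "b")
    .writeStream
    .option("checkpointLocation", checkpoint)
    .toTable(target)

  query.processAllAvailable() // drain the pending batch
  query.stop()
}
```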
initialSchemaDDL: String,
initialJsonData: Seq[String],
partitionBy: Seq[String] = Seq.empty,
insertSchemaDDL: String,
insertJsonData: Seq[String],
overwriteWhere: (String, Int),
expectedSchema: StructType = null,
checkError: SparkThrowable => Unit = null,
includeInserts: Seq[Insert] = allInsertTypes,
excludeInserts: Seq[Insert] = Seq.empty,
confs: Seq[(String, String)] = Seq.empty): Unit = {
This list is getting long, and it also seems like some of the data goes together. E.g. (initialSchemaDDL, initialJsonData) and (insertSchemaDDL, insertJsonData) should probably have been case classes, and (includeInserts, excludeInserts) can be one variable passed in with a default value of allInsertTypes. This PR is already quite big, so if you choose to do this additional refactor it would be better to do it in another PR.
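For instance, the grouping could look something like this sketch (the names `TableSetup`, `initial`, and `insert` are hypothetical, not part of the PR):

```scala
// Hypothetical grouping of the paired parameters suggested above.
case class TableSetup(schemaDDL: String, jsonData: Seq[String])

// testInserts could then take:
//   initial: TableSetup, insert: TableSetup,
//   inserts: Seq[Insert] = allInsertTypes
```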
Good points. I've merged expectedSchema and checkError into a single argument since I'm touching this arg here already, but left the other ones as-is to avoid having to update all tests and get a massive diff. I'll keep that in mind to refactor and simplify later on.
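One possible shape for that merged argument is a small sealed type, sketched below; the name `ExpectedResult` and its cases are illustrative, not necessarily the exact shape used in the PR:

```scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.types.StructType

// Hypothetical single `expectedResult` argument replacing the
// `expectedSchema` / `checkError` pair: exactly one outcome per test.
sealed trait ExpectedResult
object ExpectedResult {
  /** The insert is expected to succeed, leaving the table with this schema. */
  case class Success(expectedSchema: StructType) extends ExpectedResult
  /** The insert is expected to fail; the callback validates the error. */
  case class Failure(checkError: SparkThrowable => Unit) extends ExpectedResult
}
```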
lgtm!
class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {

  for (schemaEvolution <- BOOLEAN_DOMAIN) {
    testInserts("insert with implicit up and down cast on top-level fields, " +
nit: there's no implicit up or down cast here since the schema remains the same, is there?
The schema of the inserted data isn't the same as the schema of the table:

table: a long, b int
data: a int, b long

so there is an upcast for a and a downcast for b.
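Concretely, the scenario looks roughly like this, assuming a SparkSession named `spark` and placeholder table names:

```scala
// Table and inserted data use int/long in opposite positions, so during the
// insert `a` is implicitly upcast (int -> long) and `b` downcast (long -> int).
spark.sql("CREATE TABLE target (a long, b int) USING delta")
spark.sql("CREATE TABLE source (a int, b long) USING delta")
spark.sql("INSERT INTO source VALUES (1, 2)")
spark.sql("INSERT INTO target SELECT a, b FROM source")
```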
testInserts("insert with implicit up and down cast on fields nested in map, " + | ||
s"schemaEvolution=$schemaEvolution")( | ||
initialSchemaDDL = "key int, m map<string, struct<x: long, y: int>>", |
Should we test the map's key as well?
This is going to be hard to test here: these tests rely on parsing JSON to set up the table and ingest data, and I don't believe that supports using anything other than a string for map keys. That would be a pretty significant rewrite of all the tests.
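A short sketch of why: JSON object keys are always strings, so a schema like `map<int, ...>` has no direct representation in JSON test data. The snippet below is illustrative, assuming an active SparkSession:

```scala
import org.apache.spark.sql.SparkSession

// JSON object keys are strings, so the map key type is effectively pinned to
// string when test data is expressed as JSON documents like this one.
val spark: SparkSession = SparkSession.active
import spark.implicits._

val df = spark.read
  .schema("key int, m map<string, struct<x: long, y: int>>")
  .json(Seq("""{"key": 1, "m": {"k": {"x": 1, "y": 2}}}""").toDS())
```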
Follow-up in #3762:

## Description

Follow on #3605. Adds more tests covering behavior for all ways of running insert with:

- an extra column or struct field in the input, in `DeltaInsertIntoSchemaEvolutionSuite`
- a missing column or struct field in the input, in `DeltaInsertIntoImplicitCastSuite`
- a different column or field ordering than the table schema, in `DeltaInsertIntoColumnOrderSuite`

Note: tests are spread across multiple suites as each test case covers 20 different ways to run inserts, quickly leading to large test suites.

This change includes improvements to `DeltaInsertIntoTest`:

- Group all types of inserts into categories that are easier to reference in tests:
  - SQL vs. Dataframe inserts
  - Position-based vs. name-based inserts
  - Append vs. overwrite
- Provide a mechanism to ensure that each test covers all existing insert types.

## How was this patch tested?

N/A: test only
## Description

Add tests covering implicit casting when inserting into a Delta table. Covers the various insert APIs.

Changes: the insert test framework is moved out of `TypeWideningInsertSchemaEvolutionSuite` and into its own trait to allow reusability.

## How was this patch tested?

Test-only

## Does this PR introduce any user-facing changes?

No