-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Destination Snowflake: Sync Id, generation_id and Meta #39107
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
This stack of pull requests is managed by Graphite. Learn more about stacking. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks reasonable, added some nitpicky comments but wouldn't block on any of them. Lmk if you want to ship this pr standalone and I'll approve 👍
...ain/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt
Outdated
Show resolved
Hide resolved
...ain/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt
Outdated
Show resolved
Hide resolved
...ain/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt
Outdated
Show resolved
Hide resolved
...ain/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt
Outdated
Show resolved
Hide resolved
...ain/kotlin/io/airbyte/integrations/destination/snowflake/operation/SnowflakeStagingClient.kt
Outdated
Show resolved
Hide resolved
...irbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientIntegrationTest.kt
Outdated
Show resolved
Hide resolved
...irbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientIntegrationTest.kt
Outdated
Show resolved
Hide resolved
...irbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientIntegrationTest.kt
Outdated
Show resolved
Hide resolved
...irbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientIntegrationTest.kt
Outdated
Show resolved
Hide resolved
...irbyte/integrations/destination/snowflake/operation/SnowflakeStagingClientIntegrationTest.kt
Outdated
Show resolved
Hide resolved
f07e155
to
925e6fe
Compare
aa96c15
to
5997f8c
Compare
925e6fe
to
7afc0ca
Compare
b19a5c5
to
0b6a891
Compare
393d5f1
to
f94814f
Compare
0b6a891
to
88c29a5
Compare
f94814f
to
c754c40
Compare
88c29a5
to
d94131a
Compare
c754c40
to
459c37c
Compare
d94131a
to
8f94ff2
Compare
c46d15a
to
74c83db
Compare
74c83db
to
c0a5ee5
Compare
c0a5ee5
to
3180ccc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a couple nitpicks, otherwise lgtm
) | ||
val rawTableDefinition = | ||
results | ||
.groupBy { it.get("schema_name").asText()!! } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're already restricting to specifically airbyte_internal.whatever, why do we need to group by schema name / table name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to avoid. list has 1 element and first()
call i guess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you get the same result with this?
TableDefinition(
results.associateTo(LinkedHashMap()) {
// return value of data_type in show columns is a json string.
val dataType = Jsons.deserialize(it.get("data_type").asText())
it.get("column_name").asText()!! to
ColumnDefinition(
it.get("column_name").asText(),
dataType.get("type").asText(),
0,
dataType.get("nullable").asBoolean(),
)
}
)
i.e. don't need to group on schema/table name at all, just directly build the map of column name to definition
(... but also, I would hope that kotlin has some easy .first
accessor :P )
...io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt
Show resolved
Hide resolved
...io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt
Show resolved
Hide resolved
...io/airbyte/integrations/destination/snowflake/migrations/SnowflakeAbMetaAndGenIdMigration.kt
Show resolved
Hide resolved
7910b55
to
1638b6d
Compare
1638b6d
to
7910b55
Compare
3180ccc
to
fb50cbf
Compare
7910b55
to
feba86e
Compare
1f3e971
to
e728b2c
Compare
e728b2c
to
adecfad
Compare
Hi @gisripa
(if you have a remedy for this we could implement that would be great) |
@pKorsholm / @tuday2 we believe this only affects |
For me its "Full Refresh / Overwrite" and "Incremental | Append + Deduped", i have at least 20+ examples |
can you send the logs for one of the dedup streams running into problems? (feel free to slack me if you don't want to post them publicly) so far we've only seen this happen on overwrite streams, so I'm not sure what's happening there |
in the meantime, we have #39399, which should fix for overwrite streams. It's just pending green CI. |
for vis: @tuday2 sent me the logs - thanks! confirmed that the streams with errors were all full refresh overwrite, which caused the UI to mark other streams as failed (b/c the entire sync terminated uncleanly) |
What
Adding migration for Snowflake Raw tables and final tables to include
_airbyte_meta
and_airbyte_generation_id
columns.Tests and refactors to adapt to latest CDK.
Review guide
User Impact
Can this PR be safely reverted and rolled back?