Skip to content

Commit

Permalink
Destinations CDK: Correctly detect when real raw/final table is corre…
Browse files Browse the repository at this point in the history
…ct generation during truncate sync (#42503)
  • Loading branch information
edgao authored Aug 19, 2024
1 parent a7863a0 commit e3cc022
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 24 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.44.14 | 2024-08-19 | [\#42503](https://github.com/airbytehq/airbyte/pull/42503) | Destinations (refreshes) - correctly detect existing raw/final table of the correct generation during truncate sync |
| 0.44.13 | 2024-08-14 | [\#42579](https://github.com/airbytehq/airbyte/pull/42579) | S3 destination - OVERWRITE: keep files until successful sync of same generationId |
| 0.44.5 | 2024-08-09 | [\#43374](https://github.com/airbytehq/airbyte/pull/43374) | S3 destination V2 fields, conversion improvements, bugfixes |
| 0.44.4 | 2024-08-08 | [\#43410](https://github.com/airbytehq/airbyte/pull/43330) | Better logs for counting info to state message. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.13
version=0.44.14
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
}

if (isTruncateSync) {
prepareStageForTruncate(destinationInitialStatus, stream)
rawTableSuffix = TMP_TABLE_SUFFIX
initialRawTableStatus = destinationInitialStatus.initialTempRawTableStatus
val (rawTableStatus, suffix) = prepareStageForTruncate(destinationInitialStatus, stream)
initialRawTableStatus = rawTableStatus
rawTableSuffix = suffix
} else {
rawTableSuffix = NO_SUFFIX
initialRawTableStatus = prepareStageForNormalSync(stream, destinationInitialStatus)
Expand Down Expand Up @@ -132,7 +132,17 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
private fun prepareStageForTruncate(
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
stream: StreamConfig
) {
): Pair<InitialRawTableStatus, String> {
/*
tl;dr:
* if a temp raw table exists, check whether it belongs to the correct generation.
    * if wrong generation, truncate it.
    * regardless, write into the temp raw table.
* else, if a real raw table exists, check its generation.
    * if wrong generation, write into a new temp raw table.
    * else (same generation, or generation is null), write into the preexisting real raw table.
* else (neither table exists), create a new temp raw table and write into it.
*/
if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) {
val tempStageGeneration =
storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX)
Expand All @@ -146,6 +156,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
stream.id,
TMP_TABLE_SUFFIX,
)
return Pair(destinationInitialStatus.initialTempRawTableStatus, TMP_TABLE_SUFFIX)
} else {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and existing temp raw table belongs to generation $tempStageGeneration (!= current generation ${stream.generationId}). Truncating it."
Expand All @@ -156,18 +167,67 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
TMP_TABLE_SUFFIX,
replace = true,
)
// We nuked the temp raw table, so create a new initial raw table status.
return Pair(
InitialRawTableStatus(
rawTableExists = true,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty(),
),
TMP_TABLE_SUFFIX,
)
}
} else if (destinationInitialStatus.initialRawTableStatus.rawTableExists) {
// It's possible to "resume" a truncate sync that was previously already finalized.
// In this case, there is no existing temp raw table, and there is a real raw table
// which already belongs to the correct generation.
// Check for that case now.
val realStageGeneration = storageOperation.getStageGeneration(stream.id, NO_SUFFIX)
if (realStageGeneration == null || realStageGeneration == stream.generationId) {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, no existing temp raw table, and existing real raw table belongs to generation $realStageGeneration (== current generation ${stream.generationId}). Retaining it."
}
// The real raw table is from the correct generation. Set up any other resources
// (staging file, etc.), but leave the table untouched.
storageOperation.prepareStage(stream.id, NO_SUFFIX)
return Pair(destinationInitialStatus.initialRawTableStatus, NO_SUFFIX)
} else {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, existing real raw table belongs to generation $realStageGeneration (!= current generation ${stream.generationId}), and no preexisting temp raw table. Creating a temp raw table."
}
// We're initiating a new truncate refresh. Create a new temp stage.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
return Pair(
// Create a fresh raw table status, since we created a fresh temp stage.
InitialRawTableStatus(
rawTableExists = true,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty(),
),
TMP_TABLE_SUFFIX,
)
}
// (Note: the case where an existing temp stage is from the correct generation is handled in
// the first branch above; there, we're resuming a truncate refresh and keep the previous temp stage.)
} else {
log.info {
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp raw table. Creating it."
"${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp or raw table. Creating a temp raw table."
}
// We're initiating a new truncate refresh. Create a new temp stage.
storageOperation.prepareStage(
stream.id,
TMP_TABLE_SUFFIX,
)
return Pair(
// Create a fresh raw table status, since we created a fresh temp stage.
InitialRawTableStatus(
rawTableExists = true,
hasUnprocessedRecords = false,
maxProcessedTimestamp = Optional.empty(),
),
TMP_TABLE_SUFFIX,
)
}
}

Expand All @@ -188,8 +248,39 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
// The table already exists. Decide whether we're writing to it directly, or
// using a tmp table.
if (isTruncateSync) {
// Truncate refresh. Use a temp final table.
return prepareFinalTableForOverwrite(initialStatus)
if (initialStatus.isFinalTableEmpty || initialStatus.finalTableGenerationId == null) {
if (!initialStatus.isSchemaMismatch) {
log.info {
"Truncate sync, and final table is empty and has correct schema. Writing to it directly."
}
return NO_SUFFIX
} else {
// No point soft resetting an empty table. We'll just do an overwrite later.
log.info {
"Truncate sync, and final table is empty, but has the wrong schema. Using a temp final table."
}
return prepareFinalTableForOverwrite(initialStatus)
}
} else if (
initialStatus.finalTableGenerationId >=
initialStatus.streamConfig.minimumGenerationId
) {
if (!initialStatus.isSchemaMismatch) {
log.info {
"Truncate sync, and final table matches our generation and has correct schema. Writing to it directly."
}
return NO_SUFFIX
} else {
log.info {
"Truncate sync, and final table matches our generation, but has the wrong schema. Writing to it directly, but triggering a soft reset first."
}
storageOperation.softResetFinalTable(stream)
return NO_SUFFIX
}
} else {
// The final table is in the wrong generation. Use a temp final table.
return prepareFinalTableForOverwrite(initialStatus)
}
} else {
if (initialStatus.isSchemaMismatch || initialStatus.destinationState.needsSoftReset()) {
// We're loading data directly into the existing table.
Expand Down Expand Up @@ -257,14 +348,14 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
// which is possible (`typeAndDedupe(streamConfig.id.copy(rawName = streamConfig.id.rawName
// + suffix))`
// but annoying and confusing.
if (isTruncateSync && streamSuccessful) {
if (isTruncateSync && streamSuccessful && rawTableSuffix.isNotEmpty()) {
log.info {
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync and we received a stream success message."
"Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync, we received a stream success message, and are using a temporary raw table."
}
storageOperation.overwriteStage(streamConfig.id, rawTableSuffix)
} else {
log.info {
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful"
"Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful; raw table suffix: \"$rawTableSuffix\""
}
}

Expand Down Expand Up @@ -303,10 +394,11 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
"Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} running as truncate sync. Stream success: $streamSuccessful; records written: ${syncSummary.recordsWritten}; temp raw table already existed: ${initialRawTableStatus.rawTableExists}; temp raw table had records: ${initialRawTableStatus.hasUnprocessedRecords}"
}
} else {
// In truncate mode, we want to read all the raw records. Typically, this is equivalent
// When targeting the temp final table, we want to read all the raw records
// because the temp final table is always a full rebuild. Typically, this is equivalent
// to filtering on timestamp, but might as well be explicit.
val timestampFilter =
if (!isTruncateSync) {
if (finalTmpTableSuffix.isEmpty()) {
initialRawTableStatus.maxProcessedTimestamp
} else {
Optional.empty()
Expand Down
Loading

0 comments on commit e3cc022

Please sign in to comment.