-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
S3 Dest emits V2 fields and captures failures #42409
S3 Dest emits V2 fields and captures failures #42409
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @johnny-schmidt and the rest of your teammates on Graphite |
@@ -19,6 +32,8 @@ class NoFlatteningSheetGenerator : BaseSheetGenerator(), CsvSheetGenerator { | |||
|
|||
/** When no flattening is needed, the record column is just one json blob. */ | |||
override fun getRecordColumns(json: JsonNode): List<String> { | |||
return listOf(Jsons.serialize(json)) | |||
val tmp = Jsons.serialize(json) | |||
println("tmp: $tmp") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll remove this.
470eee5
to
f5414ea
Compare
480dc23
to
285767f
Compare
f5414ea
to
5c9ea03
Compare
285767f
to
92cca0f
Compare
@@ -22,6 +25,29 @@ class AvroRecordFactory(private val schema: Schema?, private val converter: Json | |||
companion object { | |||
private val MAPPER: ObjectMapper = MoreMappers.initMapper() | |||
private val WRITER: ObjectWriter = MAPPER.writer() | |||
|
|||
fun createV1JsonToAvroConverter(): JsonAvroConverter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we mark this as deprecated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly. It's still in use by GCS at least, but should go away completely when that's done.
|
||
return converter!!.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema) | ||
} | ||
|
||
@Throws(JsonProcessingException::class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deprecate?
@@ -84,6 +91,7 @@ protected constructor(protected val outputFormat: FileUploadFormat) : | |||
DateTime.now(DateTimeZone.UTC), | |||
s3DestinationConfig.pathFormat!!, | |||
) | |||
println("outputPrefix: $outputPrefix") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do
} | ||
|
||
run { | ||
standardInput = System.in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this addition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also for testing, will remove. (It's necessary to make the running gradle run
task accept input from stdin.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with minor comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems reasonable, had some comments/questions. The runtime code changes looked like mostly plumbing so I didn't read them in much detail; had some more questions in the test code. LGTM once everything is resolved
@@ -42,7 +42,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu | |||
*/ | |||
@Deprecated("") | |||
@Throws(IOException::class) | |||
protected abstract fun writeRecord(record: AirbyteRecordMessage) | |||
protected abstract fun writeRecord(record: AirbyteRecordMessage, generationId: Long = 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
weird thought: should we take generation ID via the constructor? since it's constant across the entire sync
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered that, as well as changes necessary to make my code use the new methods.
I think that's the correct design. Threading things through is an anti pattern.
But the even more correct pattern is for this code to know nothing about airbyte fields, which I imagine is how it will work in a new CDK. So I punted on doing the work to rewrite anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, is it constant per sync or per stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah yeah, it's per stream. Isn't this object instanced per stream? (totally fair that this would be too much work for no payoff though)
...nations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroRecordFactory.kt
Outdated
Show resolved
Hide resolved
) | ||
AvroSerializedBuffer(createStorageFunction.call(), codecFactory, schema) | ||
println("schema: $schema") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this println / use a logger instance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a stray debug statement, which I'll remove
I thought we should not log client schemas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
client data is bad to log, but we treat the schemas as non-sensitive (e.g. a lot of DB sources log out every column in every table for debugging purposes)
.name(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID) | ||
.type(Schema.create(Schema.Type.LONG)) | ||
.noDefault() | ||
val changeSchema: Schema = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this looks identical to the schema in AvroFieldConversionFailureListener, probably better to reuse that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try to migrate it to AvroConstants
json.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString()) | ||
json.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, record.emittedAt) | ||
} | ||
|
||
@Deprecated("Deprecated in Java") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.... I know you didn't write this, but I assume this should just be deprecated everywhere? I'm assuming this method was deprecated before we introduced kotlin >.>
...ures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt
Outdated
Show resolved
Hide resolved
...es/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseJsonlDestinationAcceptanceTest.kt
Show resolved
Hide resolved
...-destinations/src/testFixtures/resources/v0/users_with_generation_id_configured_catalog.json
Outdated
Show resolved
Hide resolved
) | ||
|
||
val jsons = MoreMappers.initMapper().createObjectNode() | ||
// Iterate over every key value pair in the json node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checking my understanding: given this node
:
{
"_airbyte_raw_id": "foo",
"_airbyte_data": {
"id": 42
}
}
we would return this jsons
:
{
"id": 42,
"_airbyte_data": {
"id": 42
}
}
(assuming yes: are we intentionally promoting all the data fields, and also keeping the airbyte_data sub-object?)
... also, why not just this? is the idea to preserve non-airbyte top-level fields?
val newObjectNode = node["_airbyte_data"].deepCopy()
newObject.put("_airbyte_data", node["_airbyte_data"])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're correct, but that's a mistake. I'm missing an else
near the end.
The intention is to remove airbyte fields AND promote anything under _airbyte_data
to the top. I was trying to replicate the effective behavior of all the client implementations of the former retrieveRecords
method.
Your example should produce.
{
"id": 42
}
5c9ea03
to
4d4e5db
Compare
@@ -42,7 +42,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu | |||
*/ | |||
@Deprecated("") | |||
@Throws(IOException::class) | |||
protected abstract fun writeRecord(record: AirbyteRecordMessage) | |||
protected abstract fun writeRecord(record: AirbyteRecordMessage, generationId: Long = 0) | |||
|
|||
/** | |||
* TODO: (ryankfu) move destination to use serialized record string instead of passing entire |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other consumers using the Async rely on this method. We were unable to delete the method on L45 because of S3 not moving to Async and remnants of old calls. Any chance we can achieve that in this PR ?
See this comment inline on Graphite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above.
4d4e5db
to
b7615bb
Compare
2ed6b8e
to
32a40ca
Compare
b7615bb
to
4d4e5db
Compare
f97e80b
to
80ef98f
Compare
40277b3
to
51c23f8
Compare
51c23f8
to
1844494
Compare
1844494
to
33e3e0d
Compare
1844494
to
194365d
Compare
39aa845
to
1585419
Compare
3acf1ff
to
491ad7c
Compare
491ad7c
to
a626bae
Compare
What
Accomplishes the field-renaming part of making S3 V2 compliant.
_airbyte_meta.changes
includes conversion failures for avro and parquetHow
useV2Field[Name]s: bool
is passed to each converterFieldConversionFailureListener
(added to thejson-avro-converter
artifact here), whichnull
for a new value)NULLED
change object into the record (after it's completely processed)AvroRecordFactory
, which is indirectly initialized on buffer creation (ie, at the start of each flush)useV2Fields
StreamDescriptor -> generationId
and passes it to the s3 flush worker, which in turn passesgenerationId
to the convertersretrieveRecords
method stripped the meta fields, I changed this and added a shim so most test cases would still get the stripped rowsReview Guide
In addition to validating the above, please look out for implications for other destinations that use s3, avro/parquet, and/or the test framework
User Impact
This is a breaking change. However it will be bundled with two more PRs that also contain breaking changes
Can this PR be safely reverted and rolled back?