Fix Change Feed Full Fidelity Tests #43483

Merged (8 commits, Jan 10, 2025)
Changes from 4 commits
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)
 
 #### Other Changes
 
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)
 
 #### Other Changes
 
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)
 
 #### Other Changes
 
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)
 
 #### Other Changes
 
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Added null checking for previous images for deletes in full fidelity change feed. - See [PR 43483](https://github.com/Azure/azure-sdk-for-java/pull/43483)
 
 #### Other Changes
 
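The fix recorded in each of the five changelogs above targets full fidelity change feed records for deletes: such a record carries no current image, and the previous image may also be missing, so resolving attributes from a null node raised a NullPointerException. Below is a minimal sketch of the failure mode and the guard; the payload shape and the attribute names ("current", "previous", "id") are illustrative assumptions, not the SDK's wire format.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

// Minimal sketch, not SDK code: a full fidelity "delete" record may carry
// neither a "current" nor a "previous" image. The payload shape and
// attribute names below are assumptions for illustration only.
object DeleteImageNullCheckSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    val deleteRecord = mapper
      .readTree("""{"_metadata": {"operationType": "delete"}}""")
      .asInstanceOf[ObjectNode]

    // Resolve "current", falling back to "previous"; both are absent here,
    // so the resolved node is null.
    val node = Option(deleteRecord.get("current"))
      .orElse(Option(deleteRecord.get("previous")))
      .orNull

    // Unguarded, node.get("id") would throw a NullPointerException;
    // with the guard, the parser degrades to a null id instead.
    val id =
      if (node != null) Option(node.get("id")).map(_.asText(null)).orNull
      else null
    println(s"id = $id") // prints "id = null" rather than failing
  }
}
```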
@@ -694,7 +694,7 @@ private[cosmos] class CosmosRowConverterBase(
   private def parseTtlExpired(objectNode: ObjectNode): Boolean = {
     objectNode.get(MetadataJsonBodyAttributeName) match {
       case metadataNode: JsonNode =>
-        metadataNode.get(TimeToLiveExpiredPropertyName) match {
+        return metadataNode.get(TimeToLiveExpiredPropertyName) match {
           case valueNode: JsonNode =>
             Option(valueNode).fold(false)(v => v.asBoolean(false))
           case _ => false
@@ -705,29 +705,38 @@
 
   private def parseId(objectNode: ObjectNode): String = {
     val currentNode = getCurrentOrPreviousNode(objectNode)
-    currentNode.get(IdAttributeName) match {
-      case valueNode: JsonNode =>
-        Option(valueNode).fold(null: String)(v => v.asText(null))
-      case _ => null
+    if (currentNode != null) {
+      return currentNode.get(IdAttributeName) match {
+        case valueNode: JsonNode =>
+          Option(valueNode).fold(null: String)(v => v.asText(null))
+        case _ => null
+      }
     }
+    null
   }
 
   private def parseTimestamp(objectNode: ObjectNode): Long = {
     val currentNode = getCurrentOrPreviousNode(objectNode)
-    currentNode.get(TimestampAttributeName) match {
-      case valueNode: JsonNode =>
-        Option(valueNode).fold(-1L)(v => v.asLong(-1))
-      case _ => -1L
+    if (currentNode != null) {
+      return currentNode.get(TimestampAttributeName) match {
+        case valueNode: JsonNode =>
+          Option(valueNode).fold(-1L)(v => v.asLong(-1))
+        case _ => -1L
+      }
     }
+    -1L
   }
 
   private def parseETag(objectNode: ObjectNode): String = {
     val currentNode = getCurrentOrPreviousNode(objectNode)
-    currentNode.get(ETagAttributeName) match {
-      case valueNode: JsonNode =>
-        Option(valueNode).fold(null: String)(v => v.asText(null))
-      case _ => null
+    if (currentNode != null) {
+      return currentNode.get(ETagAttributeName) match {
+        case valueNode: JsonNode =>
+          Option(valueNode).fold(null: String)(v => v.asText(null))
+        case _ => null
+      }
     }
+    null
   }
 
   private def getCurrentOrPreviousNode(objectNode: ObjectNode): JsonNode = {
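The guards added above follow the surrounding code's null-and-early-return style. For contrast, the same null safety can be expressed by resolving the node into an Option once. This is a sketch only, with assumed attribute values standing in for the SDK's constants, not the actual implementation:

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.databind.node.ObjectNode

// Sketch only: the diff's null checks rewritten with Option. Attribute
// values are assumed stand-ins for the SDK's constants.
object RowConverterOptionSketch {
  private val CurrentAttributeName = "current"   // assumed value
  private val PreviousAttributeName = "previous" // assumed value
  private val IdAttributeName = "id"
  private val ETagAttributeName = "_etag"        // assumed value

  // None when both the current and the previous image are missing.
  private def currentOrPreviousNode(objectNode: ObjectNode): Option[JsonNode] =
    Option(objectNode.get(CurrentAttributeName))
      .orElse(Option(objectNode.get(PreviousAttributeName)))

  private def parseId(objectNode: ObjectNode): String =
    currentOrPreviousNode(objectNode)
      .flatMap(node => Option(node.get(IdAttributeName)))
      .map(_.asText(null))
      .orNull

  private def parseETag(objectNode: ObjectNode): String =
    currentOrPreviousNode(objectNode)
      .flatMap(node => Option(node.get(ETagAttributeName)))
      .map(_.asText(null))
      .orNull

  def main(args: Array[String]): Unit = {
    // A record with only a previous image, as a delete would produce.
    val record = new ObjectMapper()
      .readTree("""{"previous": {"id": "item-1"}}""")
      .asInstanceOf[ObjectNode]
    println(parseId(record))   // item-1
    println(parseETag(record)) // null, no exception
  }
}
```

Keeping the absent-image case in the type rather than in control flow avoids the early returns, though matching the existing null-based style of CosmosRowConverterBase is a reasonable choice for the actual fix.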
@@ -444,104 +444,104 @@ class SparkE2EChangeFeedITest
     rowsArray2 should have size 50 - initialCount
   }
 
-//  "spark change feed query (full fidelity)" should "honor checkpoint location" in {
-//    val cosmosEndpoint = TestConfigurations.HOST
-//    val cosmosMasterKey = TestConfigurations.MASTER_KEY
-//
-//    val checkpointLocation = s"/tmp/checkpoints/${UUID.randomUUID().toString}"
-//    val cfg = Map(
-//      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
-//      "spark.cosmos.accountKey" -> cosmosMasterKey,
-//      "spark.cosmos.database" -> cosmosDatabase,
-//      "spark.cosmos.container" -> cosmosContainer,
-//      "spark.cosmos.read.inferSchema.enabled" -> "false",
-//      "spark.cosmos.changeFeed.mode" -> "FullFidelity",
-//      "spark.cosmos.changeFeed.startFrom" -> "NOW",
-//      "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
-//      "spark.cosmos.changeFeed.batchCheckpointLocation" -> checkpointLocation
-//    )
-//
-//    val df1 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
-//    val rowsArray1 = df1.collect()
-//    rowsArray1.length == 0 shouldEqual true
-//
-//    df1.schema.equals(
-//      ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
-//
-//    val hdfs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
-//
-//    val startOffsetFolderLocation = Paths.get(checkpointLocation, "startOffset").toString
-//    val startOffsetFileLocation = Paths.get(startOffsetFolderLocation, "0").toString
-//    hdfs.exists(new Path(startOffsetFolderLocation)) shouldEqual true
-//    hdfs.exists(new Path(startOffsetFileLocation)) shouldEqual false
-//
-//    val latestOffsetFolderLocation = Paths.get(checkpointLocation, "latestOffset").toString
-//    val latestOffsetFileLocation = Paths.get(latestOffsetFolderLocation, "0").toString
-//    hdfs.exists(new Path(latestOffsetFolderLocation)) shouldEqual true
-//    hdfs.exists(new Path(latestOffsetFileLocation)) shouldEqual true
-//
-//    // TODO - check for the offset structure to make sure it looks like the new lease format.
-//
-//    hdfs.copyToLocalFile(true, new Path(latestOffsetFileLocation), new Path(startOffsetFileLocation))
-//    assert(!hdfs.exists(new Path(latestOffsetFileLocation)))
-//
-//    val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
-//
-//    val createdObjectIds = new ArrayBuffer[String]()
-//    val replacedObjectIds = new ArrayBuffer[String]()
-//    val deletedObjectIds = new ArrayBuffer[String]()
-//    for (sequenceNumber <- 1 to 5) {
-//      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
-//      objectNode.put("name", "Shrodigner's cat")
-//      objectNode.put("type", "cat")
-//      objectNode.put("age", 20)
-//      objectNode.put("sequenceNumber", sequenceNumber)
-//      val id = UUID.randomUUID().toString
-//      objectNode.put("id", id)
-//      createdObjectIds += id
-//      if (sequenceNumber % 2 == 0) {
-//        replacedObjectIds += id
-//      }
-//      if (sequenceNumber % 3 == 0) {
-//        deletedObjectIds += id
-//      }
-//      container.createItem(objectNode).block()
-//    }
-//
-//    for (id <- replacedObjectIds) {
-//      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
-//      objectNode.put("name", "Shrodigner's cat")
-//      objectNode.put("type", "dog")
-//      objectNode.put("age", 25)
-//      objectNode.put("id", id)
-//      container.replaceItem(objectNode, id, new PartitionKey(id)).block()
-//    }
-//
-//    for (id <- deletedObjectIds) {
-//      container.deleteItem(id, new PartitionKey(id)).block()
-//    }
-//
-//    // wait for the log store to get these changes
-//    Thread.sleep(2000)
-//
-//    val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
-//    val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
-//      .agg(functions.collect_list("id").as("ids"))
-//
-//    val collectedFrame = groupedFrame.collect()
-//    collectedFrame.foreach(row => {
-//      val wrappedArray = row.get(1).asInstanceOf[mutable.WrappedArray[String]]
-//      val array = wrappedArray.array
-//      row.get(0) match {
-//        case "create" =>
-//          validateArraysUnordered(createdObjectIds, array)
-//        case "replace" =>
-//          validateArraysUnordered(replacedObjectIds, array)
-//        case "delete" =>
-//          validateArraysUnordered(deletedObjectIds, array)
-//      }
-//    })
-//  }
+  "spark change feed query (full fidelity)" should "honor checkpoint location" in {
+    val cosmosEndpoint = TestConfigurations.HOST
+    val cosmosMasterKey = TestConfigurations.MASTER_KEY
+
+    val checkpointLocation = s"/tmp/checkpoints/${UUID.randomUUID().toString}"
+    val cfg = Map(
+      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> cosmosContainer,
+      "spark.cosmos.read.inferSchema.enabled" -> "false",
+      "spark.cosmos.changeFeed.mode" -> "FullFidelity",
+      "spark.cosmos.changeFeed.startFrom" -> "NOW",
+      "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
+      "spark.cosmos.changeFeed.batchCheckpointLocation" -> checkpointLocation
+    )
+
+    val df1 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
+    val rowsArray1 = df1.collect()
+    rowsArray1.length == 0 shouldEqual true
+
+    df1.schema.equals(
+      ChangeFeedTable.defaultFullFidelityChangeFeedSchemaForInferenceDisabled) shouldEqual true
+
+    val hdfs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
+
+    val startOffsetFolderLocation = Paths.get(checkpointLocation, "startOffset").toString
+    val startOffsetFileLocation = Paths.get(startOffsetFolderLocation, "0").toString
+    hdfs.exists(new Path(startOffsetFolderLocation)) shouldEqual true
+    hdfs.exists(new Path(startOffsetFileLocation)) shouldEqual false
+
+    val latestOffsetFolderLocation = Paths.get(checkpointLocation, "latestOffset").toString
+    val latestOffsetFileLocation = Paths.get(latestOffsetFolderLocation, "0").toString
+    hdfs.exists(new Path(latestOffsetFolderLocation)) shouldEqual true
+    hdfs.exists(new Path(latestOffsetFileLocation)) shouldEqual true
+
+    // TODO - check for the offset structure to make sure it looks like the new lease format.
+
+    hdfs.copyToLocalFile(true, new Path(latestOffsetFileLocation), new Path(startOffsetFileLocation))
+    assert(!hdfs.exists(new Path(latestOffsetFileLocation)))
+
+    val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
+
+    val createdObjectIds = new ArrayBuffer[String]()
+    val replacedObjectIds = new ArrayBuffer[String]()
+    val deletedObjectIds = new ArrayBuffer[String]()
+    for (sequenceNumber <- 1 to 5) {
+      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
+      objectNode.put("name", "Shrodigner's cat")
+      objectNode.put("type", "cat")
+      objectNode.put("age", 20)
+      objectNode.put("sequenceNumber", sequenceNumber)
+      val id = UUID.randomUUID().toString
+      objectNode.put("id", id)
+      createdObjectIds += id
+      if (sequenceNumber % 2 == 0) {
+        replacedObjectIds += id
+      }
+      if (sequenceNumber % 3 == 0) {
+        deletedObjectIds += id
+      }
+      container.createItem(objectNode).block()
+    }
+
+    for (id <- replacedObjectIds) {
+      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
+      objectNode.put("name", "Shrodigner's cat")
+      objectNode.put("type", "dog")
+      objectNode.put("age", 25)
+      objectNode.put("id", id)
+      container.replaceItem(objectNode, id, new PartitionKey(id)).block()
+    }
+
+    for (id <- deletedObjectIds) {
+      container.deleteItem(id, new PartitionKey(id)).block()
+    }
+
+    // wait for the log store to get these changes
+    Thread.sleep(2000)
+
+    val df2 = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
+    val groupedFrame = df2.groupBy(CosmosTableSchemaInferrer.OperationTypeAttributeName)
+      .agg(functions.collect_list("id").as("ids"))
+
+    val collectedFrame = groupedFrame.collect()
+    collectedFrame.foreach(row => {
+      val wrappedArray = row.get(1).asInstanceOf[mutable.WrappedArray[String]]
+      val array = wrappedArray.array
+      row.get(0) match {
+        case "create" =>
+          validateArraysUnordered(createdObjectIds, array)
+        case "replace" =>
+          validateArraysUnordered(replacedObjectIds, array)
+        case "delete" =>
+          validateArraysUnordered(deletedObjectIds, array)
+      }
+    })
+  }
 
   "spark change feed query (incremental)" can "proceed with simulated Spark2 Checkpoint" in {
     val cosmosEndpoint = TestConfigurations.HOST