diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
index e13a3ea46e2ef..876d3ec19c07b 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.36.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping of a Cosmos DB partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
 
 #### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
index dc9fb1ca7a56b..afd38f10d2d08 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.36.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping of a Cosmos DB partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
 
 #### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
index d4042c11afe74..c683bcc589362 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.36.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping of a Cosmos DB partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
 
 #### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
index 6e0fdadf810de..608d3e2497a36 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.36.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping of a Cosmos DB partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
 
 #### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
index 02d6e74ac76fb..1baaf8697927f 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.36.0-beta.1 (Unreleased)
 
 #### Features Added
+* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping of a Cosmos DB partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
 
 #### Breaking Changes
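Reviewer note: the changelog entries above describe the feature only briefly. Below is a minimal sketch of how the two new UDFs are meant to compose, mirroring the integration test added later in this PR. The endpoint, key, database, container, and document-id values are illustrative placeholders, not values from this change:

```scala
import com.azure.cosmos.spark.udf.{GetFeedRangesForContainer, GetOverlappingFeedRange}

// Connector configuration for the source container (placeholder values).
val cfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "<database>",
  "spark.cosmos.container" -> "<container>"
)

// Best-effort split of the container's full range into 5 buckets, returned as
// normalized "min-max" strings such as "-05C1C9CD673398" or "05C1E9CD673398-FF".
val buckets: Array[String] = new GetFeedRangesForContainer().call(cfg, Some(5))

// Resolve the bucket that owns a given partition-key value (single-path hash definition here).
val pkDefinitionJson = """{"paths":["/id"],"kind":"Hash"}"""
val bucket: String = new GetOverlappingFeedRange().call(pkDefinitionJson, "<document-id>", buckets)
```

The bucket string is stable for a given set of feed ranges, so it can serve as a Databricks table partition column; a write-path sketch follows the integration test further below.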
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/SparkBridgeInternal.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/SparkBridgeInternal.scala
index 8773aab1132c2..95d8107f3fb62 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/SparkBridgeInternal.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/SparkBridgeInternal.scala
@@ -48,17 +48,6 @@ private[cosmos] object SparkBridgeInternal {
     s"${database.getClient.getServiceEndpoint}|${database.getId}|${container.getId}"
   }
 
-  private[cosmos] def getNormalizedEffectiveRange
-  (
-    container: CosmosAsyncContainer,
-    feedRange: FeedRange
-  ) : NormalizedRange = {
-
-    SparkBridgeImplementationInternal
-      .rangeToNormalizedRange(
-        container.getNormalizedEffectiveRange(feedRange).block)
-  }
-
   private[cosmos] def getPartitionKeyRanges
   (
     container: CosmosAsyncContainer
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala
index 259485285b6a6..ad23682d42304 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala
@@ -9,12 +9,13 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
 import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
 import com.azure.cosmos.implementation.query.CompositeContinuationToken
 import com.azure.cosmos.implementation.routing.Range
-import com.azure.cosmos.models.{FeedRange, PartitionKey, PartitionKeyBuilder, PartitionKeyDefinition, SparkModelBridgeInternal}
+import com.azure.cosmos.models.{FeedRange, PartitionKey, PartitionKeyBuilder, PartitionKeyDefinition, PartitionKind, SparkModelBridgeInternal}
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import com.azure.cosmos.spark.{ChangeFeedOffset, CosmosConstants, NormalizedRange}
 import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, DirectConnectionConfig, SparkBridgeInternal}
 import com.fasterxml.jackson.databind.ObjectMapper
 
+import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`
 import scala.collection.mutable
 
 // scalastyle:off underscore.import
@@ -189,6 +190,11 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTrait
     new Range[String](range.min, range.max, true, false)
   }
 
+  private[cosmos] def toCosmosRange(range: String): Range[String] = {
+    val parts = range.split("-")
+    new Range[String](parts(0), parts(1), true, false)
+  }
+
   def doRangesOverlap(left: NormalizedRange, right: NormalizedRange): Boolean = {
     Range.checkOverlapping(toCosmosRange(left), toCosmosRange(right))
   }
@@ -204,7 +210,7 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTrait
     partitionKeyDefinitionJson: String
   ): NormalizedRange = {
     val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson)
-    partitionKeyToNormalizedRange(new PartitionKey(partitionKeyValue), pkDefinition)
+    partitionKeyToNormalizedRange(getPartitionKeyValue(pkDefinition, partitionKeyValue), pkDefinition)
   }
 
   private[cosmos] def partitionKeyToNormalizedRange(
@@ -220,28 +226,89 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTrait
     partitionKeyValueJsonArray: Object,
     partitionKeyDefinitionJson: String
   ): NormalizedRange = {
+    val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson)
+    val partitionKey = getPartitionKeyValue(pkDefinition, partitionKeyValueJsonArray)
+    val feedRange = FeedRange
+      .forLogicalPartition(partitionKey)
+      .asInstanceOf[FeedRangePartitionKeyImpl]
+
+    val effectiveRange = feedRange.getEffectiveRange(pkDefinition)
+    rangeToNormalizedRange(effectiveRange)
+  }
+
+  private[cosmos] def trySplitFeedRanges
+  (
+    cosmosClient: CosmosAsyncClient,
+    containerName: String,
+    databaseName: String,
+    targetedCount: Int
+  ): List[String] = {
+    val container = cosmosClient
+      .getDatabase(databaseName)
+      .getContainer(containerName)
+
+    val epkRange = rangeToNormalizedRange(FeedRange.forFullRange().asInstanceOf[FeedRangeEpkImpl].getRange)
+    val feedRanges = SparkBridgeInternal.trySplitFeedRange(container, epkRange, targetedCount)
+    feedRanges.map(range => s"${range.min}-${range.max}").toList
+  }
-
-    val partitionKey = new PartitionKeyBuilder()
+  private[cosmos] def getFeedRangesForContainer
+  (
+    cosmosClient: CosmosAsyncClient,
+    containerName: String,
+    databaseName: String
+  ): List[String] = {
+    val container = cosmosClient
+      .getDatabase(databaseName)
+      .getContainer(containerName)
+
+    container
+      .getFeedRanges()
+      .block
+      .map(feedRange => {
+        val effectiveRange = feedRange.asInstanceOf[FeedRangeEpkImpl].getEffectiveRange(null, null, null).block
+        val normalizedRange = rangeToNormalizedRange(effectiveRange)
+        s"${normalizedRange.min}-${normalizedRange.max}"
+      })
+      .toList
+  }
+
+  def getOverlappingRange(feedRanges: Array[String], pkValue: Object, pkDefinition: PartitionKeyDefinition): String = {
+    val pk = getPartitionKeyValue(pkDefinition, pkValue)
+    val feedRangeFromPk = FeedRange.forLogicalPartition(pk).asInstanceOf[FeedRangePartitionKeyImpl]
+    val effectiveRangeFromPk = feedRangeFromPk.getEffectiveRange(pkDefinition)
+
+    for (i <- feedRanges.indices) {
+      val range = SparkBridgeImplementationInternal.toCosmosRange(feedRanges(i))
+      if (range.contains(effectiveRangeFromPk.getMin)) {
+        return feedRanges(i)
+      }
+    }
+    throw new IllegalArgumentException("The partition key value does not belong to any of the feed ranges")
+  }
+
+  private def getPartitionKeyValue(pkDefinition: PartitionKeyDefinition, pkValue: Object): PartitionKey = {
+    val partitionKey = new PartitionKeyBuilder()
+    var pk: PartitionKey = null
+    if (pkDefinition.getKind.equals(PartitionKind.MULTI_HASH)) {
       val objectMapper = new ObjectMapper()
-      val json = partitionKeyValueJsonArray.toString
+      val json = pkValue.toString
       try {
-        val partitionKeyValues = objectMapper.readValue(json, classOf[Array[String]])
-        for (value <- partitionKeyValues) {
-          partitionKey.add(value.trim)
-        }
-        partitionKey.build()
+        val partitionKeyValues = objectMapper.readValue(json, classOf[Array[String]])
+        for (value <- partitionKeyValues) {
+          partitionKey.add(value.trim)
+        }
+        pk = partitionKey.build()
       } catch {
-        case e: Exception =>
-          logInfo("Invalid partition key paths: " + json, e)
+        case e: Exception =>
+          logInfo("Invalid partition key paths: " + json, e)
       }
-
-    val feedRange = FeedRange
-      .forLogicalPartition(partitionKey.build())
-      .asInstanceOf[FeedRangePartitionKeyImpl]
-
-    val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson)
-    val effectiveRange = feedRange.getEffectiveRange(pkDefinition)
-    rangeToNormalizedRange(effectiveRange)
+    } else if (pkDefinition.getKind.equals(PartitionKind.HASH)) {
+      pk = new PartitionKey(pkValue)
+    }
+    pk
   }
 
   def setIoThreadCountPerCoreFactor
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForContainer.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForContainer.scala
new file mode 100644
index 0000000000000..02f3482266e2f
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForContainer.scala
@@ -0,0 +1,54 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.spark.udf
+
+import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
+import com.azure.cosmos.spark.{CosmosClientCache, CosmosClientCacheItem, CosmosClientConfiguration, CosmosConfig, CosmosContainerConfig, CosmosReadConfig, Loan}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.api.java.UDF2
+
+@SerialVersionUID(1L)
+class GetFeedRangesForContainer extends UDF2[Map[String, String], Option[Int], Array[String]] {
+  override def call
+  (
+    userProvidedConfig: Map[String, String],
+    targetedCount: Option[Int]
+  ): Array[String] = {
+
+    val effectiveUserConfig = CosmosConfig.getEffectiveConfig(None, None, userProvidedConfig)
+    var feedRanges = List[String]()
+    val cosmosContainerConfig: CosmosContainerConfig =
+      CosmosContainerConfig.parseCosmosContainerConfig(effectiveUserConfig, None, None)
+    val readConfig = CosmosReadConfig.parseCosmosReadConfig(effectiveUserConfig)
+    val cosmosClientConfig = CosmosClientConfiguration(
+      effectiveUserConfig,
+      useEventualConsistency = readConfig.forceEventualConsistency,
+      CosmosClientConfiguration.getSparkEnvironmentInfo(SparkSession.getActiveSession))
+    Loan(
+      List[Option[CosmosClientCacheItem]](
+        Some(CosmosClientCache(
+          cosmosClientConfig,
+          None,
+          "UDF GetFeedRangesForContainer"
+        ))
+      ))
+      .to(cosmosClientCacheItems => {
+        if (targetedCount.isEmpty) {
+          feedRanges = SparkBridgeImplementationInternal.getFeedRangesForContainer(
+            cosmosClientCacheItems.head.get.cosmosClient,
+            cosmosContainerConfig.container,
+            cosmosContainerConfig.database
+          )
+        } else {
+          feedRanges = SparkBridgeImplementationInternal.trySplitFeedRanges(
+            cosmosClientCacheItems.head.get.cosmosClient,
+            cosmosContainerConfig.container,
+            cosmosContainerConfig.database,
+            targetedCount.get)
+        }
+      })
+    feedRanges.toArray
+  }
+}
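A hedged usage sketch for the UDF above, with `cfg` as in the first sketch. Per the implementation, `None` routes to `getFeedRangesForContainer` and yields one normalized range per physical partition, while `Some(n)` routes to `trySplitFeedRanges` for a best-effort split:

```scala
import com.azure.cosmos.spark.udf.GetFeedRangesForContainer

val feedRangesUdf = new GetFeedRangesForContainer()

// One "min-max" range per physical partition of the container.
val perPartitionRanges: Array[String] = feedRangesUdf.call(cfg, None)

// Best-effort split of the full range into (up to) 8 sub-ranges; the result may
// contain fewer entries, matching the trySplitFeedRange contract documented below.
val eightBuckets: Array[String] = feedRangesUdf.call(cfg, Some(8))
```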
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala
new file mode 100644
index 0000000000000..1d71b7ed5c674
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala
@@ -0,0 +1,23 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.spark.udf
+
+import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
+import com.azure.cosmos.models.SparkModelBridgeInternal
+import com.azure.cosmos.spark.CosmosPredicates.requireNotNullOrEmpty
+import org.apache.spark.sql.api.java.UDF3
+
+@SerialVersionUID(1L)
+class GetOverlappingFeedRange extends UDF3[String, Object, Array[String], String] {
+  override def call
+  (
+    partitionKeyDefinitionJson: String,
+    partitionKeyValue: Object,
+    targetFeedRanges: Array[String]
+  ): String = {
+    requireNotNullOrEmpty(partitionKeyDefinitionJson, "partitionKeyDefinitionJson")
+
+    val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson)
+    SparkBridgeImplementationInternal.getOverlappingRange(targetFeedRanges, partitionKeyValue, pkDefinition)
+  }
+}
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForContainerITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForContainerITest.scala
new file mode 100644
index 0000000000000..35bd84ac6da1b
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForContainerITest.scala
@@ -0,0 +1,142 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.spark
+
+import com.azure.cosmos.CosmosAsyncContainer
+import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, TestConfigurations, Utils}
+import com.azure.cosmos.models.CosmosQueryRequestOptions
+import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
+import com.azure.cosmos.spark.udf.{GetFeedRangesForContainer, GetOverlappingFeedRange}
+import com.fasterxml.jackson.databind.node.ObjectNode
+
+import java.util.UUID
+import scala.collection.mutable
+
+class FeedRangesForContainerITest
+  extends IntegrationSpec
+    with SparkWithDropwizardAndSlf4jMetrics
+    with CosmosClient
+    with CosmosContainer
+    with BasicLoggingTrait
+    with MetricAssertions {
+
+  //scalastyle:off multiple.string.literals
+  //scalastyle:off magic.number
+
+  override def afterEach(): Unit = {
+    this.reinitializeContainer()
+  }
+
+  "feed ranges" can "be split into different sub feed ranges" in {
+    val cosmosEndpoint = TestConfigurations.HOST
+    val cosmosMasterKey = TestConfigurations.MASTER_KEY
+
+    val cfg = Map(
+      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> cosmosContainer
+    )
+    var pkDefinition = "{\"paths\":[\"/id\"],\"kind\":\"Hash\"}"
+    val feedRanges = new GetFeedRangesForContainer().call(cfg, Option(5))
+    val expectedFeedRanges = Array("-05C1C9CD673398", "05C1C9CD673398-05C1D9CD673398",
+      "05C1D9CD673398-05C1E399CD6732", "05C1E399CD6732-05C1E9CD673398", "05C1E9CD673398-FF")
+
+    assert(feedRanges.sameElements(expectedFeedRanges), "Feed ranges do not match the expected values")
+    val lastId = "45170a78-eac0-4d3a-be5e-9b00bb5f4649"
+
+    var feedRangeResult = new GetOverlappingFeedRange().call(pkDefinition, lastId, expectedFeedRanges)
+    assert(feedRangeResult == "-05C1C9CD673398", "feed range does not match the expected value")
+
+    // test with hpk partition key definition
+    pkDefinition = "{\"paths\":[\"/tenantId\",\"/userId\",\"/sessionId\"],\"kind\":\"MultiHash\"}"
+    val pkValues = "[\"" + lastId + "\"]"
+
+    feedRangeResult = new GetOverlappingFeedRange().call(pkDefinition, pkValues, expectedFeedRanges)
+    assert(feedRangeResult == "05C1E9CD673398-FF", "feed range does not match the expected value")
+  }
+
+  "feed ranges" can "be mapped to new partition key" in {
+    feedRangesForPK(false)
+  }
+
+  "feed ranges" can "be mapped to new hierarchical partition key" in {
+    feedRangesForPK(true)
+  }
+
+  def feedRangesForPK(hpk: Boolean): Unit = {
+    val cosmosEndpoint = TestConfigurations.HOST
+    val cosmosMasterKey = TestConfigurations.MASTER_KEY
+
+    val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
+    val docs = createItems(container, 50, hpk)
+    val cfg = Map(
+      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
+      "spark.cosmos.accountKey" -> cosmosMasterKey,
+      "spark.cosmos.database" -> cosmosDatabase,
+      "spark.cosmos.container" -> cosmosContainer
+    )
+
+    val pkDefinition = if (hpk) {"{\"paths\":[\"/tenantId\",\"/userId\",\"/sessionId\"],\"kind\":\"MultiHash\"}"}
+      else {"{\"paths\":[\"/id\"],\"kind\":\"Hash\"}"}
+
+    val feedRanges = new GetFeedRangesForContainer().call(cfg, Option(5))
+
+    val feedRangeToDocsMap = mutable.Map[String, List[ObjectNode]]().withDefaultValue(List())
+
+    for (doc <- docs) {
+      val lastId = if (!hpk) doc.get("id").asText() else "[\"" + doc.get("tenantId").asText() + "\"]"
+      val feedRange = new GetOverlappingFeedRange().call(pkDefinition, lastId, feedRanges)
+      // Add the document to the corresponding feed range in the map
+      feedRangeToDocsMap(feedRange) = doc :: feedRangeToDocsMap(feedRange)
+    }
+
+    for (i <- feedRanges.indices) {
+      val range = SparkBridgeImplementationInternal.toCosmosRange(feedRanges(i))
+      val feedRange = SparkBridgeImplementationInternal.toFeedRange(SparkBridgeImplementationInternal.rangeToNormalizedRange(range))
+      val requestOptions = new CosmosQueryRequestOptions().setFeedRange(feedRange)
+      container.queryItems("SELECT * FROM c", requestOptions, classOf[ObjectNode]).byPage().collectList().block().forEach { rsp =>
+        val results = rsp.getResults
+        var numDocs = 0
+        val expectedResults = feedRangeToDocsMap(feedRanges(i))
+        results.forEach(doc => {
+          assert(expectedResults.exists(expectedDoc => expectedDoc.get("id").asText() == doc.get("id").asText()),
+            "Document not found in the expected feed range")
+          numDocs += 1
+        })
+        assert(numDocs == expectedResults.size,
+          "Number of documents in the target feed range does not match the number of docs for that feed range")
+      }
+    }
+  }
+
+  def createItems(container: CosmosAsyncContainer, numOfItems: Int, hpk: Boolean): Array[ObjectNode] = {
+    val docs = new Array[ObjectNode](numOfItems)
+    for (sequenceNumber <- 1 to numOfItems) {
+      val lastId = UUID.randomUUID().toString
+      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
+      objectNode.put("name", "Schrodinger's cat")
+      objectNode.put("type", "cat")
+      objectNode.put("age", 20)
+      objectNode.put("sequenceNumber", sequenceNumber)
+      objectNode.put("id", lastId)
+      if (hpk) {
+        objectNode.put("tenantId", lastId)
+        objectNode.put("userId", "userId1")
+        objectNode.put("sessionId", "sessionId1")
+      }
+      docs(sequenceNumber - 1) = objectNode
+      container.createItem(objectNode).block()
+    }
+    docs
+  }
+
+  //scalastyle:on magic.number
+  //scalastyle:on multiple.string.literals
+}
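The integration test above validates the mapping document by document. For the Databricks-side usage the changelog describes, a sketch along these lines seems plausible; the source read, the `id` column, the bucket count, and the target table name are assumptions for illustration, not part of this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import com.azure.cosmos.spark.udf.{GetFeedRangesForContainer, GetOverlappingFeedRange}

val spark = SparkSession.builder().getOrCreate()
val pkDefinitionJson = """{"paths":["/id"],"kind":"Hash"}"""

// Compute the bucket boundaries once on the driver (cfg as in the first sketch).
val buckets = new GetFeedRangesForContainer().call(cfg, Some(16))

// Placeholder read of the source container through the connector.
val df = spark.read.format("cosmos.oltp").options(cfg).load()

// The UDF classes carry @SerialVersionUID, so an instance can be captured by an
// ordinary Spark closure to tag each row with the feed-range bucket that owns it.
val toBucket = udf((pk: String) => new GetOverlappingFeedRange().call(pkDefinitionJson, pk, buckets))

df.withColumn("cosmosFeedRange", toBucket(col("id")))
  .write
  .format("delta")
  .partitionBy("cosmosFeedRange") // Databricks table partition aligned with Cosmos DB feed ranges
  .mode("append")
  .saveAsTable("cosmos_documents_by_feed_range") // placeholder table name
```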
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala
index f4c858ef0bf48..90cdd508bcce3 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala
@@ -5,11 +5,9 @@ package com.azure.cosmos.spark
 import com.azure.cosmos.SparkBridgeInternal
 import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState
 import com.azure.cosmos.implementation.{TestConfigurations, Utils}
-import com.azure.cosmos.models.PartitionKey
 import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
 import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, CreateSpark2ContinuationsFromChangeFeedOffset, GetFeedRangeForPartitionKeyValue}
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.functions
 import org.apache.spark.sql.types._
 
 import java.io.{BufferedReader, InputStreamReader}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerTest.java
index d2c6093c3d3ee..edecf71badec7 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerTest.java
@@ -668,7 +668,6 @@ public void getNormalizedFeedRanges_HashV2() {
     public void getFeedRanges_withMultiplePartitions() throws Exception {
         String collectionName = UUID.randomUUID().toString();
         CosmosContainerProperties containerProperties = getCollectionDefinition(collectionName);
-        CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
 
         CosmosContainerResponse containerResponse = createdDatabase.createContainer(
             containerProperties,
             ThroughputProperties.createManualThroughput(18000));
@@ -697,11 +696,6 @@ public void getFeedRanges_withMultiplePartitions() throws Exception {
             .isNotNull()
             .hasSize(3);
 
-        String leftMin = getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).getMin();
-        String rightMin = firstEpkRange.getMin();
-        String leftMax = getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).getMax();
-        String rightMax = firstEpkRange.getMax();
-
         assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).equals(firstEpkRange))
             .isTrue();
 
@@ -716,7 +710,6 @@ public void getFeedRanges_withMultiplePartitions_HashV2() throws Exception {
         String collectionName = UUID.randomUUID().toString();
         CosmosContainerProperties containerProperties = getCollectionDefinitionForHashV2(collectionName);
-        CosmosContainerRequestOptions options = new CosmosContainerRequestOptions();
 
         CosmosContainerResponse containerResponse = createdDatabase.createContainer(
             containerProperties,
             ThroughputProperties.createManualThroughput(18000));
@@ -752,11 +745,6 @@ public void getFeedRanges_withMultiplePartitions_HashV2() throws Exception {
             .isNotNull()
             .hasSize(3);
 
-        String leftMin = getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).getMin();
-        String rightMin = firstEpkRange.getMin();
-        String leftMax = getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).getMax();
-        String rightMax = firstEpkRange.getMax();
-
         assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).equals(firstEpkRange))
             .isTrue();
 
@@ -767,6 +755,92 @@
             .isTrue();
     }
 
+    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
+    public void getFeedRanges_withMultiplePartitions_HashV2_HPK() {
+        String collectionName = UUID.randomUUID().toString();
+        CosmosContainerProperties containerProperties = getCollectionDefinitionForHashV2WithHpk(collectionName);
+        createdDatabase.createContainer(
+            containerProperties,
+            ThroughputProperties.createManualThroughput(11000));
+        this.createdContainer = createdDatabase.getContainer(collectionName);
+
+        CosmosContainer syncContainer = createdDatabase.getContainer(collectionName);
+
+        List<FeedRange> feedRanges = syncContainer.getFeedRanges();
+        assertThat(feedRanges)
+            .isNotNull()
+            .hasSize(2);
+
+        assertFeedRange(
+            feedRanges.get(0),
+            "{\"Range\":{\"min\":\"\",\"max\":\"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\"}}");
+        assertFeedRange(
+            feedRanges.get(1),
+            "{\"Range\":{\"min\":\"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\","
+                + "\"max\":\"FF\"}}");
+
+        Range<String> firstEpkRange = getEffectiveRange(syncContainer, feedRanges.get(0));
+        Range<String> secondEpkRange = getEffectiveRange(syncContainer, feedRanges.get(1));
+
+        List<FeedRange> feedRangesAfterSplit = syncContainer
+            .asyncContainer
+            .trySplitFeedRange(FeedRange.forFullRange(), 2)
+            .block();
+        assertThat(feedRangesAfterSplit)
+            .isNotNull()
+            .hasSize(2);
+
+        assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).equals(firstEpkRange))
+            .isTrue();
+
+        assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(1)).equals(secondEpkRange))
+            .isTrue();
+    }
+
+    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
+    public void getFeedRanges_withMultiplePartitions_HashV1_HPK() {
+        String collectionName = UUID.randomUUID().toString();
+        CosmosContainerProperties containerProperties = getCollectionDefinitionWithHpk(collectionName);
+        createdDatabase.createContainer(
+            containerProperties,
+            ThroughputProperties.createManualThroughput(11000));
+        this.createdContainer = createdDatabase.getContainer(collectionName);
+
+        CosmosContainer syncContainer = createdDatabase.getContainer(collectionName);
+
+        List<FeedRange> feedRanges = syncContainer.getFeedRanges();
+        assertThat(feedRanges)
+            .isNotNull()
+            .hasSize(2);
+
+        assertFeedRange(
+            feedRanges.get(0),
+            "{\"Range\":{\"min\":\"\",\"max\":\"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\"}}");
+        assertFeedRange(
+            feedRanges.get(1),
+            "{\"Range\":{\"min\":\"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\","
+                + "\"max\":\"FF\"}}");
+
+        Range<String> firstEpkRange = getEffectiveRange(syncContainer, feedRanges.get(0));
+        Range<String> secondEpkRange = getEffectiveRange(syncContainer, feedRanges.get(1));
+
+        List<FeedRange> feedRangesAfterSplit = syncContainer
+            .asyncContainer
+            .trySplitFeedRange(FeedRange.forFullRange(), 2)
+            .block();
+        assertThat(feedRangesAfterSplit)
+            .isNotNull()
+            .hasSize(2);
+
+        assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).equals(firstEpkRange))
+            .isTrue();
+
+        assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(1)).equals(secondEpkRange))
+            .isTrue();
+    }
+
     private static Range<String> getEffectiveRange(CosmosContainer container, FeedRange feedRange) {
         AsyncDocumentClient clientWrapper = container.asyncContainer.getDatabase().getDocClientWrapper();
         return FeedRangeInternal
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
index 82d78e196cac4..6d7d54b725c9c 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
@@ -56,6 +56,7 @@
 import com.azure.cosmos.models.PartitionKey;
 import com.azure.cosmos.models.PartitionKeyDefinition;
 import com.azure.cosmos.models.PartitionKeyDefinitionVersion;
+import com.azure.cosmos.models.PartitionKind;
 import com.azure.cosmos.models.SqlQuerySpec;
 import com.azure.cosmos.models.ThroughputProperties;
 import com.azure.cosmos.util.CosmosPagedFlux;
@@ -671,6 +672,32 @@ static protected CosmosContainerProperties getCollectionDefinitionForHashV2(String
         return collectionDefinition;
     }
 
+    static protected CosmosContainerProperties getCollectionDefinitionForHashV2WithHpk(String collectionId) {
+        PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+        ArrayList<String> paths = new ArrayList<>();
+        paths.add("/state");
+        paths.add("/city");
+        paths.add("/zipcode");
+        partitionKeyDef.setPaths(paths);
+        partitionKeyDef.setVersion(PartitionKeyDefinitionVersion.V2);
+        partitionKeyDef.setKind(PartitionKind.MULTI_HASH);
+
+        return new CosmosContainerProperties(collectionId, partitionKeyDef);
+    }
+
+    static protected CosmosContainerProperties getCollectionDefinitionWithHpk(String collectionId) {
+        PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition();
+        ArrayList<String> paths = new ArrayList<>();
+        paths.add("/state");
+        paths.add("/city");
+        paths.add("/zipcode");
+        partitionKeyDef.setPaths(paths);
+        partitionKeyDef.setKind(PartitionKind.MULTI_HASH);
+
+        return new CosmosContainerProperties(collectionId, partitionKeyDef);
+    }
+
     static protected CosmosContainerProperties getCollectionDefinitionWithRangeRangeIndexWithIdAsPartitionKey() {
         return getCollectionDefinitionWithRangeRangeIndex(Collections.singletonList("/id"));
     }
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
index e15a88e126186..188a9fd7e468f 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
@@ -2676,8 +2676,8 @@ Mono<List<FeedRange>> getFeedRanges(boolean forceRefresh) {
     }
 
     /**
-     * Attempts to split a feedrange into {@code targetedCountAfterAplit} sub ranges. This is a best
-     * effort - it is possible that the list of feed ranges returned has less than {@code targetedCountAfterAplit}
+     * Attempts to split a feedrange into {@code targetedCountAfterSplit} sub ranges. This is a best
This is a best + * effort - it is possible that the list of feed ranges returned has less than {@lparamtargetedCountAfterSplit} * sub ranges * @param feedRange * @param targetedCountAfterSplit diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java index 65151fe9d309b..0e7b693537279 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java @@ -17,7 +17,6 @@ import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.models.PartitionKeyDefinitionVersion; -import com.azure.cosmos.models.PartitionKind; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; @@ -220,18 +219,15 @@ public Mono> trySplit( PartitionKeyDefinition pkDefinition = collectionValueHolder.v.getPartitionKey(); - if (targetedSplitCount <= 1 || - effectiveRange.isSingleValue() || - // splitting ranges into sub ranges only possible for hash partitioning - pkDefinition.getKind() != PartitionKind.HASH) { + if (targetedSplitCount <= 1 || effectiveRange.isSingleValue()) { return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange)); } PartitionKeyDefinitionVersion effectivePKVersion = - pkDefinition.getVersion() != null - ? pkDefinition.getVersion() - : PartitionKeyDefinitionVersion.V1; + pkDefinition.getVersion() != null + ? pkDefinition.getVersion() + : PartitionKeyDefinitionVersion.V1; switch (effectivePKVersion) { case V1: return trySplitWithHashV1(effectiveRange, targetedSplitCount); @@ -245,6 +241,7 @@ public Mono> trySplit( }); } + static List trySplitWithHashV1( Range effectiveRange, int targetedSplitCount) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/PartitionKeyBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/PartitionKeyBuilder.java index fc7647bdae472..d57e330093100 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/PartitionKeyBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/PartitionKeyBuilder.java @@ -3,19 +3,11 @@ package com.azure.cosmos.models; -import com.azure.cosmos.implementation.JsonSerializable; -import com.azure.cosmos.implementation.PartitionKeyHelper; import com.azure.cosmos.implementation.Undefined; -import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.ArrayList; import java.util.List; -import java.util.Map; - -import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** * Builder for partition keys.