From 2a386f2f2e0dff276ab8a6c6514b96b140b0faa8 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Mon, 25 Nov 2024 20:41:12 -0500 Subject: [PATCH 1/5] Added udfs and some tests for splitting ranges with hpks --- .../azure/cosmos/SparkBridgeInternal.scala | 11 --- .../SparkBridgeImplementationInternal.scala | 59 ++++++++++- .../spark/udf/GetBucketForPartitionKey.scala | 23 +++++ .../spark/udf/GetFeedRangesForBuckets.scala | 25 +++++ .../spark/FeedRangesForBucketsITest.scala | 52 ++++++++++ .../spark/SparkE2EChangeFeedITest.scala | 2 +- .../com/azure/cosmos/CosmosContainerTest.java | 98 ++++++++++++++++--- .../com/azure/cosmos/rx/TestSuiteBase.java | 27 +++++ .../azure/cosmos/CosmosAsyncContainer.java | 4 +- .../feedranges/FeedRangeInternal.java | 37 +++---- .../cosmos/models/PartitionKeyBuilder.java | 8 -- 11 files changed, 293 insertions(+), 53 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala create mode 100644 sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForBuckets.scala create mode 100644 sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/SparkBridgeInternal.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/SparkBridgeInternal.scala index 8773aab1132c2..95d8107f3fb62 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/SparkBridgeInternal.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/SparkBridgeInternal.scala @@ -48,17 +48,6 @@ private[cosmos] object SparkBridgeInternal { s"${database.getClient.getServiceEndpoint}|${database.getId}|${container.getId}" } - private[cosmos] def getNormalizedEffectiveRange - ( - container: CosmosAsyncContainer, - feedRange: FeedRange - ) : NormalizedRange = { - - SparkBridgeImplementationInternal - .rangeToNormalizedRange( - container.getNormalizedEffectiveRange(feedRange).block) - } - private[cosmos] def getPartitionKeyRanges ( container: CosmosAsyncContainer diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala index 259485285b6a6..4f95e12a6f65b 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala @@ -9,12 +9,13 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull import com.azure.cosmos.implementation.query.CompositeContinuationToken import com.azure.cosmos.implementation.routing.Range -import com.azure.cosmos.models.{FeedRange, PartitionKey, PartitionKeyBuilder, PartitionKeyDefinition, SparkModelBridgeInternal} +import com.azure.cosmos.models.{FeedRange, PartitionKey, PartitionKeyBuilder, PartitionKeyDefinition, PartitionKind, SparkModelBridgeInternal} import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait import com.azure.cosmos.spark.{ChangeFeedOffset, CosmosConstants, NormalizedRange} import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, DirectConnectionConfig, 
SparkBridgeInternal} import com.fasterxml.jackson.databind.ObjectMapper +import scala.collection.convert.ImplicitConversions.`list asScalaBuffer` import scala.collection.mutable // scalastyle:off underscore.import @@ -189,6 +190,11 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra new Range[String](range.min, range.max, true, false) } + private[cosmos] def toCosmosRange(range: String): Range[String] = { + val parts = range.split("-") + new Range[String](parts(0), parts(1), true, false) + } + def doRangesOverlap(left: NormalizedRange, right: NormalizedRange): Boolean = { Range.checkOverlapping(toCosmosRange(left), toCosmosRange(right)) } @@ -244,6 +250,57 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra rangeToNormalizedRange(effectiveRange) } + private[cosmos] def trySplitFeedRanges + ( + partitionKeyDefinitionJson: String, + feedRange: FeedRangeEpkImpl, + bucketCount: Int + ): Array[String] = { + + val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson) + val feedRanges = FeedRangeInternal.trySplitCore(pkDefinition, feedRange.getRange, bucketCount) + val normalizedRanges = new Array[String](feedRanges.size()) + for (i <- feedRanges.indices) { + val normalizedRange = rangeToNormalizedRange(feedRanges(i).getRange) + normalizedRanges(i) = s"${normalizedRange.min}-${normalizedRange.max}" + } + normalizedRanges + } + + def findBucket(feedRanges: Array[String], pkValue: Object, pkDefinition: PartitionKeyDefinition):Int = { + val partitionKey = new PartitionKeyBuilder() + var pk: PartitionKey = null + // refactor this + if (pkDefinition.getKind == PartitionKind.MULTI_HASH) { + val objectMapper = new ObjectMapper() + val json = pkValue.toString + try { + val partitionKeyValues = objectMapper.readValue(json, classOf[Array[String]]) + for (value <- partitionKeyValues) { + partitionKey.add(value.trim) + } + pk = partitionKey.build() + } catch { + case e: Exception => + logInfo("Invalid partition key paths: " + json, e) + } + } else if (pkDefinition.getKind == PartitionKind.HASH) { + pk = new PartitionKey(pkValue) + } + val feedRangeFromPk = FeedRange.forLogicalPartition(pk).asInstanceOf[FeedRangePartitionKeyImpl] + val effectiveRangeFromPk = feedRangeFromPk.getEffectiveRange(pkDefinition) + + for (i <- feedRanges.indices) { + val range = SparkBridgeImplementationInternal.toCosmosRange(feedRanges(i)) + if (range.contains(effectiveRangeFromPk.getMin)) { + return i + } + + } + -1 + + } + def setIoThreadCountPerCoreFactor ( config: DirectConnectionConfig, diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala new file mode 100644 index 0000000000000..33d672bdbcb72 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+package com.azure.cosmos.spark.udf + +import com.azure.cosmos.implementation.SparkBridgeImplementationInternal +import com.azure.cosmos.models.SparkModelBridgeInternal +import com.azure.cosmos.spark.CosmosPredicates.requireNotNullOrEmpty +import org.apache.spark.sql.api.java.UDF3 + +@SerialVersionUID(1L) +class GetBucketForPartitionKey extends UDF3[String, Object, Array[String], Int] { + override def call + ( + partitionKeyDefinitionJson: String, + partitionKeyValue: Object, + feedRangesForBuckets: Array[String] + ): Int = { + requireNotNullOrEmpty(partitionKeyDefinitionJson, "partitionKeyDefinitionJson") + + val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson) + SparkBridgeImplementationInternal.findBucket(feedRangesForBuckets, partitionKeyValue, pkDefinition) + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForBuckets.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForBuckets.scala new file mode 100644 index 0000000000000..005dfddcb1158 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForBuckets.scala @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.spark.udf + +import com.azure.cosmos.implementation.SparkBridgeImplementationInternal +import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl +import com.azure.cosmos.models.FeedRange +import com.azure.cosmos.spark.CosmosPredicates.requireNotNullOrEmpty +import org.apache.spark.sql.api.java.UDF2 + +@SerialVersionUID(1L) +class GetFeedRangesForBuckets extends UDF2[String, Int, Array[String]] { + override def call + ( + partitionKeyDefinitionJson: String, + bucketCount: Int + ): Array[String] = { + + requireNotNullOrEmpty(partitionKeyDefinitionJson, "partitionKeyDefinitionJson") + + SparkBridgeImplementationInternal.trySplitFeedRanges(partitionKeyDefinitionJson, + FeedRange.forFullRange().asInstanceOf[FeedRangeEpkImpl], + bucketCount) + } +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala new file mode 100644 index 0000000000000..e1f9f165f61be --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+package com.azure.cosmos.spark + +import com.azure.cosmos.implementation.{TestConfigurations, Utils} +import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait +import com.azure.cosmos.spark.udf.{GetBucketForPartitionKey, GetFeedRangeForHierarchicalPartitionKeyValues, GetFeedRangesForBuckets} +import org.apache.spark.sql.types._ + +import java.util.UUID + +class FeedRangesForBucketsITest + extends IntegrationSpec + with SparkWithDropwizardAndSlf4jMetrics + with CosmosClient + with AutoCleanableCosmosContainerWithSubpartitions + with BasicLoggingTrait + with MetricAssertions { + + //scalastyle:off multiple.string.literals + //scalastyle:off magic.number + + override def afterEach(): Unit = { + this.reinitializeContainer() + } + + "feed ranges" can "can be split into different buckets" in { + spark.udf.register("GetFeedRangesForBuckets", new GetFeedRangesForBuckets(), ArrayType(StringType)) + val pkDefinition = "{\"paths\":[\"/id\"],\"kind\":\"Hash\"}" + var dummyDf = spark.sql(s"SELECT GetFeedRangesForBuckets('$pkDefinition', 5)") + val expectedFeedRanges = Array("-05C1C9CD673398", "05C1C9CD673398-05C1D9CD673398", + "05C1D9CD673398-05C1E399CD6732", "05C1E399CD6732-05C1E9CD673398", "05C1E9CD673398-FF") + val feedRange = dummyDf + .collect()(0) + .getList[String](0) + .toArray + + logInfo(s"FeedRange from UDF: $feedRange") + assert(feedRange.sameElements(expectedFeedRanges), "Feed ranges do not match the expected values") + +// spark.udf.register("GetBucketForPartitionKey", new GetBucketForPartitionKey(), IntegerType) +// dummyDf = spark.sql(s"SELECT GetBucketForPartitionKey('$pkDefinition', 4979ea4a-6ba6-42ee-b9e6-1f5bf996a01f, '$feedRange')") +// val bucket = dummyDf.collect()(0).getInt(0) +// assert(bucket == 0, "Bucket does not match the expected value") + + + } + + //scalastyle:on magic.number + //scalastyle:on multiple.string.literals +} + diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala index f6144f78f0aaf..981581bec3102 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala @@ -22,7 +22,7 @@ class SparkE2EChangeFeedITest extends IntegrationSpec with SparkWithDropwizardAndSlf4jMetrics with CosmosClient - with CosmosContainerWithRetention + with CosmosContainer with BasicLoggingTrait with MetricAssertions { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerTest.java index d2c6093c3d3ee..edecf71badec7 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerTest.java @@ -668,7 +668,6 @@ public void getNormalizedFeedRanges_HashV2() { public void getFeedRanges_withMultiplePartitions() throws Exception { String collectionName = UUID.randomUUID().toString(); CosmosContainerProperties containerProperties = getCollectionDefinition(collectionName); - CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); CosmosContainerResponse containerResponse = createdDatabase.createContainer( containerProperties, ThroughputProperties.createManualThroughput(18000)); @@ -697,11 +696,6 @@ public 
void getFeedRanges_withMultiplePartitions() throws Exception { .isNotNull() .hasSize(3); - String leftMin = getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).getMin(); - String rightMin = firstEpkRange.getMin(); - String leftMax = getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).getMax(); - String rightMax = firstEpkRange.getMax(); - assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).equals(firstEpkRange)) .isTrue(); @@ -716,7 +710,6 @@ public void getFeedRanges_withMultiplePartitions() throws Exception { public void getFeedRanges_withMultiplePartitions_HashV2() throws Exception { String collectionName = UUID.randomUUID().toString(); CosmosContainerProperties containerProperties = getCollectionDefinitionForHashV2(collectionName); - CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); CosmosContainerResponse containerResponse = createdDatabase.createContainer( containerProperties, ThroughputProperties.createManualThroughput(18000)); @@ -752,11 +745,6 @@ public void getFeedRanges_withMultiplePartitions_HashV2() throws Exception { .isNotNull() .hasSize(3); - String leftMin = getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).getMin(); - String rightMin = firstEpkRange.getMin(); - String leftMax = getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).getMax(); - String rightMax = firstEpkRange.getMax(); - assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).equals(firstEpkRange)) .isTrue(); @@ -767,6 +755,92 @@ public void getFeedRanges_withMultiplePartitions_HashV2() throws Exception { .isTrue(); } + @Test(groups = { "emulator" }, timeOut = TIMEOUT) + public void getFeedRanges_withMultiplePartitions_HashV2_HPK() { + String collectionName = UUID.randomUUID().toString(); + CosmosContainerProperties containerProperties = getCollectionDefinitionForHashV2WithHpk(collectionName); + createdDatabase.createContainer( + containerProperties, + ThroughputProperties.createManualThroughput(11000)); + this.createdContainer = createdDatabase.getContainer(collectionName); + + CosmosContainer syncContainer = createdDatabase.getContainer(collectionName); + + List feedRanges = syncContainer.getFeedRanges(); + assertThat(feedRanges) + .isNotNull() + .hasSize(2); + + assertFeedRange( + feedRanges.get(0), + "{\"Range\":{\"min\":\"\",\"max\":\"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\"}}"); + assertFeedRange( + feedRanges.get(1), + "{\"Range\":{\"min\":\"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\"," + + "\"max\":\"FF\"}}"); + + Range firstEpkRange = getEffectiveRange(syncContainer, feedRanges.get(0)); + Range secondEpkRange = getEffectiveRange(syncContainer, feedRanges.get(1)); + + List feedRangesAfterSplit = syncContainer + .asyncContainer + .trySplitFeedRange(FeedRange.forFullRange(), 2) + .block(); + assertThat(feedRangesAfterSplit) + .isNotNull() + .hasSize(2); + + assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).equals(firstEpkRange)) + .isTrue(); + + assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(1)).equals(secondEpkRange)) + .isTrue(); + + } + + @Test(groups = { "emulator" }, timeOut = TIMEOUT) + public void getFeedRanges_withMultiplePartitions_HashV1_HPK() { + String collectionName = UUID.randomUUID().toString(); + CosmosContainerProperties containerProperties = getCollectionDefinitionForHashV2WithHpk(collectionName); + createdDatabase.createContainer( + containerProperties, + ThroughputProperties.createManualThroughput(11000)); + this.createdContainer = 
createdDatabase.getContainer(collectionName); + + CosmosContainer syncContainer = createdDatabase.getContainer(collectionName); + + List feedRanges = syncContainer.getFeedRanges(); + assertThat(feedRanges) + .isNotNull() + .hasSize(2); + + assertFeedRange( + feedRanges.get(0), + "{\"Range\":{\"min\":\"\",\"max\":\"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\"}}"); + assertFeedRange( + feedRanges.get(1), + "{\"Range\":{\"min\":\"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\"," + + "\"max\":\"FF\"}}"); + + Range firstEpkRange = getEffectiveRange(syncContainer, feedRanges.get(0)); + Range secondEpkRange = getEffectiveRange(syncContainer, feedRanges.get(1)); + + List feedRangesAfterSplit = syncContainer + .asyncContainer + .trySplitFeedRange(FeedRange.forFullRange(), 2) + .block(); + assertThat(feedRangesAfterSplit) + .isNotNull() + .hasSize(2); + + assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(0)).equals(firstEpkRange)) + .isTrue(); + + assertThat(getEffectiveRange(syncContainer, feedRangesAfterSplit.get(1)).equals(secondEpkRange)) + .isTrue(); + + } + private static Range getEffectiveRange(CosmosContainer container, FeedRange feedRange) { AsyncDocumentClient clientWrapper = container.asyncContainer.getDatabase().getDocClientWrapper(); return FeedRangeInternal diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 82d78e196cac4..6d7d54b725c9c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -56,6 +56,7 @@ import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.models.PartitionKeyDefinitionVersion; +import com.azure.cosmos.models.PartitionKind; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.util.CosmosPagedFlux; @@ -671,6 +672,32 @@ static protected CosmosContainerProperties getCollectionDefinitionForHashV2(Stri return collectionDefinition; } + static protected CosmosContainerProperties getCollectionDefinitionForHashV2WithHpk(String collectionId) { + PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); + ArrayList paths = new ArrayList<>(); + paths.add("/state"); + paths.add("/city"); + paths.add("/zipcode"); + partitionKeyDef.setPaths(paths); + partitionKeyDef.setVersion(PartitionKeyDefinitionVersion.V2); + partitionKeyDef.setKind(PartitionKind.MULTI_HASH); + + return new CosmosContainerProperties(collectionId, partitionKeyDef); + } + + + static protected CosmosContainerProperties getCollectionDefinitionWithHpk(String collectionId) { + PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); + ArrayList paths = new ArrayList<>(); + paths.add("/state"); + paths.add("/city"); + paths.add("/zipcode"); + partitionKeyDef.setPaths(paths); + partitionKeyDef.setKind(PartitionKind.MULTI_HASH); + + return new CosmosContainerProperties(collectionId, partitionKeyDef); + } + static protected CosmosContainerProperties getCollectionDefinitionWithRangeRangeIndexWithIdAsPartitionKey() { return getCollectionDefinitionWithRangeRangeIndex(Collections.singletonList("/id")); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java index 
e15a88e126186..188a9fd7e468f 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
@@ -2676,8 +2676,8 @@ Mono> getFeedRanges(boolean forceRefresh) {
 }

 /**
- * Attempts to split a feedrange into {@code targetedCountAfterAplit} sub ranges. This is a best
- * effort - it is possible that the list of feed ranges returned has less than {@code targetedCountAfterAplit}
+ * Attempts to split a feedrange into {@code targetedCountAfterSplit} sub ranges. This is a best
+ * effort - it is possible that the list of feed ranges returned has less than {@code targetedCountAfterSplit}
 * sub ranges
 * @param feedRange
 * @param targetedCountAfterSplit
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java
index 65151fe9d309b..602019986024d 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java
@@ -17,7 +17,6 @@
 import com.azure.cosmos.models.FeedRange;
 import com.azure.cosmos.models.PartitionKeyDefinition;
 import com.azure.cosmos.models.PartitionKeyDefinitionVersion;
-import com.azure.cosmos.models.PartitionKind;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -220,29 +219,31 @@ public Mono> trySplit(

 PartitionKeyDefinition pkDefinition = collectionValueHolder.v.getPartitionKey();

- if (targetedSplitCount <= 1 ||
- effectiveRange.isSingleValue() ||
- // splitting ranges into sub ranges only possible for hash partitioning
- pkDefinition.getKind() != PartitionKind.HASH) {
+ if (targetedSplitCount <= 1 || effectiveRange.isSingleValue()) {
 return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange));
 }

- PartitionKeyDefinitionVersion effectivePKVersion =
- pkDefinition.getVersion() != null
- ? pkDefinition.getVersion()
- : PartitionKeyDefinitionVersion.V1;
- switch (effectivePKVersion) {
- case V1:
- return trySplitWithHashV1(effectiveRange, targetedSplitCount);
+ return trySplitCore(pkDefinition, effectiveRange, targetedSplitCount);
+ });
+ }

- case V2:
- return trySplitWithHashV2(effectiveRange, targetedSplitCount);
+ public static List trySplitCore(PartitionKeyDefinition pkDefinition, Range effectiveRange, int targetedSplitCount) {
+ PartitionKeyDefinitionVersion effectivePKVersion =
+ pkDefinition.getVersion() != null
+ ?
pkDefinition.getVersion()
+ : PartitionKeyDefinitionVersion.V1;
+ switch (effectivePKVersion) {
+ case V1:
+ return trySplitWithHashV1(effectiveRange, targetedSplitCount);
+
+ case V2:
+ return trySplitWithHashV2(effectiveRange, targetedSplitCount);
+
+ default:
+ return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange));
+ }
- default:
- return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange));
- }
- });
 }

 static List trySplitWithHashV1(
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/PartitionKeyBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/PartitionKeyBuilder.java
index fc7647bdae472..d57e330093100 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/PartitionKeyBuilder.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/PartitionKeyBuilder.java
@@ -3,19 +3,11 @@
 package com.azure.cosmos.models;

-import com.azure.cosmos.implementation.JsonSerializable;
-import com.azure.cosmos.implementation.PartitionKeyHelper;
 import com.azure.cosmos.implementation.Undefined;
-import com.azure.cosmos.implementation.Utils;
 import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.node.ObjectNode;

 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-
-import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

 /**
  * Builder for partition keys.
From 11effce2558f08a9130602b0b6b56849e552cd0c Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Tue, 3 Dec 2024 14:04:02 -0500
Subject: [PATCH 2/5] Added more test coverage

---
 .../azure-cosmos-spark_3-1_2-12/CHANGELOG.md | 1 +
 .../azure-cosmos-spark_3-2_2-12/CHANGELOG.md | 1 +
 .../azure-cosmos-spark_3-3_2-12/CHANGELOG.md | 1 +
 .../azure-cosmos-spark_3-4_2-12/CHANGELOG.md | 1 +
 .../azure-cosmos-spark_3-5_2-12/CHANGELOG.md | 1 +
 .../SparkBridgeImplementationInternal.scala | 58 ++++------
 .../spark/FeedRangesForBucketsITest.scala | 104 ++++++++++++++++--
 7 files changed, 120 insertions(+), 47 deletions(-)

diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
index 72ad2af6e22c8..af9197a3b335f 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.35.0-beta.1 (Unreleased)

 #### Features Added
+* Added the udfs `GetFeedRangesForBuckets` and `GetBucketForPartitionKey` to ease mapping of cosmos partition key to databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)

 #### Breaking Changes

diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
index c21623a209e5b..e63363e545b7f 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### 4.35.0-beta.1 (Unreleased)

 #### Features Added
+* Added the udfs `GetFeedRangesForBuckets` and `GetBucketForPartitionKey` to ease mapping of cosmos partition key to databricks table partition key.
- See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md index e920fd1572f89..972fd183ee42c 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.35.0-beta.1 (Unreleased) #### Features Added +* Added the udfs `GetFeedRangesForBuckets` and `GetBucketForPartitionKey` to ease mapping of cosmos partition key to databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md index 276adeb27d8ef..19efc4841fc47 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.35.0-beta.1 (Unreleased) #### Features Added +* Added the udfs `GetFeedRangesForBuckets` and `GetBucketForPartitionKey` to ease mapping of cosmos partition key to databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md index 8829b806e7472..1a097297e18e3 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.35.0-beta.1 (Unreleased) #### Features Added +* Added the udfs `GetFeedRangesForBuckets` and `GetBucketForPartitionKey` to ease mapping of cosmos partition key to databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala index 4f95e12a6f65b..ad53e60c7c6a0 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala @@ -210,7 +210,7 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra partitionKeyDefinitionJson: String ): NormalizedRange = { val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson) - partitionKeyToNormalizedRange(new PartitionKey(partitionKeyValue), pkDefinition) + partitionKeyToNormalizedRange(getPartitionKeyValue(pkDefinition, partitionKeyValue), pkDefinition) } private[cosmos] def partitionKeyToNormalizedRange( @@ -226,27 +226,13 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra partitionKeyValueJsonArray: Object, partitionKeyDefinitionJson: String ): NormalizedRange = { - - val partitionKey = new PartitionKeyBuilder() - val objectMapper = new ObjectMapper() - val json = partitionKeyValueJsonArray.toString - try { - val partitionKeyValues = objectMapper.readValue(json, classOf[Array[String]]) - for (value <- partitionKeyValues) { - partitionKey.add(value.trim) - } - partitionKey.build() - } catch { - case e: Exception => - logInfo("Invalid partition key paths: " + json, e) - } - - val 
feedRange = FeedRange - .forLogicalPartition(partitionKey.build()) + val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson) + val partitionKey = getPartitionKeyValue(pkDefinition, partitionKeyValueJsonArray) + val feedRange = FeedRange + .forLogicalPartition(partitionKey) .asInstanceOf[FeedRangePartitionKeyImpl] - val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson) - val effectiveRange = feedRange.getEffectiveRange(pkDefinition) + val effectiveRange = feedRange.getEffectiveRange(pkDefinition) rangeToNormalizedRange(effectiveRange) } @@ -268,10 +254,23 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra } def findBucket(feedRanges: Array[String], pkValue: Object, pkDefinition: PartitionKeyDefinition):Int = { + val pk = getPartitionKeyValue(pkDefinition, pkValue) + val feedRangeFromPk = FeedRange.forLogicalPartition(pk).asInstanceOf[FeedRangePartitionKeyImpl] + val effectiveRangeFromPk = feedRangeFromPk.getEffectiveRange(pkDefinition) + + for (i <- feedRanges.indices) { + val range = SparkBridgeImplementationInternal.toCosmosRange(feedRanges(i)) + if (range.contains(effectiveRangeFromPk.getMin)) { + return i + } + } + throw new IllegalArgumentException("The partition key value does not belong to any of the feed ranges") + } + + private def getPartitionKeyValue(pkDefinition: PartitionKeyDefinition, pkValue: Object): PartitionKey = { val partitionKey = new PartitionKeyBuilder() var pk: PartitionKey = null - // refactor this - if (pkDefinition.getKind == PartitionKind.MULTI_HASH) { + if (pkDefinition.getKind.equals(PartitionKind.MULTI_HASH)) { val objectMapper = new ObjectMapper() val json = pkValue.toString try { @@ -284,21 +283,10 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra case e: Exception => logInfo("Invalid partition key paths: " + json, e) } - } else if (pkDefinition.getKind == PartitionKind.HASH) { + } else if (pkDefinition.getKind.equals(PartitionKind.HASH)) { pk = new PartitionKey(pkValue) } - val feedRangeFromPk = FeedRange.forLogicalPartition(pk).asInstanceOf[FeedRangePartitionKeyImpl] - val effectiveRangeFromPk = feedRangeFromPk.getEffectiveRange(pkDefinition) - - for (i <- feedRanges.indices) { - val range = SparkBridgeImplementationInternal.toCosmosRange(feedRanges(i)) - if (range.contains(effectiveRangeFromPk.getMin)) { - return i - } - - } - -1 - + pk } def setIoThreadCountPerCoreFactor diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala index e1f9f165f61be..fee12b4f8e52a 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala @@ -2,18 +2,22 @@ // Licensed under the MIT License. 
package com.azure.cosmos.spark -import com.azure.cosmos.implementation.{TestConfigurations, Utils} +import com.azure.cosmos.CosmosAsyncContainer +import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Utils} +import com.azure.cosmos.models.CosmosQueryRequestOptions import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait -import com.azure.cosmos.spark.udf.{GetBucketForPartitionKey, GetFeedRangeForHierarchicalPartitionKeyValues, GetFeedRangesForBuckets} -import org.apache.spark.sql.types._ +import com.azure.cosmos.spark.udf.{GetBucketForPartitionKey, GetFeedRangesForBuckets} +import com.fasterxml.jackson.databind.node.ObjectNode +import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType} import java.util.UUID +import scala.collection.mutable class FeedRangesForBucketsITest extends IntegrationSpec with SparkWithDropwizardAndSlf4jMetrics with CosmosClient - with AutoCleanableCosmosContainerWithSubpartitions + with CosmosContainer with BasicLoggingTrait with MetricAssertions { @@ -24,10 +28,10 @@ class FeedRangesForBucketsITest this.reinitializeContainer() } - "feed ranges" can "can be split into different buckets" in { + "feed ranges" can "be split into different buckets" in { spark.udf.register("GetFeedRangesForBuckets", new GetFeedRangesForBuckets(), ArrayType(StringType)) - val pkDefinition = "{\"paths\":[\"/id\"],\"kind\":\"Hash\"}" - var dummyDf = spark.sql(s"SELECT GetFeedRangesForBuckets('$pkDefinition', 5)") + var pkDefinition = "{\"paths\":[\"/id\"],\"kind\":\"Hash\"}" + val dummyDf = spark.sql(s"SELECT GetFeedRangesForBuckets('$pkDefinition', 5)") val expectedFeedRanges = Array("-05C1C9CD673398", "05C1C9CD673398-05C1D9CD673398", "05C1D9CD673398-05C1E399CD6732", "05C1E399CD6732-05C1E9CD673398", "05C1E9CD673398-FF") val feedRange = dummyDf @@ -35,15 +39,91 @@ class FeedRangesForBucketsITest .getList[String](0) .toArray - logInfo(s"FeedRange from UDF: $feedRange") assert(feedRange.sameElements(expectedFeedRanges), "Feed ranges do not match the expected values") + val lastId = "45170a78-eac0-4d3a-be5e-9b00bb5f4649" -// spark.udf.register("GetBucketForPartitionKey", new GetBucketForPartitionKey(), IntegerType) -// dummyDf = spark.sql(s"SELECT GetBucketForPartitionKey('$pkDefinition', 4979ea4a-6ba6-42ee-b9e6-1f5bf996a01f, '$feedRange')") -// val bucket = dummyDf.collect()(0).getInt(0) -// assert(bucket == 0, "Bucket does not match the expected value") + var bucket = new GetBucketForPartitionKey().call(pkDefinition, lastId, expectedFeedRanges) + assert(bucket == 0, "Bucket does not match the expected value") + // test with hpk partition key definition + pkDefinition = "{\"paths\":[\"/tenantId\",\"/userId\",\"/sessionId\"],\"kind\":\"MultiHash\"}" + val pkValues = "[\"" + lastId + "\"]" + bucket = new GetBucketForPartitionKey().call(pkDefinition, pkValues, expectedFeedRanges) + assert(bucket == 4, "Bucket does not match the expected value") + + } + + "feed ranges" can "be converted into buckets for new partition key" in { + feedRangesForBuckets(false) + } + + "feed ranges" can "be converted into buckets for new hierarchical partition key" in { + feedRangesForBuckets(true) + } + + def feedRangesForBuckets(hpk: Boolean): Unit = { + val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer) + val docs = createItems(container, 50, hpk) + + spark.udf.register("GetFeedRangesForBuckets", new GetFeedRangesForBuckets(), ArrayType(StringType)) + val pkDefinition = if (hpk) 
{"{\"paths\":[\"/tenantId\",\"/userId\",\"/sessionId\"],\"kind\":\"MultiHash\"}"}
+ else {"{\"paths\":[\"/id\"],\"kind\":\"Hash\"}"}
+
+ val dummyDf = spark.sql(s"SELECT GetFeedRangesForBuckets('$pkDefinition', 5)")
+ val feedRanges = dummyDf
+ .collect()(0)
+ .getList[String](0)
+ .toArray(new Array[String](0))
+
+ spark.udf.register("GetBucketForPartitionKey", new GetBucketForPartitionKey(), IntegerType)
+ val bucketToDocsMap = mutable.Map[Int, List[ObjectNode]]().withDefaultValue(List())
+
+ for (doc <- docs) {
+ val lastId = if (!hpk) doc.get("id").asText() else "[\"" + doc.get("tenantId").asText() + "\"]"
+ val bucket = new GetBucketForPartitionKey().call(pkDefinition, lastId, feedRanges)
+ // Add the document to the corresponding bucket in the map
+ bucketToDocsMap(bucket) = doc :: bucketToDocsMap(bucket)
+ }
+
+ for (i <- feedRanges.indices) {
+ val range = SparkBridgeImplementationInternal.toCosmosRange(feedRanges(i))
+ val feedRange = SparkBridgeImplementationInternal.toFeedRange(SparkBridgeImplementationInternal.rangeToNormalizedRange(range))
+ val requestOptions = new CosmosQueryRequestOptions().setFeedRange(feedRange)
+ container.queryItems("SELECT * FROM c", requestOptions, classOf[ObjectNode]).byPage().collectList().block().forEach { rsp =>
+ val results = rsp.getResults
+ var numDocs = 0
+ val expectedResults = bucketToDocsMap(i)
+ results.forEach(doc => {
+ assert(expectedResults.collect({
+ case expectedDoc if expectedDoc.get("id").asText() == doc.get("id").asText() => expectedDoc
+ }).size >= 0, "Document not found in the expected bucket")
+ numDocs += 1
+ })
+ assert(numDocs == results.size(), "Number of documents in the bucket does not match the number of docs for that feed range")
+ }
+ }
+ }
+
+ def createItems(container: CosmosAsyncContainer, numOfItems: Int, hpk: Boolean): Array[ObjectNode] = {
+ val docs = new Array[ObjectNode](numOfItems)
+ for (sequenceNumber <- 1 to numOfItems) {
+ val lastId = UUID.randomUUID().toString
+ val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
+ objectNode.put("name", "Schrodinger's cat")
+ objectNode.put("type", "cat")
+ objectNode.put("age", 20)
+ objectNode.put("sequenceNumber", sequenceNumber)
+ objectNode.put("id", lastId)
+ if (hpk) {
+ objectNode.put("tenantId", lastId)
+ objectNode.put("userId", "userId1")
+ objectNode.put("sessionId", "sessionId1")
+ }
+ docs(sequenceNumber - 1) = objectNode
+ container.createItem(objectNode).block()
+ }
+ docs
 }

 //scalastyle:on magic.number
From b887e86aac1dc9df9550e6ed844303b076a73600 Mon Sep 17 00:00:00 2001
From: tvaron3
Date: Tue, 3 Dec 2024 15:19:36 -0500
Subject: [PATCH 3/5] Revert incorrect change

---
 .../com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala | 2 +-
 .../com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala | 4 +---
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala
index 33d672bdbcb72..7373a58d95fe3 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala
@@ -20,4 +20,4 @@ class GetBucketForPartitionKey extends UDF3[String, Object, Array[String], Int]
 val pkDefinition =
SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson) SparkBridgeImplementationInternal.findBucket(feedRangesForBuckets, partitionKeyValue, pkDefinition) } -} \ No newline at end of file +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala index 65261f38754e4..90cdd508bcce3 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala @@ -5,11 +5,9 @@ package com.azure.cosmos.spark import com.azure.cosmos.SparkBridgeInternal import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState import com.azure.cosmos.implementation.{TestConfigurations, Utils} -import com.azure.cosmos.models.PartitionKey import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait import com.azure.cosmos.spark.udf.{CreateChangeFeedOffsetFromSpark2, CreateSpark2ContinuationsFromChangeFeedOffset, GetFeedRangeForPartitionKeyValue} import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.functions import org.apache.spark.sql.types._ import java.io.{BufferedReader, InputStreamReader} @@ -22,7 +20,7 @@ class SparkE2EChangeFeedITest extends IntegrationSpec with SparkWithDropwizardAndSlf4jMetrics with CosmosClient - with CosmosContainer + with CosmosContainerWithRetention with BasicLoggingTrait with MetricAssertions { From e280624f77ce8c2bb029780601bd8c414bc6d6a2 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Wed, 18 Dec 2024 10:28:25 -0500 Subject: [PATCH 4/5] Reacting to comments --- .../SparkBridgeImplementationInternal.scala | 44 +++++++--- .../spark/udf/GetFeedRangesForBuckets.scala | 25 ------ .../spark/udf/GetFeedRangesForContainer.scala | 54 +++++++++++++ ...ey.scala => GetOverlappingFeedRange.scala} | 6 +- ...cala => FeedRangesForContainerITest.scala} | 80 +++++++++++-------- .../feedranges/FeedRangeInternal.java | 32 ++++---- 6 files changed, 149 insertions(+), 92 deletions(-) delete mode 100644 sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForBuckets.scala create mode 100644 sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForContainer.scala rename sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/{GetBucketForPartitionKey.scala => GetOverlappingFeedRange.scala} (77%) rename sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/{FeedRangesForBucketsITest.scala => FeedRangesForContainerITest.scala} (59%) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala index ad53e60c7c6a0..ad23682d42304 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala @@ -238,22 +238,44 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra private[cosmos] def trySplitFeedRanges ( - partitionKeyDefinitionJson: String, - feedRange: FeedRangeEpkImpl, - 
bucketCount: Int - ): Array[String] = { + cosmosClient: CosmosAsyncClient, + containerName: String, + databaseName: String, + targetedCount: Int + ): List[String] = { + val container = cosmosClient + .getDatabase(databaseName) + .getContainer(containerName) - val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson) - val feedRanges = FeedRangeInternal.trySplitCore(pkDefinition, feedRange.getRange, bucketCount) - val normalizedRanges = new Array[String](feedRanges.size()) + val epkRange = rangeToNormalizedRange(FeedRange.forFullRange().asInstanceOf[FeedRangeEpkImpl].getRange) + val feedRanges = SparkBridgeInternal.trySplitFeedRange(container, epkRange, targetedCount) + var normalizedRanges: List[String] = List() for (i <- feedRanges.indices) { - val normalizedRange = rangeToNormalizedRange(feedRanges(i).getRange) - normalizedRanges(i) = s"${normalizedRange.min}-${normalizedRange.max}" + normalizedRanges = normalizedRanges :+ s"${feedRanges(i).min}-${feedRanges(i).max}" } normalizedRanges } - def findBucket(feedRanges: Array[String], pkValue: Object, pkDefinition: PartitionKeyDefinition):Int = { + private[cosmos] def getFeedRangesForContainer + ( + cosmosClient: CosmosAsyncClient, + containerName: String, + databaseName: String + ): List[String] = { + val container = cosmosClient + .getDatabase(databaseName) + .getContainer(containerName) + + val feedRanges: List[String] = List() + container.getFeedRanges().block.map(feedRange => { + val effectiveRangeFromPk = feedRange.asInstanceOf[FeedRangeEpkImpl].getEffectiveRange(null, null, null).block + val normalizedRange = rangeToNormalizedRange(effectiveRangeFromPk) + s"${normalizedRange.min}-${normalizedRange.max}" :: feedRanges + }) + feedRanges + } + + def getOverlappingRange(feedRanges: Array[String], pkValue: Object, pkDefinition: PartitionKeyDefinition): String = { val pk = getPartitionKeyValue(pkDefinition, pkValue) val feedRangeFromPk = FeedRange.forLogicalPartition(pk).asInstanceOf[FeedRangePartitionKeyImpl] val effectiveRangeFromPk = feedRangeFromPk.getEffectiveRange(pkDefinition) @@ -261,7 +283,7 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra for (i <- feedRanges.indices) { val range = SparkBridgeImplementationInternal.toCosmosRange(feedRanges(i)) if (range.contains(effectiveRangeFromPk.getMin)) { - return i + return feedRanges(i) } } throw new IllegalArgumentException("The partition key value does not belong to any of the feed ranges") diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForBuckets.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForBuckets.scala deleted file mode 100644 index 005dfddcb1158..0000000000000 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForBuckets.scala +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. 
-package com.azure.cosmos.spark.udf - -import com.azure.cosmos.implementation.SparkBridgeImplementationInternal -import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl -import com.azure.cosmos.models.FeedRange -import com.azure.cosmos.spark.CosmosPredicates.requireNotNullOrEmpty -import org.apache.spark.sql.api.java.UDF2 - -@SerialVersionUID(1L) -class GetFeedRangesForBuckets extends UDF2[String, Int, Array[String]] { - override def call - ( - partitionKeyDefinitionJson: String, - bucketCount: Int - ): Array[String] = { - - requireNotNullOrEmpty(partitionKeyDefinitionJson, "partitionKeyDefinitionJson") - - SparkBridgeImplementationInternal.trySplitFeedRanges(partitionKeyDefinitionJson, - FeedRange.forFullRange().asInstanceOf[FeedRangeEpkImpl], - bucketCount) - } -} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForContainer.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForContainer.scala new file mode 100644 index 0000000000000..02f3482266e2f --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetFeedRangesForContainer.scala @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.spark.udf + +import com.azure.cosmos.implementation.SparkBridgeImplementationInternal +import com.azure.cosmos.spark.{CosmosClientCache, CosmosClientCacheItem, CosmosClientConfiguration, CosmosConfig, CosmosContainerConfig, CosmosReadConfig, Loan} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.api.java.UDF2 + +@SerialVersionUID(1L) +class GetFeedRangesForContainer extends UDF2[Map[String, String], Option[Int], Array[String]] { + override def call + ( + userProvidedConfig: Map[String, String], + targetedCount: Option[Int] + ): Array[String] = { + + val effectiveUserConfig = CosmosConfig.getEffectiveConfig(None, None, userProvidedConfig) + var feedRanges = List[String]() + val cosmosContainerConfig: CosmosContainerConfig = + CosmosContainerConfig.parseCosmosContainerConfig(effectiveUserConfig, None, None) + val readConfig = CosmosReadConfig.parseCosmosReadConfig(effectiveUserConfig) + val cosmosClientConfig = CosmosClientConfiguration( + effectiveUserConfig, + useEventualConsistency = readConfig.forceEventualConsistency, + CosmosClientConfiguration.getSparkEnvironmentInfo(SparkSession.getActiveSession)) + Loan( + List[Option[CosmosClientCacheItem]]( + Some(CosmosClientCache( + cosmosClientConfig, + None, + s"UDF GetFeedRangesForContainer" + )) + )) + .to(cosmosClientCacheItems => { + + if (targetedCount.isEmpty) { + feedRanges = SparkBridgeImplementationInternal.getFeedRangesForContainer( + cosmosClientCacheItems.head.get.cosmosClient, + cosmosContainerConfig.container, + cosmosContainerConfig.database + ) + } else { + feedRanges = SparkBridgeImplementationInternal.trySplitFeedRanges( + cosmosClientCacheItems.head.get.cosmosClient, + cosmosContainerConfig.container, + cosmosContainerConfig.database, + targetedCount.get) + } + }) + feedRanges.toArray + + } +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala similarity index 77% rename from sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala rename to 
sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala index 7373a58d95fe3..b93a8094d9de5 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetBucketForPartitionKey.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala @@ -8,16 +8,16 @@ import com.azure.cosmos.spark.CosmosPredicates.requireNotNullOrEmpty import org.apache.spark.sql.api.java.UDF3 @SerialVersionUID(1L) -class GetBucketForPartitionKey extends UDF3[String, Object, Array[String], Int] { +class GetOverlappingFeedRange extends UDF3[String, Object, Array[String], String] { override def call ( partitionKeyDefinitionJson: String, partitionKeyValue: Object, feedRangesForBuckets: Array[String] - ): Int = { + ): String = { requireNotNullOrEmpty(partitionKeyDefinitionJson, "partitionKeyDefinitionJson") val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson) - SparkBridgeImplementationInternal.findBucket(feedRangesForBuckets, partitionKeyValue, pkDefinition) + SparkBridgeImplementationInternal.getOverlappingRange(feedRangesForBuckets, partitionKeyValue, pkDefinition) } } diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForContainerITest.scala similarity index 59% rename from sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala rename to sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForContainerITest.scala index fee12b4f8e52a..35bd84ac6da1b 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForBucketsITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/FeedRangesForContainerITest.scala @@ -3,17 +3,16 @@ package com.azure.cosmos.spark import com.azure.cosmos.CosmosAsyncContainer -import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Utils} +import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, TestConfigurations, Utils} import com.azure.cosmos.models.CosmosQueryRequestOptions import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait -import com.azure.cosmos.spark.udf.{GetBucketForPartitionKey, GetFeedRangesForBuckets} +import com.azure.cosmos.spark.udf.{GetFeedRangesForContainer, GetOverlappingFeedRange} import com.fasterxml.jackson.databind.node.ObjectNode -import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType} import java.util.UUID import scala.collection.mutable -class FeedRangesForBucketsITest +class FeedRangesForContainerITest extends IntegrationSpec with SparkWithDropwizardAndSlf4jMetrics with CosmosClient @@ -28,62 +27,73 @@ class FeedRangesForBucketsITest this.reinitializeContainer() } - "feed ranges" can "be split into different buckets" in { - spark.udf.register("GetFeedRangesForBuckets", new GetFeedRangesForBuckets(), ArrayType(StringType)) + "feed ranges" can "be split into different sub feed ranges" in { + + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + + val cfg = Map( + "spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + ) var 
pkDefinition = "{\"paths\":[\"/id\"],\"kind\":\"Hash\"}" - val dummyDf = spark.sql(s"SELECT GetFeedRangesForBuckets('$pkDefinition', 5)") + val feedRanges = new GetFeedRangesForContainer().call(cfg, Option(5)) val expectedFeedRanges = Array("-05C1C9CD673398", "05C1C9CD673398-05C1D9CD673398", "05C1D9CD673398-05C1E399CD6732", "05C1E399CD6732-05C1E9CD673398", "05C1E9CD673398-FF") - val feedRange = dummyDf - .collect()(0) - .getList[String](0) - .toArray - assert(feedRange.sameElements(expectedFeedRanges), "Feed ranges do not match the expected values") + + + assert(feedRanges.sameElements(expectedFeedRanges), "Feed ranges do not match the expected values") val lastId = "45170a78-eac0-4d3a-be5e-9b00bb5f4649" - var bucket = new GetBucketForPartitionKey().call(pkDefinition, lastId, expectedFeedRanges) - assert(bucket == 0, "Bucket does not match the expected value") + var feedRangeResult = new GetOverlappingFeedRange().call(pkDefinition, lastId, expectedFeedRanges) + assert(feedRangeResult == "-05C1C9CD673398", "feed range does not match the expected value") // test with hpk partition key definition pkDefinition = "{\"paths\":[\"/tenantId\",\"/userId\",\"/sessionId\"],\"kind\":\"MultiHash\"}" val pkValues = "[\"" + lastId + "\"]" - bucket = new GetBucketForPartitionKey().call(pkDefinition, pkValues, expectedFeedRanges) - assert(bucket == 4, "Bucket does not match the expected value") + feedRangeResult = new GetOverlappingFeedRange().call(pkDefinition, pkValues, expectedFeedRanges) + assert(feedRangeResult == "05C1E9CD673398-FF", "feed range does not match the expected value") } - "feed ranges" can "be converted into buckets for new partition key" in { - feedRangesForBuckets(false) + "feed ranges" can "be mapped to new partition key" in { + feedRangesForPK(false) } - "feed ranges" can "be converted into buckets for new hierarchical partition key" in { - feedRangesForBuckets(true) + "feed ranges" can "be mapped to new hierarchical partition key" in { + feedRangesForPK(true) } - def feedRangesForBuckets(hpk: Boolean): Unit = { + def feedRangesForPK(hpk: Boolean): Unit = { + + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer) val docs = createItems(container, 50, hpk) + val cfg = Map( + "spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + ) - spark.udf.register("GetFeedRangesForBuckets", new GetFeedRangesForBuckets(), ArrayType(StringType)) val pkDefinition = if (hpk) {"{\"paths\":[\"/tenantId\",\"/userId\",\"/sessionId\"],\"kind\":\"MultiHash\"}"} else {"{\"paths\":[\"/id\"],\"kind\":\"Hash\"}"} - val dummyDf = spark.sql(s"SELECT GetFeedRangesForBuckets('$pkDefinition', 5)") - val feedRanges = dummyDf - .collect()(0) - .getList[String](0) - .toArray(new Array[String](0)) + val feedRanges = new GetFeedRangesForContainer().call(cfg, Option(5)) - spark.udf.register("GetBucketForPartitionKey", new GetBucketForPartitionKey(), IntegerType) - val bucketToDocsMap = mutable.Map[Int, List[ObjectNode]]().withDefaultValue(List()) + val feedRangeToDocsMap = mutable.Map[String, List[ObjectNode]]().withDefaultValue(List()) for (doc <- docs) { val lastId = if (!hpk) doc.get("id").asText() else "[\"" + doc.get("tenantId").asText() + "\"]" - val bucket = new GetBucketForPartitionKey().call(pkDefinition, lastId, feedRanges) - // Add the document 
-      bucketToDocsMap(bucket) = doc :: bucketToDocsMap(bucket)
+      val feedRange = new GetOverlappingFeedRange().call(pkDefinition, lastId, feedRanges)
+      // Add the document to the corresponding feed range in the map
+      feedRangeToDocsMap(feedRange) = doc :: feedRangeToDocsMap(feedRange)
     }
 
     for (i <- feedRanges.indices) {
@@ -93,14 +103,14 @@ class FeedRangesForBucketsITest
       container.queryItems("SELECT * FROM c", requestOptions, classOf[ObjectNode]).byPage().collectList().block().forEach { rsp =>
         val results = rsp.getResults
         var numDocs = 0
-        val expectedResults = bucketToDocsMap(i)
+        val expectedResults = feedRangeToDocsMap(feedRanges(i))
         results.forEach(doc => {
           assert(expectedResults.collect({
             case expectedDoc if expectedDoc.get("id").asText() == doc.get("id").asText() => expectedDoc
-          }).size >= 0, "Document not found in the expected bucket")
+          }).size >= 0, "Document not found in the expected feed range")
           numDocs += 1
         })
-        assert(numDocs == results.size(), "Number of documents in the bucket does not match the number of docs for that feed range")
+        assert(numDocs == results.size(), "Number of documents in the target feed range does not match the number of docs for that feed range")
       }
     }
   }
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java
index 602019986024d..0e7b693537279 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java
@@ -224,27 +224,23 @@ public Mono<List<FeedRangeEpkImpl>> trySplit(
                 return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange));
             }
 
-            return trySplitCore(pkDefinition, effectiveRange, targetedSplitCount);
+            PartitionKeyDefinitionVersion effectivePKVersion =
+                pkDefinition.getVersion() != null
+                    ? pkDefinition.getVersion()
+                    : PartitionKeyDefinitionVersion.V1;
+            switch (effectivePKVersion) {
+                case V1:
+                    return trySplitWithHashV1(effectiveRange, targetedSplitCount);
+
+                case V2:
+                    return trySplitWithHashV2(effectiveRange, targetedSplitCount);
+
+                default:
+                    return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange));
+            }
         });
     }
 
-    public static List<FeedRangeEpkImpl> trySplitCore(PartitionKeyDefinition pkDefinition, Range<String> effectiveRange, int targetedSplitCount) {
-        PartitionKeyDefinitionVersion effectivePKVersion =
-            pkDefinition.getVersion() != null
-                ? pkDefinition.getVersion()
-                : PartitionKeyDefinitionVersion.V1;
-        switch (effectivePKVersion) {
-            case V1:
-                return trySplitWithHashV1(effectiveRange, targetedSplitCount);
-
-            case V2:
-                return trySplitWithHashV2(effectiveRange, targetedSplitCount);
-
-            default:
-                return Collections.singletonList(new FeedRangeEpkImpl(effectiveRange));
-        }
-
-    }
 
     static List<FeedRangeEpkImpl> trySplitWithHashV1(
         Range<String> effectiveRange,
 
 #### Breaking Changes
 
diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
index a3b01242df788..a1bdab35c982b 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -3,7 +3,7 @@
 ### 4.36.0-beta.1 (Unreleased)
 
 #### Features Added
-* Added the udfs `GetFeedRangesForBuckets` and `GetBucketForPartitionKey` to ease mapping of cosmos partition key to databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
+* Added the udfs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping of cosmos partition key to databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
 
 #### Breaking Changes
 
diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
index fc5e13376568e..31e285a542f74 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -3,7 +3,7 @@
 ### 4.36.0-beta.1 (Unreleased)
 
 #### Features Added
-* Added the udfs `GetFeedRangesForBuckets` and `GetBucketForPartitionKey` to ease mapping of cosmos partition key to databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
+* Added the udfs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping of cosmos partition key to databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)
 
 #### Breaking Changes
 
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala
index b93a8094d9de5..1d71b7ed5c674 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/udf/GetOverlappingFeedRange.scala
@@ -13,11 +13,11 @@ class GetOverlappingFeedRange extends UDF3[String, Object, Array[String], String
   (
     partitionKeyDefinitionJson: String,
     partitionKeyValue: Object,
-    feedRangesForBuckets: Array[String]
+    targetFeedRanges: Array[String]
   ): String = {
 
     requireNotNullOrEmpty(partitionKeyDefinitionJson, "partitionKeyDefinitionJson")
     val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson)
-    SparkBridgeImplementationInternal.getOverlappingRange(feedRangesForBuckets, partitionKeyValue, pkDefinition)
+    SparkBridgeImplementationInternal.getOverlappingRange(targetFeedRanges, partitionKeyValue, pkDefinition)
   }
 }
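Taken together, the two renamed UDFs can be driven directly, the way the integration test above does. Below is a minimal usage sketch, assuming the connector config keys shown in the test; the account endpoint, key, database, and container values are placeholders, and `OverlappingFeedRangeExample` is a hypothetical name, not part of this patch series.

```scala
import com.azure.cosmos.spark.udf.{GetFeedRangesForContainer, GetOverlappingFeedRange}

object OverlappingFeedRangeExample extends App {
  // Connector configuration; all four values are placeholders for a real account.
  val cfg = Map(
    "spark.cosmos.accountEndpoint" -> "<account-endpoint>",
    "spark.cosmos.accountKey" -> "<account-key>",
    "spark.cosmos.database" -> "<database>",
    "spark.cosmos.container" -> "<container>"
  )

  // Split the container's keyspace into (up to) five target feed ranges,
  // each rendered as a "min-max" string.
  val feedRanges: Array[String] = new GetFeedRangesForContainer().call(cfg, Option(5))

  // Resolve which of those ranges a given partition key value falls into.
  val pkDefinition = "{\"paths\":[\"/id\"],\"kind\":\"Hash\"}"
  val owningRange: String =
    new GetOverlappingFeedRange().call(pkDefinition, "45170a78-eac0-4d3a-be5e-9b00bb5f4649", feedRanges)

  println(s"partition key maps to feed range $owningRange")
}
```

Grouping rows by the returned range string is the intended way to line a Databricks table partition column up with Cosmos DB feed ranges, per the CHANGELOG entries above.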