Skip to content

Commit

Permalink
UDFs for Mapping Feed Ranges to Buckets (#43092)
Browse files Browse the repository at this point in the history
* Added udfs and some tests for splitting ranges with hpks

* Added more test coverage

* revert incorrect change

* Reacting to comments

* Fix method names in change log
  • Loading branch information
tvaron3 authored Jan 6, 2025
1 parent eb738d8 commit 005ceb4
Show file tree
Hide file tree
Showing 16 changed files with 430 additions and 62 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.36.0-beta.1 (Unreleased)

#### Features Added
* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping a Cosmos partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.36.0-beta.1 (Unreleased)

#### Features Added
* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping a Cosmos partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.36.0-beta.1 (Unreleased)

#### Features Added
* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping a Cosmos partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.36.0-beta.1 (Unreleased)

#### Features Added
* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping a Cosmos partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.36.0-beta.1 (Unreleased)

#### Features Added
* Added the UDFs `GetFeedRangesForContainer` and `GetOverlappingFeedRange` to ease mapping a Cosmos partition key to a Databricks table partition key. - See [PR 43092](https://github.com/Azure/azure-sdk-for-java/pull/43092)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,6 @@ private[cosmos] object SparkBridgeInternal {
s"${database.getClient.getServiceEndpoint}|${database.getId}|${container.getId}"
}

/**
 * Resolves the effective range of `feedRange` within `container` and converts it into the
 * connector's `NormalizedRange` representation.
 *
 * The call blocks until the container has produced the effective range.
 *
 * @param container the container the feed range belongs to
 * @param feedRange the feed range to resolve
 * @return the effective range of `feedRange` as a `NormalizedRange`
 */
private[cosmos] def getNormalizedEffectiveRange(
  container: CosmosAsyncContainer,
  feedRange: FeedRange
): NormalizedRange = {
  val effectiveRange = container.getNormalizedEffectiveRange(feedRange).block
  SparkBridgeImplementationInternal.rangeToNormalizedRange(effectiveRange)
}

private[cosmos] def getPartitionKeyRanges
(
container: CosmosAsyncContainer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import com.azure.cosmos.implementation.guava25.base.MoreObjects.firstNonNull
import com.azure.cosmos.implementation.guava25.base.Strings.emptyToNull
import com.azure.cosmos.implementation.query.CompositeContinuationToken
import com.azure.cosmos.implementation.routing.Range
import com.azure.cosmos.models.{FeedRange, PartitionKey, PartitionKeyBuilder, PartitionKeyDefinition, SparkModelBridgeInternal}
import com.azure.cosmos.models.{FeedRange, PartitionKey, PartitionKeyBuilder, PartitionKeyDefinition, PartitionKind, SparkModelBridgeInternal}
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.spark.{ChangeFeedOffset, CosmosConstants, NormalizedRange}
import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, DirectConnectionConfig, SparkBridgeInternal}
import com.fasterxml.jackson.databind.ObjectMapper

import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`
import scala.collection.mutable

// scalastyle:off underscore.import
Expand Down Expand Up @@ -189,6 +190,11 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra
new Range[String](range.min, range.max, true, false)
}

/**
 * Parses a range string of the form `<min>-<max>` into a Cosmos `Range[String]` whose lower
 * bound is inclusive and whose upper bound is exclusive.
 *
 * Only the first two `-`-separated segments are used, matching the previous behaviour.
 *
 * @param range the textual range, e.g. the strings produced by `getFeedRangesForContainer`
 * @return the parsed range
 * @throws IllegalArgumentException if `range` is null or does not contain both a min and a max
 *                                  segment (previously this surfaced as a NullPointerException or
 *                                  an ArrayIndexOutOfBoundsException)
 */
private[cosmos] def toCosmosRange(range: String): Range[String] = {
  require(range != null, "Argument 'range' must not be null.")

  val parts = range.split("-")
  // String.split drops trailing empty segments, so a missing separator or an empty max
  // yields fewer than two parts - fail with a descriptive message instead of an index error.
  require(
    parts.length >= 2,
    s"Invalid feed range '$range' - expected the format '<min>-<max>'.")

  new Range[String](parts(0), parts(1), true, false)
}

/**
 * Checks whether two normalized ranges overlap.
 *
 * Both ranges are converted to Cosmos `Range[String]` instances and the decision is delegated
 * to `Range.checkOverlapping`.
 *
 * @param left  the first range
 * @param right the second range
 * @return the result of `Range.checkOverlapping` for the two converted ranges
 */
def doRangesOverlap(left: NormalizedRange, right: NormalizedRange): Boolean = {
  val leftRange = toCosmosRange(left)
  val rightRange = toCosmosRange(right)
  Range.checkOverlapping(leftRange, rightRange)
}
Expand All @@ -204,7 +210,7 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra
partitionKeyDefinitionJson: String
): NormalizedRange = {
val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson)
partitionKeyToNormalizedRange(new PartitionKey(partitionKeyValue), pkDefinition)
partitionKeyToNormalizedRange(getPartitionKeyValue(pkDefinition, partitionKeyValue), pkDefinition)
}

private[cosmos] def partitionKeyToNormalizedRange(
Expand All @@ -220,28 +226,89 @@ private[cosmos] object SparkBridgeImplementationInternal extends BasicLoggingTra
partitionKeyValueJsonArray: Object,
partitionKeyDefinitionJson: String
): NormalizedRange = {
val pkDefinition = SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson)
val partitionKey = getPartitionKeyValue(pkDefinition, partitionKeyValueJsonArray)
val feedRange = FeedRange
.forLogicalPartition(partitionKey)
.asInstanceOf[FeedRangePartitionKeyImpl]

val effectiveRange = feedRange.getEffectiveRange(pkDefinition)
rangeToNormalizedRange(effectiveRange)
}

/**
 * Splits the full range of a container into feed ranges and renders each one as a
 * `<min>-<max>` string.
 *
 * The split itself is delegated to `SparkBridgeInternal.trySplitFeedRange`, starting from the
 * range of `FeedRange.forFullRange()`.
 *
 * @param cosmosClient  client used to resolve the container
 * @param containerName name of the container
 * @param databaseName  name of the database that owns the container
 * @param targetedCount the count passed on to `SparkBridgeInternal.trySplitFeedRange`
 * @return one `<min>-<max>` string per range returned by the split, in the same order
 */
private[cosmos] def trySplitFeedRanges
(
  cosmosClient: CosmosAsyncClient,
  containerName: String,
  databaseName: String,
  targetedCount: Int
): List[String] = {
  val container = cosmosClient
    .getDatabase(databaseName)
    .getContainer(containerName)

  val fullRange = rangeToNormalizedRange(FeedRange.forFullRange().asInstanceOf[FeedRangeEpkImpl].getRange)

  // Map directly instead of appending to an immutable List inside an index loop
  // (List `:+` is O(n), which made the previous loop quadratic).
  SparkBridgeInternal
    .trySplitFeedRange(container, fullRange, targetedCount)
    .map(range => s"${range.min}-${range.max}")
    .toList
}

val partitionKey = new PartitionKeyBuilder()
/**
 * Retrieves the feed ranges of a container and renders each one as a `<min>-<max>` string.
 *
 * The call blocks while the feed ranges and their effective ranges are retrieved.
 *
 * @param cosmosClient  client used to resolve the container
 * @param containerName name of the container
 * @param databaseName  name of the database that owns the container
 * @return one `<min>-<max>` string per feed range, in the order returned by the container
 */
private[cosmos] def getFeedRangesForContainer
(
  cosmosClient: CosmosAsyncClient,
  containerName: String,
  databaseName: String
): List[String] = {
  val container = cosmosClient
    .getDatabase(databaseName)
    .getContainer(containerName)

  // The previous implementation prepended each range to an immutable empty list inside `map`,
  // discarded the result and returned the (still empty) list. Return the mapped values instead.
  container
    .getFeedRanges()
    .block
    .map(feedRange => {
      // NOTE(review): nulls are passed for all three getEffectiveRange arguments - presumably
      // an EPK-based feed range does not need them; confirm against FeedRangeEpkImpl.
      val effectiveRange = feedRange.asInstanceOf[FeedRangeEpkImpl].getEffectiveRange(null, null, null).block
      val normalizedRange = rangeToNormalizedRange(effectiveRange)
      s"${normalizedRange.min}-${normalizedRange.max}"
    })
    .toList
}

/**
 * Finds the first feed range that contains the lower bound of the effective range of a
 * partition key value.
 *
 * @param feedRanges   candidate feed ranges in `<min>-<max>` format
 * @param pkValue      the partition key value; interpreted according to the kind of `pkDefinition`
 * @param pkDefinition the partition key definition of the container
 * @return the first entry of `feedRanges` containing the partition key's effective range minimum
 * @throws IllegalArgumentException if none of the feed ranges contains the partition key
 */
def getOverlappingRange(feedRanges: Array[String], pkValue: Object, pkDefinition: PartitionKeyDefinition): String = {
  val pk = getPartitionKeyValue(pkDefinition, pkValue)
  val feedRangeFromPk = FeedRange.forLogicalPartition(pk).asInstanceOf[FeedRangePartitionKeyImpl]
  val effectiveRangeFromPk = feedRangeFromPk.getEffectiveRange(pkDefinition)

  // `find` stops at the first match, replacing the non-local `return` from inside a for-loop
  // (which is implemented by throwing a control exception).
  feedRanges
    .find(candidate =>
      SparkBridgeImplementationInternal.toCosmosRange(candidate).contains(effectiveRangeFromPk.getMin))
    .getOrElse(
      throw new IllegalArgumentException("The partition key value does not belong to any of the feed ranges"))
}

/**
 * Builds a `PartitionKey` from a raw value according to the kind of the partition key
 * definition.
 *
 *  - `MULTI_HASH`: `pkValue.toString` is parsed as a JSON array of strings; each element is
 *    trimmed and added as one level of the partition key.
 *  - `HASH`: `pkValue` is wrapped as-is.
 *
 * @param pkDefinition the partition key definition of the container
 * @param pkValue      the raw partition key value
 * @return the partition key, or `null` when the kind is neither `HASH` nor `MULTI_HASH` or when
 *         the `MULTI_HASH` value cannot be parsed (the failure is logged at info level)
 */
private def getPartitionKeyValue(pkDefinition: PartitionKeyDefinition, pkValue: Object): PartitionKey = {
  pkDefinition.getKind match {
    case PartitionKind.MULTI_HASH =>
      val json = pkValue.toString
      try {
        val partitionKeyBuilder = new PartitionKeyBuilder()
        val partitionKeyValues = new ObjectMapper().readValue(json, classOf[Array[String]])
        for (value <- partitionKeyValues) {
          partitionKeyBuilder.add(value.trim)
        }
        partitionKeyBuilder.build()
      } catch {
        case e: Exception =>
          // Best effort: keep the existing contract of logging and yielding null.
          logInfo("Invalid partition key paths: " + json, e)
          null
      }

    case PartitionKind.HASH =>
      new PartitionKey(pkValue)

    case _ =>
      null
  }
}

def setIoThreadCountPerCoreFactor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark.udf

import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
import com.azure.cosmos.spark.{CosmosClientCache, CosmosClientCacheItem, CosmosClientConfiguration, CosmosConfig, CosmosContainerConfig, CosmosReadConfig, Loan}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF2

/**
 * Spark UDF that returns feed ranges for the container identified by the user-provided
 * configuration.
 *
 * When `targetedCount` is empty the result of
 * `SparkBridgeImplementationInternal.getFeedRangesForContainer` is returned; otherwise the
 * result of `SparkBridgeImplementationInternal.trySplitFeedRanges` for the given count.
 */
@SerialVersionUID(1L)
class GetFeedRangesForContainer extends UDF2[Map[String, String], Option[Int], Array[String]] {
  /**
   * @param userProvidedConfig the Cosmos configuration used to locate the account and container
   * @param targetedCount      optional count forwarded to `trySplitFeedRanges`
   * @return the feed ranges as an array of strings
   */
  override def call(
    userProvidedConfig: Map[String, String],
    targetedCount: Option[Int]
  ): Array[String] = {
    val effectiveUserConfig = CosmosConfig.getEffectiveConfig(None, None, userProvidedConfig)
    val containerConfig: CosmosContainerConfig =
      CosmosContainerConfig.parseCosmosContainerConfig(effectiveUserConfig, None, None)
    val readConfig = CosmosReadConfig.parseCosmosReadConfig(effectiveUserConfig)
    val clientConfig = CosmosClientConfiguration(
      effectiveUserConfig,
      useEventualConsistency = readConfig.forceEventualConsistency,
      CosmosClientConfiguration.getSparkEnvironmentInfo(SparkSession.getActiveSession))

    // Populated inside the loan so the client is only used while it is on loan.
    var resolvedRanges = List[String]()

    Loan(
      List[Option[CosmosClientCacheItem]](
        Some(CosmosClientCache(clientConfig, None, "UDF GetFeedRangesForContainer"))
      ))
      .to(clientCacheItems => {
        val client = clientCacheItems.head.get.cosmosClient

        resolvedRanges = targetedCount match {
          case None =>
            SparkBridgeImplementationInternal.getFeedRangesForContainer(
              client,
              containerConfig.container,
              containerConfig.database)
          case Some(count) =>
            SparkBridgeImplementationInternal.trySplitFeedRanges(
              client,
              containerConfig.container,
              containerConfig.database,
              count)
        }
      })

    resolvedRanges.toArray
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark.udf

import com.azure.cosmos.implementation.SparkBridgeImplementationInternal
import com.azure.cosmos.models.SparkModelBridgeInternal
import com.azure.cosmos.spark.CosmosPredicates.requireNotNullOrEmpty
import org.apache.spark.sql.api.java.UDF3

/**
 * Spark UDF that maps a partition key value to the entry of `targetFeedRanges` it belongs to,
 * delegating to `SparkBridgeImplementationInternal.getOverlappingRange`.
 */
@SerialVersionUID(1L)
class GetOverlappingFeedRange extends UDF3[String, Object, Array[String], String] {
  /**
   * @param partitionKeyDefinitionJson the container's partition key definition as JSON;
   *                                   must not be null or empty
   * @param partitionKeyValue          the partition key value to look up
   * @param targetFeedRanges           the candidate feed ranges
   * @return the feed range returned by `getOverlappingRange` for the partition key value
   */
  override def call(
    partitionKeyDefinitionJson: String,
    partitionKeyValue: Object,
    targetFeedRanges: Array[String]
  ): String = {
    requireNotNullOrEmpty(partitionKeyDefinitionJson, "partitionKeyDefinitionJson")

    SparkBridgeImplementationInternal.getOverlappingRange(
      targetFeedRanges,
      partitionKeyValue,
      SparkModelBridgeInternal.createPartitionKeyDefinitionFromJson(partitionKeyDefinitionJson))
  }
}
Loading

0 comments on commit 005ceb4

Please sign in to comment.