diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md index 72ad2af6e22c8..74a0772dbc9be 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553) #### Other Changes @@ -27,7 +28,7 @@ ### 4.33.0 (2024-06-22) #### Features Added -* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) +* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) #### Bugs Fixed * Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) diff --git a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md index c21623a209e5b..7d5311c52c0ad 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553) #### Other Changes @@ -27,7 +28,7 @@ ### 4.33.0 (2024-06-22) #### Features Added -* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) +* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) #### Bugs Fixed * Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md index e920fd1572f89..f43934ba6d270 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553) #### Other Changes @@ -27,7 +28,7 @@ ### 4.33.0 (2024-06-22) #### Features Added -* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) +* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) #### Bugs Fixed * Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md index 276adeb27d8ef..f1f0a603d20d3 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553) #### Other Changes @@ -27,7 +28,7 @@ ### 4.33.0 (2024-06-22) #### Features Added -* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) +* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) #### Bugs Fixed * Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md index 8829b806e7472..d2337d791c4ef 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed an issue when using `ChangeFeed` causing some cosmos partitions to not be fully processed in some cases. - See [PR 42553](https://github.com/Azure/azure-sdk-for-java/pull/42553) #### Other Changes @@ -27,7 +28,7 @@ ### 4.33.0 (2024-06-22) #### Features Added -* Added a service trait `CosmosClinetBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) +* Added a service trait `CosmosClientBuilderInterceptor` to allow intercepting and customizing the CosmosClient creation. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) #### Bugs Fixed * Fixed a race condition resulting in not always re-enqueueing retries for bulk writes. - See [PR 40714](https://github.com/Azure/azure-sdk-for-java/pull/40714) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala index c47b3ba11dfb1..e940b037f3bf8 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala @@ -175,6 +175,10 @@ private case class ChangeFeedPartitionReader case ChangeFeedModes.FullFidelity | ChangeFeedModes.AllVersionsAndDeletes => changeFeedItemDeserializerV1 } + if (this.partition.endLsn.isDefined) { + ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor + .setEndLSN(options, this.partition.endLsn.get) + } options.setCustomItemSerializer(itemDeserializer) } @@ -224,28 +228,12 @@ private case class ChangeFeedPartitionReader }, readConfig.maxItemCount, readConfig.prefetchBufferSize, - operationContextAndListenerTuple + operationContextAndListenerTuple, + this.partition.endLsn ) override def next(): Boolean = { - this.iterator.hasNext && this.validateNextLsn - } - - private[this] def validateNextLsn: Boolean = { - this.partition.endLsn match { - case None => - // In batch mode endLsn is cleared - we will always continue reading until the change feed is - // completely drained so all partitions return 304 - true - case Some(endLsn) => - // In streaming mode we only continue until we hit the endOffset's continuation Lsn - val node = this.iterator.head() - assert(node.lsn != null, "Change feed responses must have _lsn property.") - assert(node.lsn != "", "Change feed responses must have non empty _lsn.") - val nextLsn = SparkBridgeImplementationInternal.toLsn(node.lsn) - - nextLsn <= endLsn - } + this.iterator.hasNext } override def get(): InternalRow = { diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala index 5feb83d5ccb7e..db30e92e93ea0 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala @@ -250,7 +250,8 @@ private case class ItemsPartitionReader }, readConfig.maxItemCount, readConfig.prefetchBufferSize, - operationContextAndListenerTuple + operationContextAndListenerTuple, + None ) private val rowSerializer: ExpressionEncoder.Serializer[Row] = RowSerializerPool.getOrCreateSerializer(readSchema) diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala index 9afe51d43f49d..7fb2b2dad0a5c 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala @@ -2,7 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos.spark -import com.azure.cosmos.{CosmosException, spark} +import com.azure.cosmos.CosmosException import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple import com.azure.cosmos.models.FeedResponse import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait @@ -14,9 +14,8 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import scala.util.Random import scala.util.control.Breaks import scala.concurrent.{Await, ExecutionContext, Future} -import com.azure.cosmos.implementation.OperationCancelledException +import com.azure.cosmos.implementation.{ChangeFeedSparkRowItem, OperationCancelledException, SparkBridgeImplementationInternal} -import scala.concurrent.duration.FiniteDuration // scalastyle:off underscore.import import scala.collection.JavaConverters._ @@ -42,7 +41,8 @@ private class TransientIOErrorsRetryingIterator[TSparkRow] val cosmosPagedFluxFactory: String => CosmosPagedFlux[TSparkRow], val pageSize: Int, val pagePrefetchBufferSize: Int, - val operationContextAndListener: Option[OperationContextAndListenerTuple] + val operationContextAndListener: Option[OperationContextAndListenerTuple], + val endLsn: Option[Long] ) extends BufferedIterator[TSparkRow] with BasicLoggingTrait with AutoCloseable { private[spark] var maxRetryIntervalInMs = CosmosConstants.maxRetryIntervalForTransientFailuresInMs @@ -162,7 +162,7 @@ private class TransientIOErrorsRetryingIterator[TSparkRow] val iteratorCandidate = feedResponse.getResults.iterator().asScala.buffered lastContinuationToken.set(feedResponse.getContinuationToken) - if (iteratorCandidate.hasNext) { + if (iteratorCandidate.hasNext && validateNextLsn(iteratorCandidate)) { currentItemIterator = Some(iteratorCandidate) Some(true) } else { @@ -178,7 +178,7 @@ private class TransientIOErrorsRetryingIterator[TSparkRow] private def hasBufferedNext: Boolean = { currentItemIterator match { - case Some(iterator) => if (iterator.hasNext) { + case Some(iterator) => if (iterator.hasNext && validateNextLsn(iterator)) { true } else { currentItemIterator = None @@ -239,6 +239,27 @@ private class TransientIOErrorsRetryingIterator[TSparkRow] returnValue.get } + private[this] def validateNextLsn(itemIterator: BufferedIterator[TSparkRow]): Boolean = { + this.endLsn match { + case None => + // Only relevant in change feed + // In batch mode endLsn is cleared - we will always continue reading until the change feed is + // completely drained so all partitions return 304 + true + case Some(endLsn) => + // In streaming mode we only continue until we hit the endOffset's continuation Lsn + if (itemIterator.isEmpty) { + return false + } + val node = itemIterator.head.asInstanceOf[ChangeFeedSparkRowItem] + assert(node.lsn != null, "Change feed responses must have _lsn property.") + assert(node.lsn != "", "Change feed responses must have non empty _lsn.") + val nextLsn = SparkBridgeImplementationInternal.toLsn(node.lsn) + + nextLsn <= endLsn + } + } + // Correct way to cancel a flux and dispose it // https://github.com/reactor/reactor-core/blob/main/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxTests.java#L837 override def close(): Unit = { diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedPartitionReaderITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedPartitionReaderITest.scala new file mode 100644 index 0000000000000..087f81fd35538 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/ChangeFeedPartitionReaderITest.scala @@ -0,0 +1,315 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.spark + +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils +import com.azure.cosmos.{CosmosAsyncContainer, CosmosException} +import com.azure.cosmos.implementation.{CosmosClientMetadataCachesSnapshot, TestConfigurations, Utils} +import com.azure.cosmos.models.{PartitionKey, ThroughputProperties} +import com.azure.cosmos.spark.diagnostics.DiagnosticsContext +import com.fasterxml.jackson.databind.node.ObjectNode +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +import java.lang.Thread.sleep +import java.util.{Base64, UUID} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Random + +class ChangeFeedPartitionReaderITest + extends IntegrationSpec + with Spark + with CosmosContainer + with CosmosClient { + + + "change feed partition reader" should "honor endLSN during split" in { + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + val testId = UUID.randomUUID().toString + val sourceContainerResponse = cosmosClient.getDatabase(cosmosDatabase).createContainer( + "source_" + testId, + "/sequenceNumber", + ThroughputProperties.createManualThroughput(11000)).block() + val sourceContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(sourceContainerResponse.getProperties.getId) + val rid = sourceContainerResponse.getProperties.getResourceId + val continuationState = s"""{ + "V": 1, + "Rid": "$rid", + "Mode": "INCREMENTAL", + "StartFrom": { + "Type": "BEGINNING" + }, + "Continuation": { + "V": 1, + "Rid": "$rid", + "Continuation": [ + { + "token": "1", + "range": { + "min": "", + "max": "FF" + } + } + ], + "Range": { + "min": "", + "max": "FF" + } + } +}""" + val encoder = Base64.getEncoder + val encodedBytes = encoder.encode(continuationState.getBytes("UTF-8")) + val continuationStateEncoded = new String(encodedBytes, "UTF-8") + + val changeFeedCfg = Map( + "spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> sourceContainer.getId(), + "spark.cosmos.read.inferSchema.enabled" -> "false", + ) + var inputtedDocuments = 10 + var lsn1 = 0L + var lsn2 = 0L + for (_ <- 0 until inputtedDocuments) { + lsn1 = ingestTestDocuments(sourceContainer, 1) + lsn2 = ingestTestDocuments(sourceContainer, 2) + } + inputtedDocuments *= 2 + + while (lsn1 != lsn2) { + if (lsn1 < lsn2) { + lsn1 = ingestTestDocuments(sourceContainer, 1) + } else { + lsn2 = ingestTestDocuments(sourceContainer, 2) + } + inputtedDocuments += 1 + } + + val structs = Array( + StructField("_rawBody", StringType, false), + StructField("_etag", StringType, false), + StructField("_ts", StringType, false), + StructField("id", StringType, false), + StructField("_lsn", StringType, false) + ) + val schema = new StructType(structs) + + val diagnosticContext = DiagnosticsContext(UUID.randomUUID(), "") + val cosmosClientStateHandles = initializeAndBroadcastCosmosClientStatesForContainer(changeFeedCfg) + val diagnosticsConfig = new DiagnosticsConfig(None, false, None) + val cosmosInputPartition = new CosmosInputPartition(NormalizedRange("", "FF"), Some(lsn1), + Some(continuationStateEncoded)) + val changeFeedPartitionReader = new ChangeFeedPartitionReader( + cosmosInputPartition, + changeFeedCfg, + schema, + diagnosticContext, + cosmosClientStateHandles, + diagnosticsConfig, + "" + ) + var count = 0 + + while (changeFeedPartitionReader.next()) { + changeFeedPartitionReader.get() + count += 1 + } + + count shouldEqual inputtedDocuments + } + + + "change feed partition reader" should "honor endLSN during split and should hang" in { + val cosmosEndpoint = TestConfigurations.HOST + val cosmosMasterKey = TestConfigurations.MASTER_KEY + val testId = UUID.randomUUID().toString + val sourceContainerResponse = cosmosClient.getDatabase(cosmosDatabase).createContainer( + "source_" + testId, + "/sequenceNumber", + ThroughputProperties.createManualThroughput(11000)).block() + val sourceContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(sourceContainerResponse.getProperties.getId) + val rid = sourceContainerResponse.getProperties.getResourceId + val continuationState = s"""{ + "V": 1, + "Rid": "$rid", + "Mode": "INCREMENTAL", + "StartFrom": { + "Type": "BEGINNING" + }, + "Continuation": { + "V": 1, + "Rid": "$rid", + "Continuation": [ + { + "token": "1", + "range": { + "min": "", + "max": "FF" + } + } + ], + "Range": { + "min": "", + "max": "FF" + } + } +}""" + val encoder = Base64.getEncoder + val encodedBytes = encoder.encode(continuationState.getBytes("UTF-8")) + val continuationStateEncoded = new String(encodedBytes, "UTF-8") + + val changeFeedCfg = Map( + "spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> sourceContainer.getId(), + "spark.cosmos.read.inferSchema.enabled" -> "false", + ) + var inputtedDocuments = 10 + var lsn1 = 0L + var lsn2 = 0L + for (_ <- 0 until inputtedDocuments) { + lsn1 = ingestTestDocuments(sourceContainer, 1) + lsn2 = ingestTestDocuments(sourceContainer, 2) + } + inputtedDocuments *= 2 + + val structs = Array( + StructField("_rawBody", StringType, false), + StructField("_etag", StringType, false), + StructField("_ts", StringType, false), + StructField("id", StringType, false), + StructField("_lsn", StringType, false) + ) + val schema = new StructType(structs) + + val diagnosticContext = DiagnosticsContext(UUID.randomUUID(), "") + val cosmosClientStateHandles = initializeAndBroadcastCosmosClientStatesForContainer(changeFeedCfg) + val diagnosticsConfig = new DiagnosticsConfig(None, false, None) + val cosmosInputPartition = new CosmosInputPartition(NormalizedRange("", "FF"), Some(Math.max(lsn1, lsn2) + 1) + , Some(continuationStateEncoded)) + val changeFeedPartitionReader = new ChangeFeedPartitionReader( + cosmosInputPartition, + changeFeedCfg, + schema, + diagnosticContext, + cosmosClientStateHandles, + diagnosticsConfig, + "" + ) + var count = 0 + implicit val ec: ExecutionContext = ExecutionContext.global + val future = Future { + while (changeFeedPartitionReader.next()) { + changeFeedPartitionReader.get() + count += 1 + } + } + sleep(2000) + future.isCompleted shouldEqual false + while (lsn1 != lsn2) { + if (lsn1 < lsn2) { + lsn1 = ingestTestDocuments(sourceContainer, 1) + } else { + lsn2 = ingestTestDocuments(sourceContainer, 2) + } + inputtedDocuments += 1 + } + for (_ <- 0 until 15) { + ingestTestDocuments(sourceContainer, Random.nextInt()) + } + future.isCompleted shouldEqual true + count shouldEqual inputtedDocuments + 2 + + } + + private[this] def initializeAndBroadcastCosmosClientStatesForContainer(config: Map[String, String]) + : Broadcast[CosmosClientMetadataCachesSnapshots] = { + val userConfig = CosmosConfig.getEffectiveConfig(None, None, config) + val effectiveUserConfig = CosmosConfig.getEffectiveConfig(None, None, userConfig) + val cosmosContainerConfig = CosmosContainerConfig.parseCosmosContainerConfig(effectiveUserConfig) + val readConfig = CosmosReadConfig.parseCosmosReadConfig(effectiveUserConfig) + val cosmosClientConfig = CosmosClientConfiguration( + effectiveUserConfig, + useEventualConsistency = readConfig.forceEventualConsistency, + "") + val calledFrom = s"ChangeFeedPartitionReaderTest.initializeAndBroadcastCosmosClientStateForContainer" + Loan( + List[Option[CosmosClientCacheItem]]( + Some(CosmosClientCache( + cosmosClientConfig, + None, + calledFrom)), + ThroughputControlHelper.getThroughputControlClientCacheItem( + effectiveUserConfig, + calledFrom, + None, + "" + ) + )) + .to(clientCacheItems => { + val container = + ThroughputControlHelper.getContainer( + effectiveUserConfig, + cosmosContainerConfig, + clientCacheItems(0).get, + clientCacheItems(1)) + + try { + container.readItem( + UUID.randomUUID().toString, new PartitionKey(UUID.randomUUID().toString), classOf[ObjectNode]) + .block() + } catch { + case _: CosmosException => + } + + val state = new CosmosClientMetadataCachesSnapshot() + state.serialize(clientCacheItems(0).get.cosmosClient) + + var throughputControlState: Option[CosmosClientMetadataCachesSnapshot] = None + if (clientCacheItems(1).isDefined) { + throughputControlState = Some(new CosmosClientMetadataCachesSnapshot()) + throughputControlState.get.serialize(clientCacheItems(1).get.cosmosClient) + } + + val metadataSnapshots = CosmosClientMetadataCachesSnapshots(state, throughputControlState) + val sparkSession = SparkSession.active + sparkSession.sparkContext.broadcast(metadataSnapshots) + }) + } + + + private[this] def ingestTestDocuments + ( + container: CosmosAsyncContainer, + sequenceNumber: Int + ): Long = { + val id = UUID.randomUUID().toString + val objectNode = Utils.getSimpleObjectMapper.createObjectNode() + objectNode.put("name", "Shrodigner's cat") + objectNode.put("type", "cat") + objectNode.put("age", 20) + objectNode.put("sequenceNumber", sequenceNumber) + objectNode.put("id", id) + val response = container.createItem(objectNode).block() + + getLSN(response.getSessionToken) + } + + private[this] def getLSN(sessionToken: String): Long = { + val parsedSessionToken = sessionToken.substring(sessionToken.indexOf(":")) + val segments = StringUtils.split(parsedSessionToken, "#") + var latestLsn = segments(0) + if (segments.length >= 2) { + // default to Global LSN + latestLsn = segments(1) + } + + latestLsn.toLong + } + +} diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala index 24aba6734888a..1eea43a57daaa 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala @@ -112,6 +112,7 @@ class TransientIOErrorsRetryingIteratorITest }, 2, Queues.XS_BUFFER_SIZE, + None, None ) retryingIterator.maxRetryIntervalInMs = 5 @@ -253,6 +254,7 @@ class TransientIOErrorsRetryingIteratorITest }, 2, Queues.XS_BUFFER_SIZE, + None, None ) retryingIterator.maxRetryIntervalInMs = 5 diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala index cfa95b7083df5..fbb7059052b8b 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala @@ -44,6 +44,7 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr continuationToken, pageCount, transientErrorCount, injectEmptyPages = false, injectedDelayOfFirstPage = None), pageSize, 1, + None, None ) iterator.maxRetryIntervalInMs = 5 @@ -63,6 +64,7 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr continuationToken, pageCount, transientErrorCount, injectEmptyPages = true, injectedDelayOfFirstPage = None), pageSize, 1, + None, None ) iterator.maxRetryIntervalInMs = 5 @@ -82,6 +84,7 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr continuationToken, pageCount, transientErrorCount, injectEmptyPages = false, injectedDelayOfFirstPage = Some(Duration.ofSeconds(70))), pageSize, 1, + None, None ) iterator.maxRetryIntervalInMs = 5 diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index 00d76823b2f4e..134a6a7e09d08 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -7,6 +7,7 @@ package com.azure.cosmos; import com.azure.cosmos.implementation.DocumentCollection; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.RetryAnalyzer; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; @@ -65,6 +66,10 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -97,6 +102,38 @@ public static Object[][] changeFeedQueryCompleteAfterAvailableNowDataProvider() }; } + @DataProvider(name = "changeFeedQueryEndLSNDataProvider") + public static Object[][] changeFeedQueryEndLSNDataProvider() { + return new Object[][]{ + // container RU, continuous ingest items, partition count + // number of docs from cf, documents to write + + // endLSN is less than number of documents + { 400, true, 1, 3, 6}, + { 400, false, 1, 3, 6}, + // endLSN is equal to number of documents + { 400, false, 1, 3, 3}, + // both partitions have more than the endLSN + { 11000, true, 5, 6, 30}, + { 11000, false, 5, 6, 30}, + }; + } + + @DataProvider(name = "changeFeedQueryEndLSNHangDataProvider") + public static Object[][] changeFeedQueryEndLSNHangDataProvider() { + return new Object[][]{ + // container RU, partition count + // number of docs from cf, documents to write + + // endLSN is greater than number of documents + { 400, 1, 3, 2}, + // 2 partitions but only write to one, so + // that the other partition stops on 304 + { 11000, 1, 6, 6}, + { 11000, 1, 6, 6}, + }; + } + @DataProvider(name = "changeFeedSplitHandlingDataProvider") public static Object[][] changeFeedSplitHandlingDataProvider() { return new Object[][]{ @@ -115,14 +152,14 @@ public CosmosContainerChangeFeedTest(CosmosClientBuilder clientBuilder) { super(clientBuilder); } - @AfterClass(groups = { "emulator" }, timeOut = 3 * SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = { "emulator", "fast" }, timeOut = 3 * SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { logger.info("starting ...."); safeDeleteSyncDatabase(createdDatabase); safeCloseSyncClient(client); } - @AfterMethod(groups = { "emulator" }) + @AfterMethod(groups = { "emulator", "fast" }) public void afterTest() throws Exception { if (this.createdContainer != null) { try { @@ -135,14 +172,14 @@ public void afterTest() throws Exception { } } - @BeforeMethod(groups = { "emulator" }) + @BeforeMethod(groups = { "emulator", "fast" }) public void beforeTest() throws Exception { this.createdContainer = null; this.createdAsyncContainer = null; this.partitionKeyToDocuments.clear(); } - @BeforeClass(groups = { "emulator" }, timeOut = SETUP_TIMEOUT) + @BeforeClass(groups = { "emulator", "fast" }, timeOut = SETUP_TIMEOUT) public void before_CosmosContainerTest() { client = getClientBuilder().buildClient(); createdDatabase = createSyncDatabase(client, preExistingDatabaseId); @@ -843,6 +880,109 @@ public void split_only_notModified() throws Exception { assertThat(stateAfterLastDrainAttempt.getContinuation().getCompositeContinuationTokens()).hasSize(3); } + @Test(groups = { "fast" }, dataProvider = "changeFeedQueryEndLSNDataProvider", timeOut = 100 * TIMEOUT) + public void changeFeedQueryCompleteAfterEndLSN( + int throughput, + boolean shouldContinuouslyIngestItems, + int partitionCount, + int expectedDocs, + int docsToWrite) { + String testContainerId = UUID.randomUUID().toString(); + + try { + CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk"); + CosmosAsyncContainer testContainer = + createCollection( + this.createdAsyncDatabase, + containerProperties, + new CosmosContainerRequestOptions(), + throughput); + + List feedRanges = testContainer.getFeedRanges().block(); + AtomicInteger currentPageCount = new AtomicInteger(0); + + List partitionKeys = insertDocumentsCore(partitionCount, docsToWrite, testContainer); + CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions = + CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange()); + ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor() + .setEndLSN(cosmosChangeFeedRequestOptions, 4L); + cosmosChangeFeedRequestOptions.setMaxPrefetchPageCount(8); + + AtomicInteger totalQueryCount = new AtomicInteger(0); + testContainer.queryChangeFeed(cosmosChangeFeedRequestOptions, JsonNode.class) + .byPage(1) + .flatMap(response -> { + int currentPage = currentPageCount.incrementAndGet(); + totalQueryCount.set(totalQueryCount.get() + response.getResults().size()); + + // Only start creating new items once we have looped through all feedRanges once to make the test behavior more deterministic + if (shouldContinuouslyIngestItems && currentPage >= feedRanges.size()) { + // Only keep adding to partitions that already have items in them + // again to make the test behavior more deterministic + return testContainer + .createItem(getDocumentDefinition(partitionKeys.get(currentPage % 1))).then(); + } else { + return Mono.empty(); + } + }) + .blockLast(); + assertThat(totalQueryCount.get()).isEqualTo(expectedDocs); + } finally { + safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId)); + } + } + + @Test(groups = { "fast" }, dataProvider = "changeFeedQueryEndLSNHangDataProvider", timeOut = 100 * TIMEOUT) + public void changeFeedQueryCompleteAfterEndLSNHang( + int throughput, + int partitionCount, + int expectedDocs, + int docsToWrite) throws InterruptedException { + String testContainerId = UUID.randomUUID().toString(); + + try { + CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk"); + CosmosAsyncContainer testContainer = + createCollection( + this.createdAsyncDatabase, + containerProperties, + new CosmosContainerRequestOptions(), + throughput); + + + insertDocuments(partitionCount, docsToWrite, testContainer); + CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions = + CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange()); + ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor() + .setEndLSN(cosmosChangeFeedRequestOptions, 4L); + cosmosChangeFeedRequestOptions.setMaxPrefetchPageCount(8); + + AtomicInteger totalQueryCount = new AtomicInteger(0); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + Future future = CompletableFuture.runAsync(() -> { + testContainer.queryChangeFeed(cosmosChangeFeedRequestOptions, JsonNode.class) + .byPage(1) + .flatMap(response -> { + totalQueryCount.set(totalQueryCount.get() + response.getResults().size()); + + return Mono.empty(); + }).blockLast(); + }, scheduler); + + Thread.sleep(2000); + assertThat(future.isDone()).isFalse(); + + insertDocuments(10, 25, testContainer); + assertThat(future.isDone()).isTrue(); + + + assertThat(totalQueryCount.get()).isEqualTo(expectedDocs); + } finally { + safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId)); + } + } + @Test(groups = { "emulator" }, dataProvider = "changeFeedQueryCompleteAfterAvailableNowDataProvider", timeOut = 100 * TIMEOUT) public void changeFeedQueryCompleteAfterAvailableNow( int throughput, @@ -900,14 +1040,23 @@ void insertDocuments( int partitionCount, int documentCount, CosmosAsyncContainer container) { + insertDocumentsCore(partitionCount, documentCount, container); + } + + List insertDocumentsCore( + int partitionCount, + int documentCount, + CosmosAsyncContainer container) { List docs = new ArrayList<>(); + List partitionKeys = new ArrayList<>(); for (int i = 0; i < partitionCount; i++) { String partitionKey = UUID.randomUUID().toString(); for (int j = 0; j < documentCount; j++) { docs.add(getDocumentDefinition(partitionKey)); } + partitionKeys.add(partitionKey); } ArrayList>> result = new ArrayList<>(); @@ -916,16 +1065,17 @@ void insertDocuments( } List insertedDocs = Flux.merge( - Flux.fromIterable(result), - 2) - .map(CosmosItemResponse::getItem).collectList().block(); + Flux.fromIterable(result), + 2) + .map(CosmosItemResponse::getItem).collectList().block(); for (ObjectNode doc : insertedDocs) { partitionKeyToDocuments.put( - doc.get(PARTITION_KEY_FIELD_NAME).textValue(), - doc); + doc.get(PARTITION_KEY_FIELD_NAME).textValue(), + doc); } logger.info("FINISHED INSERT"); + return partitionKeys; } void deleteDocuments( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java index 1ee45b8830fd1..919885866e368 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java @@ -114,6 +114,10 @@ public Flux> executeAsync() { this.options.getMaxPrefetchPageCount(), ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options), this.options.isCompleteAfterAllCurrentChangesRetrieved(), + ImplementationBridgeHelpers + .CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .getEndLSN(this.options), ImplementationBridgeHelpers .CosmosChangeFeedRequestOptionsHelper .getCosmosChangeFeedRequestOptionsAccessor() diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java index c887021e557de..4bbdf764cce38 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java @@ -49,6 +49,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ private String collectionRid; private Set keywordIdentifiers; private boolean completeAfterAllCurrentChangesRetrieved; + private Long endLSN; public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) { this.continuationState = toBeCloned.continuationState; @@ -70,6 +71,7 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB this.partitionKeyDefinition = toBeCloned.partitionKeyDefinition; this.keywordIdentifiers = toBeCloned.keywordIdentifiers; this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved; + this.endLSN = toBeCloned.endLSN; } public CosmosChangeFeedRequestOptionsImpl( @@ -363,6 +365,14 @@ public Set getKeywordIdentifiers() { return this.keywordIdentifiers; } + public void setEndLSN(Long endLSN) { + this.endLSN = endLSN; + } + + public Long getEndLSN() { + return endLSN; + } + public boolean isCompleteAfterAllCurrentChangesRetrieved() { return this.completeAfterAllCurrentChangesRetrieved; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index c9b374ddad8aa..b34cf1b659134 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -29,7 +29,6 @@ import com.azure.cosmos.SessionRetryOptions; import com.azure.cosmos.ThroughputControlGroupConfig; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; -import com.azure.cosmos.implementation.batch.BulkExecutorDiagnosticsTracker; import com.azure.cosmos.implementation.batch.ItemBatchOperation; import com.azure.cosmos.implementation.batch.PartitionScopeThresholds; import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; @@ -82,7 +81,6 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; import java.net.URI; import java.time.Duration; @@ -91,7 +89,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -376,6 +373,8 @@ public interface CosmosChangeFeedRequestOptionsAccessor { CosmosChangeFeedRequestOptions setHeader(CosmosChangeFeedRequestOptions changeFeedRequestOptions, String name, String value); Map getHeader(CosmosChangeFeedRequestOptions changeFeedRequestOptions); CosmosChangeFeedRequestOptionsImpl getImpl(CosmosChangeFeedRequestOptions changeFeedRequestOptions); + CosmosChangeFeedRequestOptions setEndLSN(CosmosChangeFeedRequestOptions changeFeedRequestOptions, Long endLsn); + Long getEndLSN(CosmosChangeFeedRequestOptions changeFeedRequestOptions); void setOperationContext(CosmosChangeFeedRequestOptions changeFeedRequestOptions, OperationContextAndListenerTuple operationContext); OperationContextAndListenerTuple getOperationContext(CosmosChangeFeedRequestOptions changeFeedRequestOptions); CosmosDiagnosticsThresholds getDiagnosticsThresholds(CosmosChangeFeedRequestOptions options); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java index a0a65f5d74d32..ab266ff6bd340 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java @@ -269,11 +269,13 @@ public ShouldRetryResult handleChangeFeedNotModified(final FeedResponse r } @Override - public boolean hasFetchedAllChangesAvailableNow(FeedResponse response) { + public boolean hasFetchedAllChanges(FeedResponse responseMessage, Long endLSN) { + long lastestLSNFromSessionToken = this.getLatestLsnFromSessionToken(responseMessage.getSessionToken()); + long lsn = endLSN != null ? endLSN : lastestLSNFromSessionToken; FeedRangeLSNContext feedRangeLSNContext = this.updateFeedRangeEndLSNIfAbsent( this.currentToken.getRange(), - response.getSessionToken()); + lsn); feedRangeLSNContext.handleLSNFromContinuation(this.currentToken); // find next token which can fetch more @@ -282,9 +284,9 @@ public boolean hasFetchedAllChangesAvailableNow(FeedResponse response) { this.moveToNextToken(); } while ( !this.currentToken.getRange().equals(initialToken) && - this.hasFetchAllChangesAvailableNowForFeedRange(this.currentToken.getRange())); + this.hasFetchedAllChangesForFeedRange(this.currentToken.getRange())); - return this.hasFetchAllChangesAvailableNowForFeedRange(this.currentToken.getRange()); + return this.hasFetchedAllChangesForFeedRange(this.currentToken.getRange()); } @Override @@ -340,18 +342,18 @@ private Long getLatestLsnFromSessionToken(String sessionToken) { private FeedRangeLSNContext updateFeedRangeEndLSNIfAbsent( Range targetedRange, - String sessionToken) { + long endLSN) { return this.feedRangeLSNContextMap.computeIfAbsent( targetedRange, (range) -> { return new FeedRangeLSNContext( targetedRange, - this.getLatestLsnFromSessionToken(sessionToken) + endLSN ); }); } - private boolean hasFetchAllChangesAvailableNowForFeedRange(Range range) { + private boolean hasFetchedAllChangesForFeedRange(Range range) { return this.feedRangeLSNContextMap.containsKey(range) && this.feedRangeLSNContextMap.get(range).hasCompleted; } @@ -597,7 +599,7 @@ public FeedRangeLSNContext(Range range, Long endLSN) { public void handleLSNFromContinuation(CompositeContinuationToken compositeContinuationToken) { if (!compositeContinuationToken.getRange().equals(this.range)) { throw new IllegalStateException( - "Range in FeedRangeAvailableNowContext is different than the range in the continuationToken"); + "Range in FeedRangeLSNContext is different than the range in the continuationToken"); } String lsnFromContinuationToken = compositeContinuationToken.getToken(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java index 1d0d67e6dc4a3..e015d91ec2769 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java @@ -136,8 +136,8 @@ public static FeedRangeContinuation create( public abstract ShouldRetryResult handleChangeFeedNotModified( FeedResponse responseMessage); - public abstract boolean hasFetchedAllChangesAvailableNow( - FeedResponse responseMessage); + public abstract boolean hasFetchedAllChanges( + FeedResponse responseMessage, Long endLSN); public abstract Mono handleFeedRangeGone( RxDocumentClientImpl client, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java index ff1b37acad6cc..0ab2f47917029 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java @@ -43,6 +43,7 @@ class ChangeFeedFetcher extends Fetcher { private final Supplier createRequestFunc; private final Supplier feedRangeContinuationRetryPolicySupplier; private final boolean completeAfterAllCurrentChangesRetrieved; + private final Long endLSN; public ChangeFeedFetcher( RxDocumentClientImpl client, @@ -54,6 +55,7 @@ public ChangeFeedFetcher( int maxItemCount, boolean isSplitHandlingDisabled, boolean completeAfterAllCurrentChangesRetrieved, + Long endLSN, OperationContextAndListenerTuple operationContext, GlobalEndpointManager globalEndpointManager, GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManagerForCircuitBreaker) { @@ -79,6 +81,7 @@ public ChangeFeedFetcher( isSplitHandlingDisabled); this.createRequestFunc = createRequestFunc; this.completeAfterAllCurrentChangesRetrieved = completeAfterAllCurrentChangesRetrieved; + this.endLSN = endLSN; } @Override @@ -115,10 +118,11 @@ private Mono> nextPageInternal(DocumentClientRetryPolicy retryPo FeedRangeContinuation continuationSnapshot = this.changeFeedState.getContinuation(); - if (this.completeAfterAllCurrentChangesRetrieved) { + if (this.completeAfterAllCurrentChangesRetrieved || this.endLSN != null) { if (continuationSnapshot != null) { - //track the end-LSN available now for each sub-feedRange and then find the next sub-feedRange to fetch more changes - boolean shouldComplete = continuationSnapshot.hasFetchedAllChangesAvailableNow(r); + + //track the end-LSN for each sub-feedRange and then find the next sub-feedRange to fetch more changes + boolean shouldComplete = continuationSnapshot.hasFetchedAllChanges(r, endLSN); if (shouldComplete) { this.disableShouldFetchMore(); return Mono.just(r); @@ -155,7 +159,7 @@ protected String applyServerResponseContinuation( FeedResponse response) { boolean isNoChanges = feedResponseAccessor.getNoChanges(response); - boolean shouldMoveToNextTokenOnETagReplace = !isNoChanges && !this.completeAfterAllCurrentChangesRetrieved; + boolean shouldMoveToNextTokenOnETagReplace = !isNoChanges && !this.completeAfterAllCurrentChangesRetrieved && this.endLSN == null; return this.changeFeedState.applyServerResponseContinuation( serverContinuationToken, request, shouldMoveToNextTokenOnETagReplace); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java index c2b300350ed56..b41f6cfb1cdfb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java @@ -98,6 +98,7 @@ public static Flux> getChangeFeedQueryResultAsObservable( int preFetchCount, boolean isSplitHandlingDisabled, boolean completeAfterAllCurrentChangesRetrieved, + Long endLsn, OperationContextAndListenerTuple operationContext) { return getPaginatedQueryResultAsObservable( @@ -111,6 +112,7 @@ public static Flux> getChangeFeedQueryResultAsObservable( maxPageSize, isSplitHandlingDisabled, completeAfterAllCurrentChangesRetrieved, + endLsn, operationContext, client.getGlobalEndpointManager(), client.getGlobalPartitionEndpointManagerForCircuitBreaker() diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java index 48f14c241d9b3..a89496554660f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java @@ -205,6 +205,16 @@ public boolean isCompleteAfterAllCurrentChangesRetrieved() { return this.actualRequestOptions.isCompleteAfterAllCurrentChangesRetrieved(); } + // This will override setCompleteAfterAllCurrentChangesRetrieved if both used together + CosmosChangeFeedRequestOptions setEndLSN(Long endLsn) { + this.actualRequestOptions.setEndLSN(endLsn); + return this; + } + + Long getEndLSN() { + return this.actualRequestOptions.getEndLSN(); + } + boolean isSplitHandlingDisabled() { return this.actualRequestOptions.isSplitHandlingDisabled(); } @@ -623,6 +633,16 @@ public CosmosChangeFeedRequestOptionsImpl getImpl(CosmosChangeFeedRequestOptions return changeFeedRequestOptions.getImpl(); } + @Override + public CosmosChangeFeedRequestOptions setEndLSN(CosmosChangeFeedRequestOptions changeFeedRequestOptions, Long endLsn) { + return changeFeedRequestOptions.setEndLSN(endLsn); + } + + @Override + public Long getEndLSN(CosmosChangeFeedRequestOptions changeFeedRequestOptions) { + return changeFeedRequestOptions.getEndLSN(); + } + @Override public void setOperationContext (