Skip to content

Commit

Permalink
Implement blue/green deployment support in REPL mode
Browse files Browse the repository at this point in the history
Features:
- Add mutual exclusivity in session-based emr-s jobs to ensure only one job runs at a time, enhancing system stability during blue/green deployments. This allows active job exclusion and seamless task pickup by new jobs. Details in #94.

Fixes:
- Resolve a bug where long-running queries failed to cancel post-index mapping verification, by introducing timely query cancellation checks within the REPL loop.
- Address issue #138 with the proposed short-term fix, improving reliability.

Tests:
- Conducted manual testing to validate blue/green deployment support and query cancellation.
- Extended unit tests to cover new features and bug fixes.

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo committed Nov 8, 2023
1 parent 19ad190 commit 655075f
Show file tree
Hide file tree
Showing 7 changed files with 753 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class FlintCommand(
def isFailed(): Boolean = {
state == "failed"
}

def isWaiting(): Boolean = {
state == "waiting"
}

override def toString: String = {
s"FlintCommand(state=$state, query=$query, statementId=$statementId, queryId=$queryId, submitTime=$submitTime, error=$error)"
}
}

object FlintCommand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ object FlintInstance {
}

def serialize(job: FlintInstance): String = {
// jobId is only readable by spark, thus we don't override jobId
Serialization.write(
Map(
"type" -> "session",
"sessionId" -> job.sessionId,
"error" -> job.error.getOrElse(""),
"applicationId" -> job.applicationId,
"jobId" -> job.jobId,
"state" -> job.state,
// update last update time
"lastUpdateTime" -> System.currentTimeMillis()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql

import scala.concurrent.{ExecutionContextExecutor, Future}

import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}

case class CommandContext(
flintReader: FlintReader,
spark: SparkSession,
dataSource: String,
resultIndex: String,
sessionId: String,
futureMappingCheck: Future[Either[String, Unit]],
executionContext: ExecutionContextExecutor,
flintSessionIndexUpdater: OpenSearchUpdater,
osClient: OSClient,
sessionIndex: String,
jobId: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql

case class CommandState(
recordedLastActivityTime: Long,
recordedVerificationResult: VerificationResult)
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ trait FlintJobExecutor {
sessionId: String): DataFrame = {
// Execute SQL query
val startTime = System.currentTimeMillis()
// we have to set job group in the same thread that started the query according to spark doc
spark.sparkContext.setJobGroup(queryId, "Job group for " + queryId, interruptOnCancel = true)
val result: DataFrame = spark.sql(query)
// Get Data
getFormattedData(
Expand Down Expand Up @@ -378,7 +380,7 @@ trait FlintJobExecutor {
case r: SparkException =>
handleQueryException(
r,
"Fail to run query. Cause",
"Spark exception. Cause",
spark,
dataSource,
query,
Expand All @@ -387,7 +389,7 @@ trait FlintJobExecutor {
case r: Exception =>
handleQueryException(
r,
"Fail to write result, cause",
"Fail to run query, cause",
spark,
dataSource,
query,
Expand Down
Loading

0 comments on commit 655075f

Please sign in to comment.