Skip to content

Commit

Permalink
Migrate resutIndex and requestIndex state inconsistency issue (#174)
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo authored Nov 22, 2023
1 parent e7f4c73 commit be82024
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
var canPickUpNextStatement = true
while (currentTimeProvider
.currentEpochMillis() - lastActivityTime <= commandContext.inactivityLimitMillis && canPickUpNextStatement) {
logDebug(s"""read from ${commandContext.sessionIndex}""")
logInfo(
s"""read from ${commandContext.sessionIndex}, sessionId: $commandContext.sessionId""")
val flintReader: FlintReader =
createQueryReader(
commandContext.osClient,
Expand Down Expand Up @@ -339,7 +340,7 @@ object FlintREPL extends Logging with FlintJobExecutor {

flintSessionIndexUpdater.upsert(sessionId, serializedFlintInstance)

logDebug(
logInfo(
s"""Updated job: {"jobid": ${flintJob.jobId}, "sessionId": ${flintJob.sessionId}} from $sessionIndex""")
}

Expand Down Expand Up @@ -524,6 +525,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
osClient: OSClient): Unit = {
try {
dataToWrite.foreach(df => writeDataFrameToOpensearch(df, resultIndex, osClient))
// todo. it is migration plan to handle https://github
// .com/opensearch-project/sql/issues/2436. Remove sleep after issue fixed in plugin.
Thread.sleep(2000)
if (flintCommand.isRunning() || flintCommand.isWaiting()) {
// we have set failed state in exception handling
flintCommand.complete()
Expand Down Expand Up @@ -691,7 +695,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
queryWaitTimeMillis)
}

logDebug(s"command complete: $flintCommand")
logInfo(s"command complete: $flintCommand")
(dataToWrite, verificationResult)
}

Expand Down

0 comments on commit be82024

Please sign in to comment.