From ad9b18e1d33b6a5607d7741161973656bc7dd0da Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Mon, 13 Nov 2023 15:29:19 -0800 Subject: [PATCH] Handle Missing jobStartTime in JSON Deserialization - Added handling for scenarios where jobStartTime is not present in the JSON input. - Ensures FlintInstance deserialization remains robust and error-free even when jobStartTime is missing. Testing: 1. Extended unit tests to cover the new case. 2. Conducted manual sanity tests to ensure stability and correctness. --- .../opensearch/flint/app/FlintInstance.scala | 17 +++++++-- .../flint/app/FlintInstanceTest.scala | 37 +++++++++++++++++++ .../org/apache/spark/sql/FlintREPL.scala | 4 +- 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/app/FlintInstance.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/app/FlintInstance.scala index 52b7d9736..baaca4045 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/app/FlintInstance.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/app/FlintInstance.scala @@ -10,7 +10,7 @@ import java.util.{Map => JavaMap} import scala.collection.JavaConverters._ import scala.collection.mutable -import org.json4s.{Formats, NoTypeHints} +import org.json4s.{Formats, JNothing, JNull, NoTypeHints} import org.json4s.JsonAST.{JArray, JString} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization @@ -24,7 +24,7 @@ class FlintInstance( var state: String, val lastUpdateTime: Long, // We need jobStartTime to check if HMAC token is expired or not - val jobStartTime: Long, + val jobStartTime: Long = 0, val excludedJobIds: Seq[String] = Seq.empty[String], val error: Option[String] = None) {} @@ -39,7 +39,10 @@ object FlintInstance { val jobId = (meta \ "jobId").extract[String] val sessionId = (meta \ "sessionId").extract[String] val lastUpdateTime = (meta \ "lastUpdateTime").extract[Long] - val jobStartTime = (meta \ "jobStartTime").extract[Long] + val jobStartTime: Long = meta \ "jobStartTime" match { + case JNothing | JNull => 0L // Default value for missing or null jobStartTime + case value => value.extract[Long] + } // To handle the possibility of excludeJobIds not being present, // we use extractOpt which gives us an Option[Seq[String]]. // If it is not present, it will return None, which we can then @@ -75,7 +78,13 @@ object FlintInstance { val jobId = scalaSource("jobId").asInstanceOf[String] val sessionId = scalaSource("sessionId").asInstanceOf[String] val lastUpdateTime = scalaSource("lastUpdateTime").asInstanceOf[Long] - val jobStartTime = scalaSource("jobStartTime").asInstanceOf[Long] + // Safely extract 'jobStartTime' considering potential null or absence + // Safely extract 'jobStartTime' considering potential null or absence + val jobStartTime: Long = scalaSource.get("jobStartTime") match { + case Some(value: java.lang.Long) => + value.longValue() // Convert java.lang.Long to Scala Long + case _ => 0L // Default value if 'jobStartTime' is null or not present + } // We safely handle the possibility of excludeJobIds being absent or not a list. val excludeJobIds: Seq[String] = scalaSource.get("excludeJobIds") match { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/app/FlintInstanceTest.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/app/FlintInstanceTest.scala index 31749c794..caf5a84d4 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/app/FlintInstanceTest.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/app/FlintInstanceTest.scala @@ -116,4 +116,41 @@ class FlintInstanceTest extends SparkFunSuite with Matchers { } } + test("deserializeFromMap should handle missing jobStartTime") { + val sourceMap = new JavaHashMap[String, AnyRef]() + sourceMap.put("applicationId", "app1") + sourceMap.put("jobId", "job1") + sourceMap.put("sessionId", "session1") + sourceMap.put("state", "running") + sourceMap.put("lastUpdateTime", java.lang.Long.valueOf(1234567890L)) + // jobStartTime is not added, simulating its absence + sourceMap.put("excludeJobIds", java.util.Arrays.asList("job2", "job3")) + sourceMap.put("error", "An error occurred") + + val result = FlintInstance.deserializeFromMap(sourceMap) + + assert(result.applicationId == "app1") + assert(result.jobId == "job1") + assert(result.sessionId == "session1") + assert(result.state == "running") + assert(result.lastUpdateTime == 1234567890L) + assert(result.jobStartTime == 0L) // Default value for missing jobStartTime + assert(result.excludedJobIds == Seq("job2", "job3")) + assert(result.error.contains("An error occurred")) + } + + test("deserialize should correctly parse a FlintInstance without jobStartTime from JSON") { + val json = + """{"applicationId":"app-123","jobId":"job-456","sessionId":"session-789","state":"RUNNING","lastUpdateTime":1620000000000,"excludeJobIds":["job-101","job-202"]}""" + val instance = FlintInstance.deserialize(json) + + instance.applicationId shouldBe "app-123" + instance.jobId shouldBe "job-456" + instance.sessionId shouldBe "session-789" + instance.state shouldBe "RUNNING" + instance.lastUpdateTime shouldBe 1620000000000L + instance.jobStartTime shouldBe 0L // Default or expected value for missing jobStartTime + instance.excludedJobIds should contain allOf ("job-101", "job-202") + instance.error shouldBe None + } } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index e444b71ee..37df70d02 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -224,7 +224,7 @@ object FlintREPL extends Logging with FlintJobExecutor { val excludeJobIds = confExcludeJobs.split(",").toList // Convert Array to Lis if (excludeJobIds.contains(jobId)) { - // Edge case, current job is excluded, exit the application + logInfo(s"current job is excluded, exit the application") return true } @@ -234,7 +234,7 @@ object FlintREPL extends Logging with FlintJobExecutor { if (source != null) { val existingExcludedJobIds = parseExcludedJobIds(source) if (excludeJobIds.sorted == existingExcludedJobIds.sorted) { - // Edge case, duplicate job running, exit the application + logInfo("duplicate job running, exit the application") return true } }