Skip to content

Commit

Permalink
Implement REPL mode in Spark and enhance error handling (#99)
Browse files Browse the repository at this point in the history
- **Read (R)**: Source queries from the OpenSearch flint-query-submission index.
- **Execute (E)**: Run queries within the SparkContext environment.
- **Publish (P)**:
  - Push results to the flint-query-result index.
  - Update query state in the flint-query-submission index.
- **Loop (L)**: Continue process until a set exit condition is reached.

Additional improvements:
- Enable cancelation of running statements in Spark.
- Fail statements that wait too long.
- Provide detailed error feedback.
- Introduce query run time metric.

Testing:
- Introduced unit tests.
- Conducted manual tests.

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo authored Oct 24, 2023
1 parent af210ca commit 89a5a3f
Show file tree
Hide file tree
Showing 19 changed files with 1,614 additions and 257 deletions.
12 changes: 11 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,17 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion),
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-json" % "2.9.2",
// handle AmazonS3Exception
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
// the transitive jackson.core dependency conflicts with existing scala
// error: Scala module 2.13.4 requires Jackson Databind version >= 2.13.0 and < 2.14.0 -
// Found jackson-databind version 2.14.2
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.mockito" %% "mockito-scala" % "1.16.42" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test"),
// Assembly settings
// the sbt assembly plugin found multiple copies of the module-info.class file with
// different contents in the jars that it was merging flintCore dependencies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public interface FlintClient {
* @return {@link FlintWriter}
*/
FlintWriter createWriter(String indexName);

/**
* Create {@link RestHighLevelClient}.
* @return {@link RestHighLevelClient}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.opensearch.flint.core.storage;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintClientBuilder;
import org.opensearch.flint.core.FlintOptions;

import java.io.IOException;

public class OpenSearchUpdater {
private final String indexName;

private final FlintClient flintClient;


public OpenSearchUpdater(String indexName, FlintClient flintClient) {
this.indexName = indexName;
this.flintClient = flintClient;
}

public void upsert(String id, String doc) {
// we might need to keep the updater for a long time. Reusing the client may not work as the temporary
// credentials may expire.
// also, failure to close the client causes the job to be stuck in the running state as the client resource
// is not released.
try (RestHighLevelClient client = flintClient.createClient()) {
UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.docAsUpsert(true);
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Failed to execute update request on index: %s, id: %s",
indexName,
id), e);
}
}

public void update(String id, String doc) {
try (RestHighLevelClient client = flintClient.createClient()) {
UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Failed to execute update request on index: %s, id: %s",
indexName,
id), e);
}
}

public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
try (RestHighLevelClient client = flintClient.createClient()) {
UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setIfSeqNo(seqNo)
.setIfPrimaryTerm(primaryTerm);
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Failed to execute update request on index: %s, id: %s",
indexName,
id), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.app

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JString
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

class FlintCommand(
var state: String,
val query: String,
// statementId is the statement type doc id
val statementId: String,
val queryId: String,
val submitTime: Long,
var error: Option[String] = None) {
def running(): Unit = {
state = "running"
}

def complete(): Unit = {
state = "success"
}

def fail(): Unit = {
state = "failed"
}

def isRunning(): Boolean = {
state == "running"
}

def isComplete(): Boolean = {
state == "success"
}

def isFailed(): Boolean = {
state == "failed"
}
}

object FlintCommand {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

def deserialize(command: String): FlintCommand = {
val meta = parse(command)
val state = (meta \ "state").extract[String]
val query = (meta \ "query").extract[String]
val statementId = (meta \ "statementId").extract[String]
val queryId = (meta \ "queryId").extract[String]
val submitTime = (meta \ "submitTime").extract[Long]
val maybeError: Option[String] = (meta \ "error") match {
case JString(str) => Some(str)
case _ => None
}

new FlintCommand(state, query, statementId, queryId, submitTime, maybeError)
}

def serialize(flintCommand: FlintCommand): String = {
// we only need to modify state and error
Serialization.write(
Map("state" -> flintCommand.state, "error" -> flintCommand.error.getOrElse("")))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.app

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JString
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.index.seqno.SequenceNumbers

// lastUpdateTime is added to FlintInstance to track the last update time of the instance. Its unit is millisecond.
class FlintInstance(
val applicationId: String,
val jobId: String,
// sessionId is the session type doc id
val sessionId: String,
val state: String,
val lastUpdateTime: Long,
val error: Option[String] = None) {}

object FlintInstance {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

def deserialize(job: String): FlintInstance = {
val meta = parse(job)
val applicationId = (meta \ "applicationId").extract[String]
val state = (meta \ "state").extract[String]
val jobId = (meta \ "jobId").extract[String]
val sessionId = (meta \ "sessionId").extract[String]
val lastUpdateTime = (meta \ "lastUpdateTime").extract[Long]
val maybeError: Option[String] = (meta \ "error") match {
case JString(str) => Some(str)
case _ => None
}

new FlintInstance(applicationId, jobId, sessionId, state, lastUpdateTime, maybeError)
}

def serialize(job: FlintInstance): String = {
Serialization.write(
Map(
"type" -> "session",
"sessionId" -> job.sessionId,
"error" -> job.error.getOrElse(""),
"applicationId" -> job.applicationId,
"jobId" -> job.jobId,
"state" -> job.state,
// update last update time
"lastUpdateTime" -> System.currentTimeMillis()))
}
}
Loading

0 comments on commit 89a5a3f

Please sign in to comment.