Skip to content

Commit

Permalink
Rename splitStateToDisk to spillStateToDisk (#1904)
Browse files Browse the repository at this point in the history
* Rename splitStateToDisk to spillStateToDisk

* Add migration

* Rename label

* Review

Co-authored-by: Damian Święcki <dsw@touk.pl>
  • Loading branch information
dswiecki and Damian Święcki authored Jul 14, 2021
1 parent 4c7e101 commit 1aee704
Show file tree
Hide file tree
Showing 16 changed files with 151 additions and 30 deletions.
2 changes: 1 addition & 1 deletion demo/docker/testData/DetectLargeTransactions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"id" : "DetectLargeTransactions",
"typeSpecificData" : {
"parallelism" : 2,
"splitStateToDisk" : true,
"spillStateToDisk" : true,
"useAsyncInterpretation" : null,
"checkpointIntervalInSeconds" : null,
"type" : "StreamMetaData"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"id" : "DetectLargeTransactions",
"typeSpecificData" : {
"parallelism" : 2,
"splitStateToDisk" : true,
"spillStateToDisk" : true,
"useAsyncInterpretation" : null,
"checkpointIntervalInSeconds" : null,
"type" : "StreamMetaData"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ object ProcessAdditionalFields {
@ConfiguredJsonCodec sealed trait TypeSpecificData

case class StreamMetaData(parallelism: Option[Int] = None,
//we assume it's safer to split state to disk and fix performance than to fix heap problems...
splitStateToDisk: Option[Boolean] = Some(true),
//we assume it's safer to spill state to disk and fix performance than to fix heap problems...
spillStateToDisk: Option[Boolean] = Some(true),
useAsyncInterpretation: Option[Boolean] = None,
checkpointIntervalInSeconds: Option[Long] = None) extends TypeSpecificData {

def checkpointIntervalDuration : Option[Duration]= checkpointIntervalInSeconds.map(Duration.apply(_, TimeUnit.SECONDS))

def shouldUseAsyncInterpretation(implicit defaultValue: DefaultAsyncInterpretationValue) : Boolean = useAsyncInterpretation.getOrElse(defaultValue.value)
Expand All @@ -63,4 +63,4 @@ object StreamMetaData {
}
}

case class StandaloneMetaData(path: Option[String]) extends TypeSpecificData
case class StandaloneMetaData(path: Option[String]) extends TypeSpecificData
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ class DefaultStreamExecutionEnvPreparer(checkpointConfig: Option[CheckpointConfi

configureCheckpoints(env, streamMetaData)

(rocksDBStateBackendConfig, streamMetaData.splitStateToDisk) match {
(rocksDBStateBackendConfig, streamMetaData.spillStateToDisk) match {
case (Some(config), Some(true)) =>
logger.info("Using RocksDB state backend")
configureRocksDBBackend(env, config)
case (None, Some(true)) =>
//TODO: handle non-configured rocksDB more transparently e.g. hide checkbox on FE?
logger.warn("RocksDB not configured, cannot use splitStateToDisk")
logger.warn("RocksDB not configured, cannot use spillStateToDisk")
case _ =>
logger.info("Using default state backend configured by cluster")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class FlinkStreamingProcessRegistrarSpec extends FlatSpec with Matchers with Pro

//TODO: some better check than "it does not crash"?
it should "use rocksDB backend" in {
val process = EspProcess(MetaData("proc1", StreamMetaData(splitStateToDisk = Some(true))),
val process = EspProcess(MetaData("proc1", StreamMetaData(spillStateToDisk = Some(true))),
ExceptionHandlerRef(List.empty),
NonEmptyList.of(GraphBuilder.source("id", "input")
.customNode("custom2", "outRec", "stateCustom", "keyBy" -> "#input.id", "stringVal" -> "'terefere'")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ProcessMetaDataBuilder private[build](metaData: MetaData) {

//TODO: exception when non-streaming process?
def stateOnDisk(useStateOnDisk: Boolean) =
new ProcessMetaDataBuilder(metaData.copy(typeSpecificData = metaData.typeSpecificData.asInstanceOf[StreamMetaData].copy(splitStateToDisk = Some(useStateOnDisk))))
new ProcessMetaDataBuilder(metaData.copy(typeSpecificData = metaData.typeSpecificData.asInstanceOf[StreamMetaData].copy(spillStateToDisk = Some(useStateOnDisk))))


//TODO: exception when non-standalone process?
Expand Down Expand Up @@ -66,4 +66,4 @@ object StandaloneProcessBuilder {
def id(id: ProcessName) =
new ProcessMetaDataBuilder(MetaData(id.value, StandaloneMetaData(None)))

}
}
10 changes: 5 additions & 5 deletions ui/client/components/graph/node-modal/NodeDetailsContent.js
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,11 @@ export class NodeDetailsContent extends React.Component {
),
this.createField(
"checkbox",
"Should split state to disk",
"typeSpecificProperties.splitStateToDisk",
"Spill state to disk",
"typeSpecificProperties.spillStateToDisk",
false,
[errorValidator(fieldErrors, "splitStateToDisk")],
"splitStateToDisk",
[errorValidator(fieldErrors, "spillStateToDisk")],
"spillStateToDisk",
false,
false,
"split-state-disk",
Expand Down Expand Up @@ -716,7 +716,7 @@ export class NodeDetailsContent extends React.Component {
case "Properties": {
const commonFields = "subprocessVersions"
const fields = this.props.node.typeSpecificProperties.type === "StreamMetaData" ?
["parallelism", "checkpointIntervalInSeconds", "splitStateToDisk", "useAsyncInterpretation"] :
["parallelism", "checkpointIntervalInSeconds", "spillStateToDisk", "useAsyncInterpretation"] :
["path"]
const additionalFields = Object.entries(this.props.additionalPropertiesConfig).map(([fieldName, fieldConfig]) => fieldName)
const exceptionHandlerFields = this.state.editedNode.exceptionHandler.parameters.map(param => param.name)
Expand Down
2 changes: 1 addition & 1 deletion ui/client/cypress/fixtures/testProcess.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"id": "e2e-1612884598459-process-test-process",
"typeSpecificData": {
"parallelism": 1,
"splitStateToDisk": true,
"spillStateToDisk": true,
"useAsyncInterpretation": null,
"checkpointIntervalInSeconds": null,
"type": "StreamMetaData"
Expand Down
8 changes: 4 additions & 4 deletions ui/client/cypress/fixtures/testProcess2.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"id" : "jwl-test",
"typeSpecificData" : {
"parallelism" : 1,
"splitStateToDisk" : true,
"spillStateToDisk" : true,
"useAsyncInterpretation" : null,
"checkpointIntervalInSeconds" : null,
"type" : "StreamMetaData"
Expand All @@ -14,11 +14,11 @@
"groups" : [
],
"properties" : {

}
},
"subprocessVersions" : {

}
},
"exceptionHandlerRef" : {
Expand Down Expand Up @@ -298,4 +298,4 @@
],
"additionalBranches" : [
]
}
}
2 changes: 1 addition & 1 deletion ui/client/cypress/fixtures/variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"id" : "variables",
"typeSpecificData" : {
"parallelism" : 1,
"splitStateToDisk" : true,
"spillStateToDisk" : true,
"useAsyncInterpretation" : null,
"checkpointIntervalInSeconds" : null,
"type" : "StreamMetaData"
Expand Down
6 changes: 3 additions & 3 deletions ui/client/cypress/fixtures/withSqlEditor.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"id" : "sql-editor",
"typeSpecificData" : {
"parallelism" : 1,
"splitStateToDisk" : true,
"spillStateToDisk" : true,
"useAsyncInterpretation" : null,
"checkpointIntervalInSeconds" : null,
"type" : "StreamMetaData"
Expand All @@ -19,7 +19,7 @@
}
},
"subprocessVersions" : {

}
},
"exceptionHandlerRef" : {
Expand Down Expand Up @@ -81,4 +81,4 @@
],
"additionalBranches" : [
]
}
}
6 changes: 3 additions & 3 deletions ui/client/cypress/fixtures/withSqlEditor2.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"id" : "e2e-1619718559678-sql-test-process",
"typeSpecificData" : {
"parallelism" : null,
"splitStateToDisk" : true,
"spillStateToDisk" : true,
"useAsyncInterpretation" : null,
"checkpointIntervalInSeconds" : null,
"type" : "StreamMetaData"
Expand All @@ -19,7 +19,7 @@
}
},
"subprocessVersions" : {

}
},
"exceptionHandlerRef" : {
Expand Down Expand Up @@ -103,4 +103,4 @@
}
]
]
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package db.migration

import io.circe._
import pl.touk.nussknacker.ui.db.migration.ProcessJsonMigration

trait V1_030__SpillStateToDisk extends ProcessJsonMigration {

  /** Migration entry point: renames the legacy `splitStateToDisk` flag to
    * `spillStateToDisk` in a stored process json. Delegates to the companion
    * object so the renaming logic can be unit-tested without Flyway plumbing. */
  override def updateProcessJson(jsonProcess: Json): Option[Json] =
    V1_030__SpillStateToDisk.renameSpillStateToDisk(jsonProcess)
}

object V1_030__SpillStateToDisk {

  // NOTE(review): not referenced by this migration — presumably copied from an
  // earlier migration; kept for source compatibility. Confirm before removing.
  final val emptyAdditionalBranches = Json.fromValues(List.empty)

  /** Renames `metaData.typeSpecificData.splitStateToDisk` to `spillStateToDisk`,
    * preserving its value. Json without the legacy field is returned unchanged;
    * returns None only when the `metaData.typeSpecificData` path is missing.
    *
    * Rewritten without the nonlocal `return` inside the `withFocus` lambda
    * (which was implemented via a NonLocalReturnControl throw) and without the
    * unsafe `.get` — the object-level rename needs no cursor round-trip.
    */
  private[migration] def renameSpillStateToDisk(jsonProcess: Json): Option[Json] =
    jsonProcess.hcursor
      .downField("metaData")
      .downField("typeSpecificData")
      .withFocus { typeSpecificData =>
        typeSpecificData.hcursor.downField("splitStateToDisk").focus match {
          case Some(oldValue) =>
            // Copy the old value under the new key, then drop the legacy key.
            // (circe object equality is field-order-insensitive, so the add/remove
            // order does not affect comparisons against migrated documents.)
            typeSpecificData.mapObject(_.add("spillStateToDisk", oldValue).remove("splitStateToDisk"))
          case None =>
            // Nothing to migrate — leave the json untouched.
            typeSpecificData
        }
      }
      .top
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package db.migration

import io.circe.Json
import org.scalatest.{FlatSpec, Matchers}
import pl.touk.nussknacker.engine.api.CirceUtil

class V1_030__SpillStateToDiskSpec extends FlatSpec with Matchers {

  /** Builds the process json shared by every test, injecting `extraFieldLine`
    * (e.g. `"splitStateToDisk": true,`) into `typeSpecificData`. An empty
    * string yields a document without any state-to-disk flag. */
  private def processJson(extraFieldLine: String): Json = {
    val rawJsonString =
      s"""{
         |  "metaData": {
         |    "id": "empty-2",
         |    "typeSpecificData": {
         |      "parallelism": 1,
         |      $extraFieldLine
         |      "useAsyncInterpretation": null,
         |      "checkpointIntervalInSeconds": null,
         |      "type": "StreamMetaData"
         |    }
         |  }
         |}
         |""".stripMargin
    CirceUtil.decodeJsonUnsafe[Json](rawJsonString, "Invalid json string.")
  }

  it should "rename an existing splitStateToDisk field to spillStateToDisk" in {
    val oldJson = processJson(""""splitStateToDisk": true,""")

    val converted = V1_030__SpillStateToDisk.renameSpillStateToDisk(oldJson)

    // The old flag's value must be carried over under the new key.
    converted shouldBe Some(processJson(""""spillStateToDisk": true,"""))
  }

  it should "do nothing when the property is not specified" in {
    val oldJson = processJson("")

    val converted = V1_030__SpillStateToDisk.renameSpillStateToDisk(oldJson)

    // No legacy field present — the document passes through unchanged.
    converted shouldBe Some(oldJson)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class UiProcessMarshallerSpec extends FlatSpec with Matchers {
val processWithPartialAdditionalFields =
s"""
|{
| "metaData" : { "id": "custom", "typeSpecificData": {"type": "StreamMetaData", "parallelism" : 2, "splitStateToDisk" : true }, "additionalFields": {"description": "$someProcessDescription"}},
| "metaData" : { "id": "custom", "typeSpecificData": {"type": "StreamMetaData", "parallelism" : 2, "spillStateToDisk" : true }, "additionalFields": {"description": "$someProcessDescription"}},
| "exceptionHandlerRef" : { "parameters" : [ { "name": "errorsTopic", "expression": { "language": "spel", "expression": "error.topic" }}]},
| "nodes" : [
| {
Expand All @@ -30,7 +30,7 @@ class UiProcessMarshallerSpec extends FlatSpec with Matchers {
val processWithFullAdditionalFields =
s"""
|{
| "metaData" : { "id": "custom", "typeSpecificData": {"type": "StreamMetaData", "parallelism" : 2, "splitStateToDisk" : true }, "isSubprocess": false, "subprocessVersions": {}, "additionalFields": { "description": "$someProcessDescription", "groups": [], "properties": {}} },
| "metaData" : { "id": "custom", "typeSpecificData": {"type": "StreamMetaData", "parallelism" : 2, "spillStateToDisk" : true }, "isSubprocess": false, "subprocessVersions": {}, "additionalFields": { "description": "$someProcessDescription", "groups": [], "properties": {}} },
| "exceptionHandlerRef" : { "parameters" : [ { "name": "errorsTopic", "expression": { "language": "spel", "expression": "error.topic" }}]},
| "nodes" : [
| {
Expand Down

0 comments on commit 1aee704

Please sign in to comment.