Skip to content

Commit

Permalink
Scenario labels in metadata (#6935)
Browse files Browse the repository at this point in the history
* Scenario labels in metadata

* Fixes

* Fixes

* Fixes

* Process version decoder

* Updated snapshots (#6936)

Co-authored-by: mateuszkp96 <26696870+mateuszkp96@users.noreply.github.com>

* Tests & changelog

* Review fixes

* Updated snapshots (#6951)

Co-authored-by: mateuszkp96 <26696870+mateuszkp96@users.noreply.github.com>

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: mateuszkp96 <26696870+mateuszkp96@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 26, 2024
1 parent ab4f210 commit a2e3279
Show file tree
Hide file tree
Showing 118 changed files with 855 additions and 443 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,20 @@ class InterpreterSetup[T: ClassTag] {
process: CanonicalProcess,
additionalComponents: List[ComponentDefinition]
): (Context, ServiceExecutionContext) => F[List[Either[InterpretationResult, NuExceptionInfo[_ <: Throwable]]]] = {
val compilerData = prepareCompilerData(additionalComponents)
val jobData = JobData(process.metaData, ProcessVersion.empty.copy(processName = process.metaData.name))
val compilerData = prepareCompilerData(jobData, additionalComponents)
val interpreter = compilerData.interpreter
val parts = failOnErrors(compilerData.compile(process))

def compileNode(part: ProcessPart) =
failOnErrors(compilerData.subPartCompiler.compile(part.node, part.validationContext)(process.metaData).result)
failOnErrors(compilerData.subPartCompiler.compile(part.node, part.validationContext)(jobData).result)

val compiled = compileNode(parts.sources.head)
(initialCtx: Context, ec: ServiceExecutionContext) =>
interpreter.interpret[F](compiled, process.metaData, initialCtx, ec)
(initialCtx: Context, ec: ServiceExecutionContext) => interpreter.interpret[F](compiled, jobData, initialCtx, ec)
}

def prepareCompilerData(
jobData: JobData,
additionalComponents: List[ComponentDefinition],
): ProcessCompilerData = {
val components = List(
Expand All @@ -59,6 +60,7 @@ class InterpreterSetup[T: ClassTag] {
val definitionsWithTypes = ModelDefinitionWithClasses(definitions)

ProcessCompilerData.prepare(
jobData,
definitionsWithTypes,
new SimpleDictRegistry(Map.empty).toEngineRegistry,
List.empty,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package pl.touk.nussknacker.engine.api

import io.circe.generic.JsonCodec
import io.circe.{Decoder, Encoder}
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}

// We should split this class - see TODO in ScenarioAction
@JsonCodec case class ProcessVersion(
case class ProcessVersion(
versionId: VersionId,
processName: ProcessName,
processId: ProcessId,
labels: List[String],
user: String,
modelVersion: Option[Int]
)
Expand All @@ -19,8 +20,34 @@ object ProcessVersion {
versionId = VersionId.initialVersionId,
processName = ProcessName(""),
processId = ProcessId(1),
labels = List.empty,
user = "",
modelVersion = None
)

implicit val encoder: Encoder[ProcessVersion] = io.circe.generic.semiauto.deriveEncoder

// decoder with fallback for labels
// legacy format handling required for K8s - there may be some deployed pods with `processVersion` without labels -
// newer NU should be able to decode the old version (for old pods status check etc.)
implicit val decoder: Decoder[ProcessVersion] = {
Decoder.instance { c =>
for {
versionId <- c.downField("versionId").as[VersionId]
processName <- c.downField("processName").as[ProcessName]
processId <- c.downField("processId").as[ProcessId]
labels <- c.downField("labels").as[Option[List[String]]]
user <- c.downField("user").as[String]
modelVersion <- c.downField("modelVersion").as[Option[Int]]
} yield ProcessVersion(
versionId,
processName,
processId,
labels.getOrElse(List.empty),
user,
modelVersion
)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class OpenAPIServiceSpec
implicit val componentUseCase: ComponentUseCase = ComponentUseCase.EngineRuntime
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))

type FixtureParam = EagerServiceWithStaticParametersAndReturnType

Expand All @@ -58,7 +59,7 @@ class OpenAPIServiceSpec
.head
.service
.asInstanceOf[EagerServiceWithStaticParametersAndReturnType]
enricher.open(TestEngineRuntimeContext(JobData(metaData, ProcessVersion.empty)))
enricher.open(TestEngineRuntimeContext(jobData))

withFixture(test.toNoArgTest(enricher))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ trait BaseOpenAPITest {
implicit val componentUseCase: ComponentUseCase = ComponentUseCase.EngineRuntime
implicit val metaData: MetaData = MetaData("testProc", StreamMetaData())
implicit val context: Context = Context("testContextId", Map.empty)
private val runtimeContext = TestEngineRuntimeContext(JobData(metaData, ProcessVersion.empty))
private val jobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))
private val runtimeContext = TestEngineRuntimeContext(jobData)

protected def parseServicesFromResource(
name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ trait BaseDatabaseQueryEnricherTest extends AnyFunSuite with Matchers with Befor
implicit val collector: ServiceInvocationCollector = EmptyInvocationCollector.Instance
implicit val componentUseCase: ComponentUseCase = ComponentUseCase.TestRuntime

val jobData: JobData = JobData(MetaData("", StreamMetaData()), ProcessVersion.empty)
val jobData: JobData = JobData(metaData, ProcessVersion.empty.copy(processName = metaData.name))

val service: Service

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ case class DMCustomActionCommand(
// TODO Commands below will be legacy in some future because they operate on the scenario level instead of deployment level -
// we should replace them by commands operating on deployment
case class DMTestScenarioCommand(
scenarioName: ProcessName,
processVersion: ProcessVersion,
canonicalProcess: CanonicalProcess,
scenarioTestData: ScenarioTestData
) extends DMScenarioCommand[TestResults[Json]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.restmodel.scenariodetails

import io.circe.{Decoder, Encoder}
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.component.ProcessingMode
import pl.touk.nussknacker.engine.api.deployment.{ProcessAction, ProcessState}
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
Expand Down Expand Up @@ -61,6 +62,15 @@ final case class ScenarioWithDetails(

def processIdUnsafe: ProcessId = processId.getOrElse(throw new IllegalStateException("Missing processId"))

def processVersionUnsafe: ProcessVersion = ProcessVersion(
versionId = processVersionId,
processName = name,
processId = processIdUnsafe,
labels = labels,
user = createdBy,
modelVersion = modelVersion
)

}

object ScenarioWithDetails {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,9 @@ class ManagementResources(
scenarioTestServices
.forProcessingTypeUnsafe(details.processingType)
.performTest(
details.idWithNameUnsafe,
scenarioGraph,
details.processVersionUnsafe,
details.isFragment,
details.scenarioLabels,
RawScenarioTestData(testDataContent)
)
.flatMap(mapResultsToHttpResponse)
Expand All @@ -220,19 +219,17 @@ class ManagementResources(
val scenarioTestService = scenarioTestServices.forProcessingTypeUnsafe(details.processingType)
scenarioTestService.generateData(
scenarioGraph,
processName,
details.processVersionUnsafe,
details.isFragment,
details.scenarioLabels,
testSampleSize
) match {
case Left(error) => Future.failed(ProcessUnmarshallingError(error))
case Right(rawScenarioTestData) =>
scenarioTestService
.performTest(
details.idWithNameUnsafe,
scenarioGraph,
details.processVersionUnsafe,
details.isFragment,
details.scenarioLabels,
rawScenarioTestData
)
.flatMap(mapResultsToHttpResponse)
Expand All @@ -258,10 +255,9 @@ class ManagementResources(
scenarioTestServices
.forProcessingTypeUnsafe(process.processingType)
.performTest(
process.idWithNameUnsafe,
testParametersRequest.scenarioGraph,
process.processVersionUnsafe,
process.isFragment,
process.scenarioLabels,
testParametersRequest.sourceParameters
)
.flatMap(mapResultsToHttpResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class NodesApiHttpService(
modelData <- getModelData(scenario.processingType)
nodeValidator = processingTypeToNodeValidator.forProcessingTypeUnsafe(scenario.processingType)
nodeData <- dtoToNodeRequest(nodeValidationRequestDto, modelData)
validation <- getNodeValidation(nodeValidator, scenario.name, nodeData)
validation <- getNodeValidation(nodeValidator, scenario, nodeData)
validationDto = NodeValidationResultDto.apply(validation)
} yield validationDto
}
Expand Down Expand Up @@ -197,14 +197,14 @@ class NodesApiHttpService(

private def getNodeValidation(
nodeValidator: NodeValidator,
scenarioName: ProcessName,
scenario: ScenarioWithDetails,
nodeData: NodeValidationRequest
)(
implicit user: LoggedUser
): EitherT[Future, NodesError, NodeValidationResult] =
EitherT.fromEither(
try {
Right(nodeValidator.validate(scenarioName, nodeData))
Right(nodeValidator.validate(scenario.processVersionUnsafe, nodeData))
} catch {
case e: ProcessNotFoundError =>
Left(NoScenario(ProcessName(e.name.value)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.http.scaladsl.server.{Directives, Route}
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport
import io.circe.syntax._
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
import pl.touk.nussknacker.engine.api.process.{ProcessName, ProcessingType}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
Expand Down Expand Up @@ -56,9 +57,8 @@ class ProcessesExportResources(
exportResolvedProcess(
process,
processDetails.processingType,
processDetails.name,
processDetails.processVersionUnsafe,
processDetails.isFragment,
processDetails.scenarioLabels,
)
}
}
Expand Down Expand Up @@ -98,13 +98,11 @@ class ProcessesExportResources(
private def exportResolvedProcess(
processWithDictLabels: ScenarioGraph,
processingType: ProcessingType,
processName: ProcessName,
processVersion: ProcessVersion,
isFragment: Boolean,
scenarioLabels: List[ScenarioLabel]
)(implicit user: LoggedUser): HttpResponse = {
val processResolver = processResolvers.forProcessingTypeUnsafe(processingType)
val resolvedProcess =
processResolver.validateAndResolve(processWithDictLabels, processName, isFragment, scenarioLabels)
val resolvedProcess = processResolver.validateAndResolve(processWithDictLabels, processVersion, isFragment)
fileResponse(resolvedProcess)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,24 @@ class TestInfoResources(
complete {
scenarioTestService.getTestingCapabilities(
scenarioGraph,
processName,
processDetails.processVersionUnsafe,
processDetails.isFragment,
processDetails.scenarioLabels
)
}
} ~ path("testParameters") {
complete {
scenarioTestService.testParametersDefinition(
scenarioGraph,
processName,
processDetails.processVersionUnsafe,
processDetails.isFragment,
processDetails.scenarioLabels
)
}
} ~ path("generate" / IntNumber) { testSampleSize =>
complete {
scenarioTestService.generateData(
scenarioGraph,
processName,
processDetails.processVersionUnsafe,
processDetails.isFragment,
processDetails.scenarioLabels,
testSampleSize
) match {
case Left(error) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.migration.ProcessMigrations
import pl.touk.nussknacker.ui.db.entity.EnvironmentsEntityData
import pl.touk.nussknacker.ui.db.{DbRef, NuTables}
import pl.touk.nussknacker.ui.process.ScenarioQuery
import pl.touk.nussknacker.ui.process.label.ScenarioLabel
import pl.touk.nussknacker.ui.process.migrate.ProcessModelMigrator
import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider
import pl.touk.nussknacker.ui.process.repository._
Expand Down Expand Up @@ -117,7 +118,8 @@ class AutomaticMigration(
.sequenceOption(for {
migrator <- migrators.forProcessingType(processDetails.processingType)
migrationResult <- migrator.migrateProcess(processDetails, skipEmptyMigrations = true)
updateAction = migrationResult.toUpdateAction(processDetails.processId)
updateAction = migrationResult
.toUpdateAction(processDetails.processId, processDetails.scenarioLabels.map(ScenarioLabel.apply))
} yield {
processRepository.updateProcess(updateAction)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cats.syntax.functor._
import com.typesafe.scalalogging.LazyLogging
import db.util.DBIOActionInstances.DB
import io.circe.generic.JsonCodec
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.component.ProcessingMode
import pl.touk.nussknacker.engine.api.deployment.{DataFreshnessPolicy, ProcessAction, ScenarioActionName}
import pl.touk.nussknacker.engine.api.graph.ScenarioGraph
Expand Down Expand Up @@ -301,9 +302,8 @@ class DBProcessService(
val processResolver = processResolverByProcessingType.forProcessingTypeUnsafe(entity.processingType)
processResolver.validateAndReverseResolve(
canonical,
entity.name,
entity.toEngineProcessVersion,
entity.isFragment,
entity.scenarioLabels.map(ScenarioLabel.apply)
)
},
parameters
Expand Down Expand Up @@ -405,9 +405,8 @@ class DBProcessService(
FatalValidationError.saveNotAllowedAsError(
processResolver.validateBeforeUiResolving(
action.scenarioGraph,
details.name,
details.processVersionUnsafe,
details.isFragment,
scenarioLabels
)
)
val substituted = processResolver.resolveExpressions(action.scenarioGraph, details.name, validation.typingInfo)
Expand Down Expand Up @@ -443,7 +442,7 @@ class DBProcessService(
val scenarioGraph = CanonicalProcessConverter.toScenarioGraph(canonical)
val validationResult = processResolverByProcessingType
.forProcessingTypeUnsafe(process.processingType)
.validateBeforeUiReverseResolving(canonical, process.isFragment)
.validateBeforeUiReverseResolving(canonical, process.processVersionUnsafe, process.isFragment)
Future.successful(ScenarioGraphWithValidationResult(scenarioGraph, validationResult))
}
}
Expand All @@ -453,10 +452,11 @@ class DBProcessService(
processingType: ProcessingType,
isFragment: Boolean
)(implicit user: LoggedUser) = {
val newProcessVersion = ProcessVersion.empty.copy(processName = canonicalProcess.name)
val validationResult =
processResolverByProcessingType
.forProcessingTypeUnsafe(processingType)
.validateBeforeUiReverseResolving(canonicalProcess, isFragment)
.validateBeforeUiReverseResolving(canonicalProcess, newProcessVersion, isFragment)
validationResult.errors.processPropertiesErrors
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package pl.touk.nussknacker.ui.process

import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, ProcessingType}
import pl.touk.nussknacker.ui.process.label.ScenarioLabel

import java.sql.Timestamp

final case class ScenarioMetadata(
id: ProcessId,
name: ProcessName,
description: Option[String],
processCategory: String,
processingType: ProcessingType,
isFragment: Boolean,
isArchived: Boolean,
createdAt: Timestamp,
createdBy: String,
labels: List[ScenarioLabel],
impersonatedByIdentity: Option[String],
impersonatedByUsername: Option[String]
)
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,11 @@ class DeploymentService(
_ <- Future {
processValidator
.forProcessingTypeUnsafe(processDetails.processingType)
.validateCanonicalProcess(processDetails.json, processDetails.isFragment)
.validateCanonicalProcess(
processDetails.json,
processDetails.toEngineProcessVersion,
processDetails.isFragment
)
}.flatMap {
case validationResult if validationResult.hasErrors =>
Future.failed(DeployingInvalidScenarioError(validationResult.errors))
Expand Down
Loading

0 comments on commit a2e3279

Please sign in to comment.