[Improvement] Show DeploymentData in Flink console #1514

Merged (2 commits, Apr 14, 2021)
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -33,6 +33,7 @@ Nussknacker versions
* [#1458](https://github.com/TouK/nussknacker/pull/1458) `PeriodicProcessListener` allows custom handling of `PeriodicProcess` events
* [#1466](https://github.com/TouK/nussknacker/pull/1466) `ProcessManager` API allows to return ExternalDeploymentId immediately from deploy
* [#1405](https://github.com/TouK/nussknacker/pull/1405) 'KafkaAvroSinkFactoryWithEditor' for more user-friendly Avro message definition.
* [#1514](https://github.com/TouK/nussknacker/pull/1514) Expose DeploymentData in Flink UI via `NkGlobalParameters`
* [#1510](https://github.com/TouK/nussknacker/pull/1510) `FlinkSource` API allows to create stream of `Context` (FlinkSource API and test support API refactoring).
* [#1497](https://github.com/TouK/nussknacker/pull/1497) Initial support for multiple (named) schedules in `PeriodicProcessManager`
* [#1499](https://github.com/TouK/nussknacker/pull/1499) ClassTag is provided in params in avro key-value deserialization schema factory: `KafkaAvroKeyValueDeserializationSchemaFactory`
1 change: 1 addition & 0 deletions docs/MigrationGuide.md
@@ -53,6 +53,7 @@ To see biggest differences please consult the [changelog](Changelog.md).
- trait `KafkaAvroDeserializationSchemaFactory` uses both key and value ClassTags and schemas (instead of value-only), check the order of parameters.
- ClassTag is provided in params in avro key-value deserialization schema factory: `KafkaAvroKeyValueDeserializationSchemaFactory`
- `BaseKafkaAvroSourceFactory` is able to read both key and value schema determiner to build proper DeserializationSchema (support for keys is not fully introduced in this change)
* [#1514](https://github.com/TouK/nussknacker/pull/1514) `ExecutionConfigPreparer` has a different method parameter: `JobData`, which carries more information than the previous `MetaData` and `ProcessVersion` parameters

## In version 0.3.0

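For custom `ExecutionConfigPreparer` implementations, the migration described above amounts to replacing the `(MetaData, ProcessVersion)` parameter list with a single `JobData`. A hedged sketch (the class name and body are illustrative; the import path for `ExecutionConfigPreparer` is assumed from this PR's file layout):

```scala
import org.apache.flink.api.common.ExecutionConfig
import pl.touk.nussknacker.engine.api.JobData
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer

// Before (0.3.x) the second parameter list was:
//   (metaData: MetaData, processVersion: ProcessVersion)
// After this change it is a single JobData, which also carries DeploymentData.
class MyCustomPreparer extends ExecutionConfigPreparer {
  override def prepareExecutionConfig(config: ExecutionConfig)(jobData: JobData): Unit = {
    // The old values are still reachable, now through jobData:
    val processId = jobData.metaData.id
    val version = jobData.processVersion.versionId
    // ... existing per-job ExecutionConfig tweaks go here ...
  }
}
```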
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.flink.api

import java.util

import com.typesafe.config.Config

import scala.collection.JavaConverters._
@@ -10,51 +9,75 @@ import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters
import pl.touk.nussknacker.engine.api.ProcessVersion
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import net.ceedubs.ficus.Ficus._
import pl.touk.nussknacker.engine.api.deployment.DeploymentData

//we can use this class to pass config through RuntimeContext to places where it would be difficult to use otherwise
//Also, those configuration properties will be exposed via Flink REST API/webconsole
case class NkGlobalParameters(buildInfo: String,
processVersion: ProcessVersion,
deploymentData: DeploymentData,
configParameters: Option[ConfigGlobalParameters],
namingParameters: Option[NamingParameters]) extends GlobalJobParameters {
namingParameters: Option[NamingParameters],
private val additionalInformationSerializer: AdditionalInformationSerializer) extends GlobalJobParameters {

//here we decide which configuration properties should be shown in REST API etc.
//For now it will be only deployment information
//NOTE: this information is used in FlinkRestManager - any changes here should be reflected there
//NOTE: some of the information is used in FlinkRestManager - any changes here should be reflected there
//AdditionalInformationSerializer is used to expose fields for e.g. easier job analysis, custom implementation can be provided
override def toMap: util.Map[String, String] = {
//we wrap in HashMap because .asJava creates not-serializable map in 2.11
new util.HashMap(Map[String, String](

val baseProperties = Map[String, String](
"buildInfo" -> buildInfo,
"versionId" -> processVersion.versionId.toString,
"modelVersion" -> processVersion.modelVersion.map(_.toString).orNull,
"user" -> processVersion.user
).filterNot(_._2 == null).asJava)
)
val baseDeploymentInfo = Map("deployment.user" -> deploymentData.user.id, "deployment.id" -> deploymentData.deploymentId.value)
val additionalInfo = additionalInformationSerializer.toMap(this)

val configMap = baseProperties ++ baseDeploymentInfo ++ additionalInfo
//we wrap in HashMap because .asJava creates not-serializable map in 2.11
new util.HashMap(configMap.filterNot(_._2 == null).asJava)
}

}

trait AdditionalInformationSerializer extends Serializable {

def toMap(nkGlobalParameters: NkGlobalParameters): Map[String, String]

}

object DefaultAdditionalInformationSerializer extends AdditionalInformationSerializer {
override def toMap(nkGlobalParameters: NkGlobalParameters): Map[String, String] = {
val deploymentData = nkGlobalParameters.deploymentData
deploymentData.additionalDeploymentData.map {
case (k, v) => s"deployment.properties.$k" -> v
}
}
}

//this is part of global parameters that is parsed with typesafe Config (e.g. from application.conf/model.conf)
case class ConfigGlobalParameters(useLegacyMetrics: Option[Boolean],
explicitUidInStatefulOperators: Option[Boolean],
useTypingResultTypeInformation: Option[Boolean],
//TODO: temporary, until we confirm that IOMonad is not causing problems
useIOMonadInInterpreter: Option[Boolean])

case class NamingParameters(tags: Map[String, String])

object NkGlobalParameters {

def apply(buildInfo: String, processVersion: ProcessVersion, modelConfig: Config, namingParameters: Option[NamingParameters] = None): NkGlobalParameters = {
def apply(buildInfo: String, processVersion: ProcessVersion, deploymentData: DeploymentData, modelConfig: Config, namingParameters: Option[NamingParameters] = None): NkGlobalParameters = {
val configGlobalParameters = modelConfig.getAs[ConfigGlobalParameters]("globalParameters")
NkGlobalParameters(buildInfo, processVersion, configGlobalParameters, namingParameters)
NkGlobalParameters(buildInfo, processVersion, deploymentData, configGlobalParameters, namingParameters, DefaultAdditionalInformationSerializer)
}

def setInContext(ec: ExecutionConfig, globalParameters: NkGlobalParameters): Unit = {
ec.setGlobalJobParameters(globalParameters)
}

def readFromContext(ec: ExecutionConfig): Option[NkGlobalParameters] = Option(ec.getGlobalJobParameters).collect {
case a:NkGlobalParameters => a
case a: NkGlobalParameters => a
}

}
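The `AdditionalInformationSerializer` hook introduced above lets a deployment expose extra key/value pairs in the Flink REST API/webconsole. A minimal custom implementation might look like this (the `custom.deployedBy` key is purely illustrative; the default serializer is reused for the `deployment.properties.*` entries):

```scala
import pl.touk.nussknacker.engine.flink.api.{AdditionalInformationSerializer, DefaultAdditionalInformationSerializer, NkGlobalParameters}

// Sketch: keep the default "deployment.properties.*" entries and additionally
// expose the deploying user under a custom key.
object MyAdditionalInformationSerializer extends AdditionalInformationSerializer {
  override def toMap(nkGlobalParameters: NkGlobalParameters): Map[String, String] = {
    val defaults = DefaultAdditionalInformationSerializer.toMap(nkGlobalParameters)
    defaults + ("custom.deployedBy" -> nkGlobalParameters.deploymentData.user.id)
  }
}
```

Such a serializer would then be passed when constructing `NkGlobalParameters` directly; the companion `apply` shown below wires in `DefaultAdditionalInformationSerializer`.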
@@ -7,8 +7,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.scalatest.{FunSuite, Matchers}
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.deployment.DeploymentData
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.api.{JobData, MetaData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer._
import pl.touk.nussknacker.engine.avro.encode.ValidationMode
import pl.touk.nussknacker.engine.avro.kryo.AvroSerializersRegistrar
@@ -45,7 +44,7 @@ trait KafkaAvroSpecMixin extends FunSuite with KafkaWithSchemaRegistryOperations
ProcessSettingsPreparer(modelData),
new UnoptimizedSerializationPreparer(modelData),
new ExecutionConfigPreparer {
override def prepareExecutionConfig(config: ExecutionConfig)(metaData: MetaData, processVersion: ProcessVersion): Unit = {
override def prepareExecutionConfig(config: ExecutionConfig)(jobData: JobData): Unit = {
AvroSerializersRegistrar.registerGenericRecordSchemaIdSerializationIfNeed(config, confluentClientFactory, kafkaConfig)
}
}
@@ -12,7 +12,7 @@ import org.apache.flink.api.common.ExecutionConfig
import org.scalatest.{EitherValues, FunSuite, Matchers}
import pl.touk.nussknacker.engine.api.deployment.DeploymentData
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.api.{JobData, MetaData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.avro.encode.{BestEffortAvroEncoder, ValidationMode}
import pl.touk.nussknacker.engine.avro.kryo.AvroSerializersRegistrar
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
@@ -338,7 +338,7 @@ class GenericItSpec extends FunSuite with FlinkSpec with Matchers with KafkaSpec
ProcessSettingsPreparer(modelData),
new UnoptimizedSerializationPreparer(modelData),
new ExecutionConfigPreparer {
override def prepareExecutionConfig(config: ExecutionConfig)(metaData: MetaData, processVersion: ProcessVersion): Unit = {
override def prepareExecutionConfig(config: ExecutionConfig)(jobData: JobData): Unit = {
AvroSerializersRegistrar.registerGenericRecordSchemaIdSerializationIfNeed(config, new MockConfluentSchemaRegistryClientFactory(schemaRegistryMockClient), kafkaConfig)
}
}
@@ -7,7 +7,7 @@ import net.ceedubs.ficus.Ficus._
import org.apache.flink.api.common.ExecutionConfig
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.namespaces.{FlinkUsageKey, NamingContext, ObjectNaming}
import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion}
import pl.touk.nussknacker.engine.api.JobData
import pl.touk.nussknacker.engine.flink.api.{NamingParameters, NkGlobalParameters}
import pl.touk.nussknacker.engine.process.util.Serializers

@@ -17,7 +17,7 @@ import pl.touk.nussknacker.engine.process.util.Serializers
trait ExecutionConfigPreparer {

def prepareExecutionConfig(config: ExecutionConfig)
(metaData: MetaData, processVersion: ProcessVersion): Unit
(jobData: JobData): Unit

}

@@ -38,18 +38,18 @@ object ExecutionConfigPreparer extends LazyLogging {

def chain(configPreparers: ExecutionConfigPreparer*): ExecutionConfigPreparer = {
new ExecutionConfigPreparer {
override def prepareExecutionConfig(config: ExecutionConfig)(metaData: MetaData, processVersion: ProcessVersion): Unit = {
configPreparers.foreach(_.prepareExecutionConfig(config)(metaData, processVersion))
override def prepareExecutionConfig(config: ExecutionConfig)(jobData: JobData): Unit = {
configPreparers.foreach(_.prepareExecutionConfig(config)(jobData))
}
}
}

class ProcessSettingsPreparer(processConfig: Config, objectNaming: ObjectNaming, buildInfo: String) extends ExecutionConfigPreparer {
override def prepareExecutionConfig(config: ExecutionConfig)
(metaData: MetaData, processVersion: ProcessVersion): Unit = {
val namingParameters = objectNaming.objectNamingParameters(metaData.id, processConfig, new NamingContext(FlinkUsageKey))
(jobData: JobData): Unit = {
val namingParameters = objectNaming.objectNamingParameters(jobData.metaData.id, processConfig, new NamingContext(FlinkUsageKey))
.map(p => NamingParameters(p.toTags))
NkGlobalParameters.setInContext(config, NkGlobalParameters(buildInfo, processVersion, processConfig, namingParameters))
NkGlobalParameters.setInContext(config, NkGlobalParameters(buildInfo, jobData.processVersion, jobData.deploymentData, processConfig, namingParameters))
}
}

@@ -66,7 +66,7 @@ object ExecutionConfigPreparer extends LazyLogging {
modelData.processConfig.getOrElse[Boolean]("enableObjectReuse", true)

override def prepareExecutionConfig(config: ExecutionConfig)
(metaData: MetaData, processVersion: ProcessVersion): Unit = {
(jobData: JobData): Unit = {
Serializers.registerSerializers(modelData, config)
if (enableObjectReuse) {
config.enableObjectReuse()
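With the signature unified on `JobData`, composing preparers stays a one-liner via the `chain` helper shown in this file. A hedged usage sketch, assuming `modelData`, `env` and `jobData` are provided by the surrounding setup (the inline `setParallelism` tweak is purely illustrative):

```scala
import org.apache.flink.api.common.ExecutionConfig
import pl.touk.nussknacker.engine.api.JobData
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer
import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer.{ProcessSettingsPreparer, UnoptimizedSerializationPreparer}

// Each chained preparer receives the same (config)(jobData) argument lists.
val preparer = ExecutionConfigPreparer.chain(
  ProcessSettingsPreparer(modelData),
  new UnoptimizedSerializationPreparer(modelData),
  new ExecutionConfigPreparer {
    override def prepareExecutionConfig(config: ExecutionConfig)(jobData: JobData): Unit =
      config.setParallelism(1) // illustrative per-job tweak
  }
)
preparer.prepareExecutionConfig(env.getConfig)(jobData)
```

This mirrors how the test mixins in this PR build their preparers, with only the last element customized.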
@@ -38,7 +38,7 @@ class DefaultStreamExecutionEnvPreparer(checkpointConfig: Option[CheckpointConfi

override def preRegistration(env: StreamExecutionEnvironment, processWithDeps: FlinkProcessCompilerData): Unit = {

executionConfigPreparer.prepareExecutionConfig(env.getConfig)(processWithDeps.metaData, processWithDeps.jobData.processVersion)
executionConfigPreparer.prepareExecutionConfig(env.getConfig)(processWithDeps.jobData)

val streamMetaData = MetaDataExtractor.extractTypeSpecificDataOrFail[StreamMetaData](processWithDeps.metaData)
env.setRestartStrategy(processWithDeps.restartStrategy)
@@ -5,11 +5,12 @@ import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TraversableTypeInfo}
import org.scalatest.{FunSuite, Matchers}
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.deployment.DeploymentData
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown}
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
import pl.touk.nussknacker.engine.api.{Context, InterpretationResult, ProcessVersion, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.{ConfigGlobalParameters, NkGlobalParameters}
import pl.touk.nussknacker.engine.flink.api.{ConfigGlobalParameters, DefaultAdditionalInformationSerializer, NkGlobalParameters}
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedScalaMapTypeInformation
import pl.touk.nussknacker.test.ClassLoaderWithServices

@@ -18,7 +19,8 @@ class TypeInformationDetectionSpec extends FunSuite with Matchers {
private val loader = getClass.getClassLoader

private def executionConfig(useTypingResultAware: Option[Boolean] = None) = new ExecutionConfig {
setGlobalJobParameters(NkGlobalParameters("", ProcessVersion.empty, Some(ConfigGlobalParameters(None, None, useTypingResultAware, None)), None))
setGlobalJobParameters(NkGlobalParameters("",
ProcessVersion.empty, DeploymentData.empty, Some(ConfigGlobalParameters(None, None, useTypingResultAware, None)), None, DefaultAdditionalInformationSerializer))
}

private def typeInformationForVariables(detection: TypeInformationDetection,