Kafka exceptionHandler: retry when message is too large, log when writing to Kafka fails (#6958)
piotrp committed Oct 10, 2024
1 parent a0ee1d8 commit 7dc5d70
Showing 13 changed files with 430 additions and 68 deletions.
3 changes: 2 additions & 1 deletion build.sbt
@@ -868,7 +868,8 @@ lazy val kafkaUtils = (project in utils("kafka-utils"))
name := "nussknacker-kafka-utils",
libraryDependencies ++= {
Seq(
"org.apache.kafka" % "kafka-clients" % kafkaV
"org.apache.kafka" % "kafka-clients" % kafkaV,
"org.scalatest" %% "scalatest" % scalaTestV % Test,
)
}
// Depends on componentsApi because of dependency to NuExceptionInfo and NonTransientException -
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -9,6 +9,7 @@
* [#6769](https://github.com/TouK/nussknacker/pull/6769) Added possibility to choose presets and define lists for Long typed parameter inputs in fragments.
* [#6925](https://github.com/TouK/nussknacker/pull/6925) Fix situation when preset labels were presented as `null` when node didn't pass the validation.
* [#7000](https://github.com/TouK/nussknacker/pull/7000) Show all possible options for dictionary editor on open.
* [#6958](https://github.com/TouK/nussknacker/pull/6958) Add message size limit in the "Kafka" exceptionHandler

### 1.17.1 (Not released yet)

9 changes: 9 additions & 0 deletions docs/MigrationGuide.md
@@ -3,6 +3,15 @@
To see the biggest differences please consult the [changelog](Changelog.md).

## In version 1.17-esp

### Configuration changes

* [#6958](https://github.com/TouK/nussknacker/pull/6958) Added a message size limit in the "Kafka" exceptionHandler: `maxMessageBytes`.
Its default value reflects Kafka's default size limit of 1 MB (`max.message.bytes`); increase it if your error topic
allows larger messages, and remember to leave some margin for Kafka protocol overhead (100 bytes should be enough), as in the sketch below.
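
A minimal configuration sketch, assuming an error topic that accepts messages up to 5 MB; the topic name, the 5 MB figure, and the exact nesting of the `exceptionHandler` block are illustrative assumptions, so consult the Lite/Flink exception-handling configuration pages for the exact shape expected by your engine:

```scala
import com.typesafe.config.ConfigFactory

// Illustrative only: raise maxMessageBytes for an error topic that accepts up to 5 MB messages,
// keeping the 100-byte margin for Kafka protocol overhead: 5 * 1024 * 1024 - 100 = 5242780.
val exceptionHandlerConfig = ConfigFactory.parseString(
  """
    |exceptionHandler {
    |  type: Kafka
    |  topic: "nu-errors"            # hypothetical error topic name
    |  maxMessageBytes: 5242780
    |  includeInputEvent: true
    |}
    |""".stripMargin
)

println(exceptionHandlerConfig.getInt("exceptionHandler.maxMessageBytes")) // 5242780
```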

### Other changes

* [#6952](https://github.com/TouK/nussknacker/pull/6952) Improvement: TypeInformation support for scala.Option:
If you used CaseClassTypeInfoFactory with case classes that contain the Option type, the state won't be restored after the upgrade.

18 changes: 9 additions & 9 deletions docs/integration/KafkaIntegration.md
@@ -227,15 +227,15 @@ Errors can be sent to specified Kafka topic in following json format (see below
The following properties can be configured (please look at the correct engine page, [Lite](../../configuration/model/Lite#exception-handling) or [Flink](../../configuration/model/Flink#configuring-exception-handling),
to see where they should be set):

| Name | Default value | Description |
|------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic | - | Topic where errors will be sent. If the topic does not exist, it will be created (with default settings - for production deployments make sure the config is ok) during deploy. If (e.g. due to ACL settings) the topic cannot be created, the scenarios will fail, in that case, the topic has to be configured manually. |
| stackTraceLengthLimit | 50 | Limit of stacktrace length that will be sent (0 to omit stacktrace at all) |
| includeHost | true | Should name of host where error occurred (e.g. TaskManager in case of Flink) be included. Can be misleading if there are many network interfaces or hostname is improperly configured) |
| includeInputEvent | false | Should input event be serialized (can be large or contain sensitive data so use with care) |
| useSharedProducer | false | For better performance shared Kafka producer can be used (by default it's created and closed for each error), shared Producer is kind of experimental feature and should be used with care |
| additionalParams | {} | Map of fixed parameters that can be added to Kafka message |

| Name | Default value | Description |
|-----------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic                  | -             | Topic where errors will be sent. If the topic does not exist, it will be created during deploy (with default settings - for production deployments make sure the config is correct). If the topic cannot be created (e.g. due to ACL settings), the scenarios will fail; in that case the topic has to be created manually. |
| stackTraceLengthLimit  | 50            | Limit of the stacktrace length that will be sent (0 to omit the stacktrace entirely)                                                                                                                                                                                                                                         |
| maxMessageBytes        | 1048476       | Limits the Kafka message length by truncating variables from the input event (enabled by `includeInputEvent`). The default corresponds to Kafka's default `max.message.bytes` of 1 MB, minus a 100-byte safety margin (illustrated in the sketch below this table)                                                          |
| includeHost            | true          | Whether the name of the host where the error occurred (e.g. the TaskManager in case of Flink) should be included. Can be misleading if there are many network interfaces or the hostname is improperly configured                                                                                                           |
| includeInputEvent      | false         | Whether the input event should be serialized (it can be large or contain sensitive data, so use with care)                                                                                                                                                                                                                  |
| useSharedProducer      | false         | For better performance a shared Kafka producer can be used (by default a producer is created and closed for each error). The shared producer is an experimental feature and should be used with care                                                                                                                        |
| additionalParams       | {}            | Map of fixed parameters that will be added to the Kafka message                                                                                                                                                                                                                                                             |
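
The `maxMessageBytes` limit and the retry-with-stripped-context fallback introduced in this change rely on the same idea: when the serialized error does not fit, drop the captured variables and keep only a short warning. The sketch below is a simplified, self-contained illustration of that idea in plain Scala, with hypothetical names and a naive string "serializer"; it is not the actual `KafkaJsonExceptionSerializationSchema` or consumer code:

```scala
// Standalone sketch of the "truncate variables when the payload is too large" idea.
object TruncationSketch {

  final case class ErrorPayload(
      scenario: String,
      node: String,
      message: String,
      variables: Map[String, String]
  )

  // Naive placeholder serializer; the real handler produces JSON.
  private def serialize(p: ErrorPayload): Array[Byte] =
    s"${p.scenario}|${p.node}|${p.message}|${p.variables.mkString(",")}".getBytes("UTF-8")

  // Serialize the payload; if it exceeds the limit, replace variables with a warning and retry.
  def fitWithinLimit(payload: ErrorPayload, maxMessageBytes: Int): Array[Byte] = {
    val full = serialize(payload)
    if (full.length <= maxMessageBytes) full
    else
      serialize(
        payload.copy(variables =
          Map("!warning" -> s"variables truncated, payload exceeded $maxMessageBytes bytes")
        )
      )
  }

  def main(args: Array[String]): Unit = {
    val payload = ErrorPayload("scenario1", "enricher", "boom", Map("input" -> ("x" * 10000)))
    println(fitWithinLimit(payload, maxMessageBytes = 1024).length) // well below 1024 after truncation
  }
}
```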

### Configuration for Flink engine

@@ -1,8 +1,11 @@
package pl.touk.nussknacker.engine.kafka.exception

import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.errors.RecordTooLargeException
import pl.touk.nussknacker.engine.api.MetaData
import pl.touk.nussknacker.engine.api.exception.{NonTransientException, NuExceptionInfo}
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
@@ -13,18 +16,20 @@ import pl.touk.nussknacker.engine.kafka.{DefaultProducerCreator, KafkaConfig, Ka
import pl.touk.nussknacker.engine.util.SynchronousExecutionContextAndIORuntime
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig

import scala.concurrent.{ExecutionContext, Future}

class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {

override def create(metaData: MetaData, exceptionHandlerConfig: Config): FlinkEspExceptionConsumer = {
val kafkaConfig = KafkaConfig.parseConfig(exceptionHandlerConfig)
val consumerConfig = exceptionHandlerConfig.rootAs[KafkaExceptionConsumerConfig]
val producerCreator = kafkaProducerCreator(kafkaConfig)
val serializationSchema = createSerializationSchema(metaData, consumerConfig)
val errorTopicInitializer = new KafkaErrorTopicInitializer(kafkaConfig, consumerConfig)
val errorTopicInitializer = new DefaultKafkaErrorTopicInitializer(kafkaConfig, consumerConfig)
if (consumerConfig.useSharedProducer) {
SharedProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
} else {
TempProducerKafkaExceptionConsumer(serializationSchema, producerCreator, errorTopicInitializer)
TempProducerKafkaExceptionConsumer(metaData, serializationSchema, producerCreator, errorTopicInitializer)
}
}

@@ -42,42 +47,88 @@ class KafkaExceptionConsumerProvider extends FlinkEspExceptionConsumerProvider {

}

case class TempProducerKafkaExceptionConsumer(
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends FlinkEspExceptionConsumer {
trait BaseKafkaExceptionConsumer extends FlinkEspExceptionConsumer with LazyLogging {
protected val serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]]
protected val kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
protected val metaData: MetaData

private val topic: String = kafkaErrorTopicInitializer.topicName

protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_]

override def open(context: EngineRuntimeContext): Unit = {
super.open(context)
kafkaErrorTopicInitializer.init()
}

override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
KafkaUtils.sendToKafkaWithTempProducer(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))(
kafkaProducerCreator
)
override final def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
sendKafkaMessage(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))
.recoverWith { case e: RecordTooLargeException =>
// Kafka message size should be kept within acceptable size by serializer, but it may be incorrectly configured.
// We may also encounter this exception due to producer's 'buffer.memory' being set too low.
//
// Try to reduce Kafka message size in hope that it will fit configured limits. It's practically impossible
// to correctly detect and handle this limit preemptively, because:
// * Kafka limits are imposed on compressed message sizes (with headers and protocol overhead)
// * there are limits on multiple levels: producer, topic, broker (access requires additional permissions)

val scenario = metaData.id
val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
val error = exceptionInfo.throwable.message
logger.warn(
s"Cannot write to $topic, retrying with stripped context (scenario: $scenario, node: $node, error: $error)." +
s"Verify your configuration of Kafka producer, error logging and errors topic and set correct limits. ${e.getMessage}"
)

val lightExceptionInfo = exceptionInfo.copy(
context = exceptionInfo.context.copy(
variables = Map(
"!warning" -> s"variables truncated, they didn't fit within max allowed size of a Kafka message: ${e.getMessage}",
),
parentContext = None
)
)

sendKafkaMessage(serializationSchema.serialize(lightExceptionInfo, System.currentTimeMillis()))
}(ExecutionContext.Implicits.global)
.recover { case e: Throwable =>
val scenario = metaData.id
val node = exceptionInfo.nodeComponentInfo.map(_.nodeId).getOrElse("-")
val error = exceptionInfo.throwable.message

logger.warn(
s"Failed to write to $topic (scenario: $scenario, node: $node, error: $error): ${e.getMessage}",
e
)
}(ExecutionContext.Implicits.global)
}

}

case class SharedProducerKafkaExceptionConsumer(
case class TempProducerKafkaExceptionConsumer(
metaData: MetaData,
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends FlinkEspExceptionConsumer
with WithSharedKafkaProducer {
) extends BaseKafkaExceptionConsumer {

override def open(context: EngineRuntimeContext): Unit = {
super.open(context)
kafkaErrorTopicInitializer.init()
override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] = {
KafkaUtils
.sendToKafkaWithTempProducer(record)(kafkaProducerCreator)
}

override def consume(exceptionInfo: NuExceptionInfo[NonTransientException]): Unit = {
sendToKafka(serializationSchema.serialize(exceptionInfo, System.currentTimeMillis()))(
SynchronousExecutionContextAndIORuntime.syncEc
)
}

case class SharedProducerKafkaExceptionConsumer(
metaData: MetaData,
serializationSchema: KafkaSerializationSchema[NuExceptionInfo[NonTransientException]],
kafkaProducerCreator: KafkaProducerCreator.Binary,
kafkaErrorTopicInitializer: KafkaErrorTopicInitializer
) extends BaseKafkaExceptionConsumer
with WithSharedKafkaProducer {

override protected def sendKafkaMessage(record: ProducerRecord[Array[Byte], Array[Byte]]): Future[_] = {
sendToKafka(record)(SynchronousExecutionContextAndIORuntime.syncEc)
}

}
@@ -42,9 +42,13 @@ class KafkaExceptionConsumerSerializationSpec extends AnyFunSuite with Matchers

private val serializationSchema = new KafkaJsonExceptionSerializationSchema(metaData, consumerConfig)

// null as we don't test open here...
private val consumer =
TempProducerKafkaExceptionConsumer(serializationSchema, MockProducerCreator(mockProducer), null)
TempProducerKafkaExceptionConsumer(
metaData,
serializationSchema,
MockProducerCreator(mockProducer),
NoopKafkaErrorTopicInitializer
)

test("records event") {
consumer.consume(exception)