Skip to content

guide kafka spring

devonfw-core edited this page Dec 13, 2022 · 6 revisions

Kafka

This guide explains how Spring Kafka is used in devonfw applications. It focuses on aspects which are special to devonfw if you want to learn about spring-kafka you should adhere to springs references documentation.

There is an example of simple Kafka implementation in the devon4j-kafka-employeeapp.

The devon4j-kafka library consists of:

  • Custom message processor with retry pattern

  • Monitoring support

  • Tracing support

  • Logging support

  • Configuration support for Kafka Producers, Consumers, brave tracer and message retry processing including defaults

How to use?

To use devon4j-kafka you have to add required starter dependencies which is "starter-kafka-sender" or "starter-kafka-receiver" from devon4j. These 2 starters are responsible for taking care of the required spring configuration. If you only want to produce messages "starter-kafka-sender" is enough. For consuming messages you need "starter-kafka-receiver" which also includes "starter-kafka-sender".

To use devon4j-kafka message sender add the below dependency:

<dependency>
  <groupId>com.devonfw.java.starters</groupId>
  <artifactId>devon4j-starter-kafka-sender</artifactId>
</dependency>

It includes the Tracer implementations from Spring cloud sleuth.

To use the devon4j-kafka message receiver configurations, loggers and message retry processor for processing message, add the below dependency:

<dependency>
  <groupId>com.devonfw.java.starters</groupId>
  <artifactId>devon4j-starter-kafka-receiver</artifactId>
</dependency>

Property Parameters

As written before kafka-producer and listener-specific configuration is done via properties classes. These classes provide useful defaults, at a minimum the following parameters have to be configured:

messaging.kafka.common.bootstrap-servers=kafka-broker:9092
messaging.kafka.consumer.group-id=<e.g. application name>
messaging.kafka.listener.container.concurrency=<Number of listener threads for each listener container>

All the configuration beans for devon4j-kafka are annotated with @ConfigurationProperties and use common prefixes to read the property values from application.properties or application.yml.

Example:

 @Bean
  @ConfigurationProperties(prefix = "messaging.kafka.producer")
  public KafkaProducerProperties messageKafkaProducerProperties() {
    return new KafkaProducerProperties();
  }

For producer and consumer the prefixes are messaging.kafka.producer…​ and message.kafka.consumer…​ and for retry the prefix is messaging.retry…​

We use the same properties defined by Apache Kafka or Spring Kafka. They are simply "mapped" to the above prefixes to allow easy access from your application properties. The java docs provided in each of the devon4j-kafka property classes which explains their use and what value has to be passed.

Naming convention for topics

For better managing of several Kafka topics in your application portfolio we strongly advice to introduce a naming scheme for your topics. The schema may depend on the actual usage pattern of Kafka. For context where Kafka is used in a 1-to-1-communication-scheme (not publish/subscribe) the following schema has been proven useful in practice:

<application name>-<service name>-<version>-<service-operation>

To keep things easy and prevent problems we suggest to use only small letters, hyphens but no other special characters.

Send Messages

As mentioned above the 'starter-kafka-sender' is required to be added as a dependency to use MessageSender from Kafka.

<dependency>
  <groupId>com.devonfw.java.starters</groupId>
  <artifactId>devon4j-starter-kafka-sender</artifactId>
</dependency>

The following example shows how to use MessageSender and its method to send messages to Kafka broker:

Example:

  @Inject
  private MessageSender messageSender;
  private ProducerRecord<K,V> producerRecord;

  public void sendMessageToKafka(){
  producerRecord=new ProducerRecord<>("topic-name","message");
  messageSender.sendMessage(this.producerRecord);
  //Alternative
  messageSender.sendMessageAndWait(this.producerRecord,10);
  }

There are multiple methods available from MessageSender of devon4j-kafka. The ProducerListener will log the message sent to the Kafka broker.

Receive Messages

To receive messages you have to define a listener. The listener is normally part of the service layer.

Architecture for Kafka services
Figure 1. Architecture for Kafka services

Import the following starter-kafka-receiver dependency to use the listener configurations and loggers from devon4j-kafka.

<dependency>
  <groupId>com.devonfw.java.starters</groupId>
  <artifactId>devon4j-starter-kafka-receiver</artifactId>
</dependency>

The listener is defined by implementing and annotating a method like in the following example:

  @KafkaListener(topics = "employeeapp-employee-v1-delete", groupId = "${messaging.kafka.consumer.groupId}", containerFactory = "kafkaListenerContainerFactory")
  public void consumer(ConsumerRecord<Object, Object> consumerRecord, Acknowledgment acknowledgment) {
  //user operation
  //To acknowledge listener after processing
  acknowledgement.acknowledge();
  }

The group id can be mentioned in application.properties as listener properties.

messaging.kafka.consumer.groupId=default

If there are multiple topics and multiple listeners then we suggest to specify the topic names directly on each listener instead reading from the property file. The container factory mentioned in the @KafkaListener is provided in the KafkaListenerContainerProperties.java to create a default container factory with acknowledgement.

The default ack-mode is manual_immediate . It can be overridden by below example:

messaging.kafka.listener.container.ackMode=<ack-mode>

The other ack-mode values can be referred from here.

Retry

The retry pattern in devon4j-kafka is invoked when a particular exception(described by user in application.properties file) is thrown while processing the consumed message and it is configured in application.properties file. The general idea is to separate messages which could not be processed into dedicated retry-topics to allow fine control on how processing of the messages is retried and to not block newly arriving messages. Let us see more about handling retry in the below topics.

Retry pattern in devon4j-kafka

Handling retry in devon4j-kafka

The retry pattern is included in the starter dependency of "starter-kafka-receiver".

The retryPattern method is used by calling the method processMessageWithRetry(ConsumerRecord<K, V> consumerRecord,MessageProcessor<K, V> processor). Please find the below Example:

@Inject
private MessageRetryOperations<K, V> messageRetryOperations;
@Inject
private DeleteEmployeeMessageProcessor<K, V> deleteEmployeeMessageProcessor;
@KafkaListener(topics = "employeeapp-employee-v1-delete", groupId = "${messaging.kafka.consumer.groupId}",containerFactory = "kafkaListenerContainerFactory")
public void consumer(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment) {
this.messageRetryOperations.processMessageWithRetry(consumerRecord, this.deleteEmployeeMessageProcessor);
// Acknowledge the listener.
acknowledgment.acknowledge();
}

The implementation for MessageProcessor from devon4j-kafka is required to provide the implementation to process the ConsumedRecord from Kafka broker. The implementation for MessageProcessor interface can look as below example:

import com.devonfw.module.kafka.common.messaging.retry.api.client.MessageProcessor;
@Named
public class DeleteEmployeeMessageProcessor<K, V> implements MessageProcessor<K, V> {
 @Override
  public void processMessage(ConsumerRecord<K, V> message) {
  //process message
  }
}

It works as follows:

  • The application gets a message from the topic.

  • During the processing of the message an error occurs, the message will be written to the redelivery topic.

  • The message is acknowledged in the topic.

  • The message will be processed from the re-delivery topic after a delay.

  • Processing of the message fails again. It retires until the retry count gets over.

  • When the retry fails in all the retry then the message is logged and payload in the ProducerRecord is deleted for log compaction which is explained below.

Retry configuration and naming convention of redelivery topics.

The following properties should be added in the application.properties or application.yml file.

The retry pattern in devon4j-kafka will perform for specific topic of a message. So its mandatory to specify the properties for each topic. Below properties are example,

# Back off policy properties for employeeapp-employee-v1-delete
messaging.retry.back-off-policy.retryReEnqueueDelay.employeeapp-employee-v1-delete=1000
messaging.retry.back-off-policy.retryDelay.employeeapp-employee-v1-delete=600000
messaging.retry.back-off-policy.retryDelayMultiplier.employeeapp-employee-v1-delete=1.0
messaging.retry.back-off-policy.retryMaxDelay.employeeapp-employee-v1-delete=600000
messaging.retry.back-off-policy.retryCount.employeeapp-employee-v1-delete=2

# Retry policy properties for employeeapp-employee-v1-delete
messaging.retry.retry-policy.retryPeriod.employeeapp-employee-v1-delete=1800
messaging.retry.retry-policy.retryableExceptions.employeeapp-employee-v1-delete=<Class names of exceptions for which a retry should be performed>
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.employeeapp-employee-v1-delete=true

# Back off policy properties for employeeapp-employee-v1-add
messaging.retry.back-off-policy.retryReEnqueueDelay.employeeapp-employee-v1-add=1000
messaging.retry.back-off-policy.retryDelay.employeeapp-employee-v1-add=600000
messaging.retry.back-off-policy.retryDelayMultiplier.employeeapp-employee-v1-add=2.0
messaging.retry.back-off-policy.retryMaxDelay.employeeapp-employee-v1-add=600000
messaging.retry.back-off-policy.retryCount.employeeapp-employee-v1-add=4

# Retry policy properties for employeeapp-employee-v1-add
messaging.retry.retry-policy.retryPeriod.employeeapp-employee-v1-add=3000
messaging.retry.retry-policy.retryableExceptions.employeeapp-employee-v1-add=<Class names of exceptions for which a retry should be performed>
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.employeeapp-employee-v1-add=true

If you notice the above properties, the retry-policy and back-off policy properties are repeated twice as i have 2 topics for the retry to be performed with different level of values. The topic name should be added at the last of attribute.

So, the retry will be performed for each topic according to their configuration values.

If you want to provide same/default values for all the topics, then its required to add default in the place of topic on the above properties example.

For example,

# Default back off policy properties
messaging.retry.back-off-policy.retryReEnqueueDelay.default=1000
messaging.retry.back-off-policy.retryDelay.default=600000
messaging.retry.back-off-policy.retryDelayMultiplier.default=1.0
messaging.retry.back-off-policy.retryMaxDelay.default=600000
messaging.retry.back-off-policy.retryCount.default=2

# Default retry policy properties
messaging.retry.retry-policy.retryPeriod.default=1800
messaging.retry.retry-policy.retryableExceptions.default=<Class names of exceptions for which a retry should be performed>
messaging.retry.retry-policy.retryableExceptionsTraverseCauses.default=true

By giving properties like above, the same values will be passed for all the topics and the way of processing retry for all the topics are same.

All these above property values are mapped to the classes DefaultBackOffPolicyProperties.java and DefaultRetryPolicyProperties.java and configured by the class MessageDefaultRetryConfig.java.

The MessageRetryContext in devon kafka is used to perform the retry pattern with the properties from DefaultBackOffPolicyProperties and DefaultRetryPolicyProperties.

The 2 main properties of MessageRetryContext are nextRetry and retryUntil which is a Instant date format and it is calculated internally using the properties given in DefaultBackOffPolicyProperties and DefaultRetryPolicyProperties.

You may change the behavior of this date calculation by providing your own implementation classes for MessageBackOffPolicy.java and MessageRetryPolicy.java.

The naming convention for retry topic is the same topic name which you have given to publish the message and we add suffix -retry to it once it is consumed and given to process with retry.

If there is no topic found in the consumed record the default retry topic will be added which is default-message-retry.

Retry topics

Devon4j-kafka uses a separate retry topic for each topic where retries occur. By default this topic is named <topic name>-retry. You may change this behavior by providing your own implementation for DefaultKafkaRecordSupport which is a default implementation from devon4j-kafka for KafkaRecordSupport.

Devon4j-kafka enqueues a new message for each retry attempt. It is very important to configure your retry tropics with log compaction enabled. More or less simplified, if log compaction is enabled Kafka keeps only one message per message key. Since each retry message has the same key, in fact only one message per retry attempt is stored. After the last retry attempt the message payload is removed from the message so, you do not keep unnecessary data in your topics.

Handling retry finally failed

Per default when the retry fails with final attempt we just log the message and delete the payload of ProducerRecord which comes to proceed the retry pattern.

You can change this behavior by providing the implementation class for the interface MessageRetryHandler.java which has two methods retryTimeout and retryFailedFinal.

Tracer

We leverage Spring Cloud Sleuth for tracing in devon4j-kafka This is used to trace the asynchronous process of Kafka producing and consuming. In an asynchronous process it is important to maintain an id which will be same for all asynchronous process. However, devon uses its own correlation-id(UUID) to track the process. But devon4j-kafka uses an additional tracing protocol which is Brave Tracer.

This is a part of both starter dependencies starter-kafka-receiver and starter-kafka-sender.

There are 2 important properties which will be automatically logged which are trace-id and spain-id. The trace-id is same for all the asynchronous process and span-id is unique for each asynchronous process.

How devon4j-kafka handles tracer ?

We inject the trace-id and span-id in to the ProducerRecord headers which comes to publish into the Kafka broker. It’s injected in the headers with the key traceId for trace-id and spanId for span-id. Along with these, the correlation-id(UUID) is also injected in the headers of record with the key correlationId.

So, when you consume record from Kafka broker, these values can be found in the consumed record’s headers with these keys.

So, it is very helpful to track the asynchronous process of consuming the messages.

Logging

devon4j-kafka provides multiple support classes to log the published message and the consumed message. * The class ProducerLoggingListener which implements ProducerListener<K,V> from Spring Kafka uses to log the message as soon as it is published in the Kafka broker.

  • The aspect class MessageListenerLoggingAspect which is annotated with @Aspect and has a method logMessageprocessing which is annotated with @Around("@annotation(org.springframework.kafka.annotation.KafkaListener)&&args(kafkaRecord,..)") is used to listen to the classes which is annotated with @KafkaListener and logs the message as soon as it is consumed.

  • The class MessageLoggingSupport has multiple methods to log different types of events like MessageReceived, MessageSent, MessageProcessed, MessageNotProcessed.

  • The class LoggingErrorHandler which implements ErrorHandler from spring-kafka which logs the message when an error occurred while consuming messages. You may change this behavior by creating your own implementation class for the ErrorHandler.

Kafka Health check using Spring acutator

The spring config class MessageCommonConfig automatically provides a spring health indicator bean for kafka if the property endpoints. The health indicator will check for all topics listed in messaging.kafka.health.topics-tocheck if a leader is available. If this property is missing only the broker connection will be checked. The timeout for the check (default 60s) maybe changed via the property messaging.kafka.health.timeout. If an application uses multiple broker(-clusters) for each broker(-cluster) a dedicated health indicator bean has to be configured in the spring config.

The properties for the devon kafka health check should be given like below example:

management.endpoint.health.enabled=<true or false>
messaging.kafka.health.timeout=<the health check timeout seconds>
messaging.kafka.health.topicsToCheck=employeeapp-employee-v1-delete,employeeapp-employee-v1-add

These properties are provided with default values except the topicsToCheck and health check will do happen only when the property is management.endpoint.health.enabled=true.

Authentication

JSON Web Token (JWT)

devon4j-kafka supports authentication via JSON Web Tokens (JWT) out-of-the-box. To use it add a dependency to the devon4j-starter-security-jwt:

<dependency>
  <groupId>com.devonfw.java.starters</groupId>
  <artifactId>devon4j-starter-security-jwt</artifactId>
</dependency>

The authentication via JWT needs some configuration, e.g. a keystore to verify the token signature. This is explained in the JWT documentation.

To secure a message listener with jwt add the @JwtAuthentication:

  @JwtAuthentication
  @KafkaListener(topics = "employeeapp-employee-v1-delete", groupId = "${messaging.kafka.consumer.groupId}")
  public void consumer(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment) {
...
    }
  }

With this annotation in-place each message will be checked for a valid JWT in a message header with the name Authorization. If a valid annotation is found the spring security context will be initialized with the user roles and "normal" authorization e.g. with @RolesAllowed may be used. This is also demonstrated in the kafka sample application.

Using Kafka for internal parallel processing

Apart from the use of Kafka as "communication channel", it is sometimes helpful to use Kafka internally to do parallel processing:

Architecture for internal parallel processing with Kafka
Figure 2. Architecture for internal parallel processing with Kafka

This examples shows a payment service which allows to submit a list of receipt IDs for payment. We assume that the payment itself takes a long time and should be done asynchronously and in parallel. The general idea is to put a message for each receipt to pay into a topic. This is done in the use case implementation in a first step, if a rest call arrives. Also part of the use case is a listener which consumes the messages. For each message (e.g. payment to do) a processor is called, which actually does the payment via the use case. Since Kafka supports concurrency for the listeners easily the payment will also be done in parallel. All features of devon4j-kafka, like retry handling could also be used.

Clone this wiki locally