Skip to content

Commit

Permalink
feat(cdc): add kafka listener
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrmsouza committed Sep 12, 2024
1 parent e0c50c1 commit aafbbe7
Show file tree
Hide file tree
Showing 24 changed files with 666 additions and 1 deletion.
3 changes: 3 additions & 0 deletions infrastructure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
implementation('org.springframework.boot:spring-boot-starter-oauth2-resource-server')
implementation("org.springframework.boot:spring-boot-starter-validation")
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
implementation("org.springframework.kafka:spring-kafka")

implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:$springdoc") {
exclude group: 'org.springdoc', module: 'springdoc-openapi-ui'
Expand All @@ -57,13 +58,15 @@ dependencies {
testImplementation('org.springframework.security:spring-security-test')
testImplementation("org.springframework.cloud:spring-cloud-contract-wiremock")
testImplementation("org.springframework.boot:spring-boot-testcontainers")
testImplementation("org.springframework.kafka:spring-kafka-test")

testImplementation("com.h2database:h2")
testImplementation("org.flywaydb:flyway-core")
testImplementation("org.flywaydb:flyway-mysql")

testImplementation('org.testcontainers:testcontainers:1.19.8')
testImplementation('org.testcontainers:junit-jupiter:1.19.8')
testImplementation("org.testcontainers:kafka:1.19.8")
}

dependencyManagement {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.github.gabrmsouza.subscription.infrastructure.configuration;

import io.github.gabrmsouza.subscription.infrastructure.configuration.properties.KafkaProperties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration(proxyBeanMethods = false)
public class KafkaConfiguration {

private final KafkaProperties properties;

public KafkaConfiguration(final KafkaProperties props) {
this.properties = props;
}

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerFactory() {
final var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setPollTimeout(properties.poolTimeout());
return factory;
}

private ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

private Map<String, Object> consumerConfigs() {
final var props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, properties.autoCreateTopics());
return props;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.github.gabrmsouza.subscription.infrastructure.configuration;

import io.github.gabrmsouza.subscription.domain.DomainEvent;
import io.github.gabrmsouza.subscription.infrastructure.authentication.clientcredentials.RefreshClientCredentials;
import io.github.gabrmsouza.subscription.infrastructure.observer.Publisher;
import io.github.gabrmsouza.subscription.infrastructure.observer.domainevent.AddToGroupSubscriber;
import io.github.gabrmsouza.subscription.infrastructure.observer.domainevent.RemoveFromGroupSubscriber;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
Expand Down Expand Up @@ -31,4 +35,15 @@ Clock clock() {
LocalValidatorFactoryBean localValidatorFactoryBean() {
return new LocalValidatorFactoryBean();
}

@Bean
Publisher<DomainEvent> domainEventPublisher(
final AddToGroupSubscriber addToGroupSubscriber,
final RemoveFromGroupSubscriber removeFromGroupSubscriber
) {
var publisher = new Publisher<DomainEvent>();
publisher.register(addToGroupSubscriber);
publisher.register(removeFromGroupSubscriber);
return publisher;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.github.gabrmsouza.subscription.infrastructure.configuration.properties;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "kafka")
public class KafkaProperties {

private String bootstrapServers;

private int poolTimeout;

private boolean autoCreateTopics;

public String bootstrapServers() {
return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public int poolTimeout() {
return poolTimeout;
}

public void setPoolTimeout(int poolTimeout) {
this.poolTimeout = poolTimeout;
}

public boolean autoCreateTopics() {
return autoCreateTopics;
}

public void setAutoCreateTopics(boolean autoCreateTopics) {
this.autoCreateTopics = autoCreateTopics;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.github.gabrmsouza.subscription.infrastructure.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import io.github.gabrmsouza.subscription.infrastructure.json.Json;
import io.github.gabrmsouza.subscription.infrastructure.kafka.connect.MessageValue;
import io.github.gabrmsouza.subscription.infrastructure.kafka.event.EventMsg;
import io.github.gabrmsouza.subscription.infrastructure.mediator.EventMediator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Objects;

@Component
public class EventCdcListener {
private static final Logger LOG = LoggerFactory.getLogger(EventCdcListener.class);
private static final TypeReference<MessageValue<EventMsg>> EVENT_MESSAGE_TYPE = new TypeReference<>() {};

private final EventMediator eventMediator;

public EventCdcListener(final EventMediator eventMediator) {
this.eventMediator = Objects.requireNonNull(eventMediator);
}

@KafkaListener(
concurrency = "${kafka.consumers.events.concurrency}",
containerFactory = "kafkaListenerFactory",
topics = "${kafka.consumers.events.topics}",
groupId = "${kafka.consumers.events.group-id}",
id = "${kafka.consumers.events.id}",
properties = {
"auto.offset.reset=${kafka.consumers.events.auto-offset-reset}"
}
)
public void onMessage(@Payload(required = false) final String payload, final ConsumerRecordMetadata metadata) {
if (payload == null) {
LOG.info("Message received from Kafka [topic:{}] [partition:{}] [offset:{}]: EMPTY", metadata.topic(), metadata.partition(), metadata.offset());
return;
}

LOG.info("Message received from Kafka [topic:{}] [partition:{}] [offset:{}]: {}", metadata.topic(), metadata.partition(), metadata.offset(), payload);
final var messagePayload = Json.readValue(payload, EVENT_MESSAGE_TYPE).payload();
final var op = messagePayload.operation();

if (!op.isCreate()) {
LOG.info("Message received from Kafka [topic:{}] [partition:{}] [offset:{}]: Discarding operation {}", metadata.topic(), metadata.partition(), metadata.offset(), op);
return;
}

final var eventId = messagePayload.after().eventId();
LOG.info("Message received from Kafka [topic:{}] [partition:{}] [offset:{}]: Processing event {}", metadata.topic(), metadata.partition(), metadata.offset(), eventId);
this.eventMediator.mediate(eventId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.github.gabrmsouza.subscription.infrastructure.kafka.connect;

import com.fasterxml.jackson.annotation.JsonProperty;

public record MessageValue<T>(
@JsonProperty("payload") ValuePayload<T> payload
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.github.gabrmsouza.subscription.infrastructure.kafka.connect;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;

public enum Operation {
CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

private final String op;

Operation(String op) {
this.op = op;
}

@JsonCreator
public static Operation of(final String value) {
return Arrays.stream(values())
.filter(it -> it.op.equalsIgnoreCase(value))
.findFirst()
.orElse(null);
}

@JsonValue
public String op() {
return op;
}

public boolean isCreate() {
return this == CREATE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.github.gabrmsouza.subscription.infrastructure.kafka.connect;

import com.fasterxml.jackson.annotation.JsonProperty;

public record Source(
@JsonProperty("name") String name,
@JsonProperty("db") String database,
@JsonProperty("table") String table
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.github.gabrmsouza.subscription.infrastructure.kafka.connect;

import com.fasterxml.jackson.annotation.JsonProperty;

public record ValuePayload<T>(
@JsonProperty("after") T after,
@JsonProperty("before") T before,
@JsonProperty("source") Source source,
@JsonProperty("op") Operation operation
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.github.gabrmsouza.subscription.infrastructure.kafka.event;

import com.fasterxml.jackson.annotation.JsonProperty;

public record EventMsg(@JsonProperty("event_id") Long eventId) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.github.gabrmsouza.subscription.infrastructure.mediator;

import io.github.gabrmsouza.subscription.domain.DomainEvent;
import io.github.gabrmsouza.subscription.domain.exceptions.InternalErrorException;
import io.github.gabrmsouza.subscription.infrastructure.gateway.repository.EventJdbcRepository;
import io.github.gabrmsouza.subscription.infrastructure.observer.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Objects;

@Component
public class EventMediator {
private static final Logger LOG = LoggerFactory.getLogger(EventMediator.class);

private final EventJdbcRepository repository;
private final Publisher<DomainEvent> publisher;

public EventMediator(final EventJdbcRepository repository, final Publisher<DomainEvent> publisher) {
this.repository = Objects.requireNonNull(repository);
this.publisher = Objects.requireNonNull(publisher);
}

public void mediate(final Long eventId) {
this.repository
.eventOfIdAndUnprocessed(eventId)
.ifPresentOrElse(event -> process(eventId, event), logEventNotFount(eventId));
}

private void process(final Long eventId, final DomainEvent event) {
if (!this.publisher.publish(event)) {
throw InternalErrorException.with("Failed to process event %s".formatted(eventId));
}
this.repository.markAsProcessed(eventId);
}

private Runnable logEventNotFount(final Long eventId) {
return () -> LOG.warn("Event not found [eventId:{}]", eventId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.github.gabrmsouza.subscription.infrastructure.observer;

import java.util.ArrayList;
import java.util.List;

public class Publisher<T> {
private final List<Subscriber<T>> subscribers;

public Publisher() {
this.subscribers = new ArrayList<>();
}

public void register(final Subscriber<T> subscriber) {
this.subscribers.add(subscriber);
}

public boolean publish(final T event) {
boolean success = true;
for (Subscriber<T> subscriber : subscribers) {
try {
if (subscriber.test(event)) {
subscriber.onEvent(event);
return success;
}
} catch (Throwable t) {
success = false;
}
}
return success;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.github.gabrmsouza.subscription.infrastructure.observer;

public interface Subscriber<T> {
boolean test(T ev);
void onEvent(T ev);
}
Loading

0 comments on commit aafbbe7

Please sign in to comment.