Skip to content

Commit

Permalink
[IMPROVE] init Kafka mailet
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxxx873 committed Nov 21, 2024
1 parent 26ce4d5 commit c91a436
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
![PostgreSQL](https://img.shields.io/badge/PostgreSQL-2F3134?style=for-the-badge&logo=postgresql)
![Redis](https://img.shields.io/badge/Redis-2F3134?style=for-the-badge&logo=redis)
![RabbitMQ](https://img.shields.io/badge/RabbitMq-2F3134?style=for-the-badge&logo=rabbitmq)
![Apache Kafka](https://img.shields.io/badge/Apache%20Kafka-2F3134?style=for-the-badge&logo=apachekafka)
![Elasticsearch](https://img.shields.io/badge/elasticsearch-2F3134?style=for-the-badge&logo=)
![Docker](https://img.shields.io/badge/Docker-2F3134?style=for-the-badge&logo=docker)
![Kubernetes](https://img.shields.io/badge/Kubernetes-2F3134?style=for-the-badge&logo=kubernetes)
Expand Down
1 change: 1 addition & 0 deletions apache-james/conf-dev/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,6 @@
<logger name="com.disposableemail.apache.james.mailet.collector.SyncSourceCollector" level="INFO" />
<logger name="com.disposableemail.apache.james.mailet.collector.subscriber.SourceCollectorSubscriber" level="INFO" />
<logger name="com.disposableemail.apache.james.mailet.collector.subscriber.MessageCollectorSubscriber" level="INFO" />
<logger name="com.disposableemail.apache.james.mailet.producer.kafka.KafkaProducerMailet" level="INFO" />

</configuration>
6 changes: 6 additions & 0 deletions apache-james/conf-dev/mailetcontainer.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<messageCollectionName>message</messageCollectionName>
<accountCollectionName>account</accountCollectionName>
</mailet>
<mailet match="All" class="com.disposableemail.apache.james.mailet.producer.kafka.KafkaProducerMailet">
<bootstrapServer>kafka0:9092</bootstrapServer>
<topicName>disposable-email-received</topicName>
<partitions>2</partitions>
<replication>1</replication>
</mailet>
<mailet match="All" class="PostmasterAlias"/>
<mailet match="RelayLimit=30" class="Null"/>
<mailet match="com.disposableemail.apache.james.matcher.NotBigMessage" class="ToProcessor">
Expand Down
39 changes: 36 additions & 3 deletions apache-james/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,37 @@
version: '3'

services:

kafka0:
image: bitnami/kafka:3.9
ports:
- "9092:9092"
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka0:9092
- KAFKA_BROKER_ID=1
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka0:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_NODE_ID=1
volumes:
- volume-kafka-1:/bitnami/kafka
networks:
- james

ui:
depends_on:
- kafka0
image: provectuslabs/kafka-ui:v0.7.1
ports:
- "8080:8080"
environment:
- KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=kafka0:9092
- KAFKA_CLUSTERS_0_NAME=disposable_emails_project
networks:
- james

james:
depends_on:
- elasticsearch
Expand Down Expand Up @@ -93,4 +123,7 @@ services:

networks:
james:
driver: bridge
driver: bridge

volumes:
volume-kafka-1:
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.disposableemail.apache.james.mailet.producer.kafka

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.mailet.Mail
import org.apache.mailet.base.GenericMailet
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
import reactor.kafka.sender.SenderRecord
import java.util.*


class KafkaProducerMailet : GenericMailet() {

private val logger: Logger = LoggerFactory.getLogger(KafkaProducerMailet::class.java)
private lateinit var bootstrapServer: String
private lateinit var topic: String
private lateinit var partitions: List<Int>
private var replication: Short = 0

override fun init() {
logger.info("${javaClass.simpleName} Initializing..")

bootstrapServer = getInitParameter("bootstrapServer")
topic = getInitParameter("topicName")
partitions = (0..<getInitParameter("partitions").toInt()).toList()
replication = getInitParameter("replication").toShort()
val config = Properties().apply {
this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServer
}

AdminClient.create(config).use { admin ->
val topicNames = admin.listTopics().names().get()
if (topic !in topicNames) {
createTopic(admin)
}
}
}

private fun createTopic(admin: AdminClient) {
val newTopic = NewTopic(topic, partitions.size, replication).configs(emptyMap())
admin.createTopics(listOf(newTopic)).apply {
logger.info("Topic $topic not exists, created new")
}
}

override fun service(mail: Mail?) {
mail?.let {
logger.info("Mail received: ${mail.message.messageID}")

val senderOptions = SenderOptions.create<String, String>(
mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServer,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)
)

KafkaSender.create(senderOptions).send(Flux.fromIterable(it.recipients)
.map { address ->
SenderRecord.create(
topic,
partitions.first(),
it.message.sentDate.time,
it.message.messageID,
address.asString(),
it.message.messageID
)
}
)
.doOnError { e: Any -> logger.info("Send message to Kafka failed: $e") }
.doOnNext { _: Any -> logger.info("Message sent to Kafka: " + it.message.messageID) }
.subscribe()
}
}

override fun getMailetInfo(): String {
return "Disposable email Kafka producer mailet"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.disposableemail.apache.james.mailet.producer.kafka

import org.apache.james.core.MailAddress
import org.apache.mailet.base.test.FakeMail
import org.apache.mailet.base.test.FakeMailetConfig
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import java.io.FileInputStream
import javax.mail.MessagingException
import javax.mail.internet.MimeMessage

@Disabled
class KafkaProducerMailetTest {

lateinit var mailetConfig: FakeMailetConfig
lateinit var mailet: KafkaProducerMailet

@BeforeEach
@Throws(Exception::class)
fun setUp() {
mailetConfig = FakeMailetConfig.builder()
.mailetName("KafkaProducerMailet")
.setProperty("bootstrapServer", "localhost:9092")
.setProperty("topicName", "disposable-email-received")
.setProperty("partitions", "2")
.setProperty("replication", "1")
.build()

mailet = KafkaProducerMailet()
mailet.init(mailetConfig)

}

@Test
@Throws(MessagingException::class)
fun mailetShouldNotCreateDocumentWhenMailIsEmpty() {
val mimeMessage = MimeMessage(null, FileInputStream("src/test/resources/test_mail_html_no_attachments.eml"))
val mail = FakeMail.defaultFakeMail()
mail.recipients.add(MailAddress("test@gmail.com"))
mail.recipients.add(MailAddress("test1@gmail.com"))
mail.message = mimeMessage
val mailet = KafkaProducerMailet()

mailet.init()
mailet.service(mail)
}
}
56 changes: 54 additions & 2 deletions apache-james/extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
<junit-jupiter-api.version>5.9.2</junit-jupiter-api.version>
<assertj-core.version>3.24.2</assertj-core.version>
<awaitility.version>4.2.0</awaitility.version>
<reactor-test.version>3.5.0</reactor-test.version>
<reactor.version>3.5.0</reactor.version>
<slf4j.version>2.0.1</slf4j.version>
<jsoup.version>1.15.4</jsoup.version>
<jmh-core.version>1.37</jmh-core.version>
<testcontainers.version>1.19.7</testcontainers.version>
<reactor-kafka.version>1.3.23</reactor-kafka.version>
<kafka-clients.version>3.6.0</kafka-clients.version>
<kotlin.version>2.0.21</kotlin.version>
<!-- plugin versions -->
<maven-surefire-plugin.version>3.0.0</maven-surefire-plugin.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
Expand Down Expand Up @@ -84,6 +87,26 @@
<version>${mongodb-driver-reactivestreams.version}</version>
</dependency>

<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>${reactor-kafka.version}</version>
</dependency>

<!--kotlin-->

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
</dependency>

<!--test-->

<dependency>
Expand All @@ -105,11 +128,16 @@
<version>${james.baseVersion}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
<version>${reactor-test.version}</version>
<version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
Expand Down Expand Up @@ -171,6 +199,30 @@
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<phase>test-compile</phase>
<goals>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmTarget>${maven.compiler.target}</jvmTarget>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void setUp() {
@RepeatedTest(1)
void shouldSendMail() {
ExecutorService executorService = Executors.newFixedThreadPool(150);
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 1; i++) {
executorService.submit(() -> sendMail());
}
executorService.shutdown();
Expand Down

0 comments on commit c91a436

Please sign in to comment.