Skip to content

Commit

Permalink
A
Browse files Browse the repository at this point in the history
  • Loading branch information
paualarco committed Jun 1, 2020
1 parent ebd4f93 commit 87cde0d
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,3 @@ services:
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'



File renamed without changes.
75 changes: 0 additions & 75 deletions benchmarks/results.txt

This file was deleted.

5 changes: 4 additions & 1 deletion benchmarks/src/main/resources/commands
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ docker exec -it broker kafka-console-consumer --bootstrap-server broker:9092 --t

docker exec -it broker /bin/bash

kafka-producer-perf-test --topic topic2 --num-records 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092 batch.size=1000 acks=1 linger.ms=1000 buffer.memory=4294967296 request.timeout.ms=300000 --record-size 10
docker stop $(docker ps -a -q)

#from root directory
sbt 'benchmarks/jmh:run -o results.txt monix.kafka.benchmarks.KafkaProducerBenchmark'
5 changes: 5 additions & 0 deletions benchmarks/src/main/scala/monix/kafka/benchmarks/Ap.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package monix.kafka.benchmarks

object Ap {

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import org.scalatest.Suite
import scala.util.Random
trait KafkaFixture {

val topic_1P_2RF = "topic_1P_2RF"
val topic_1P_1RF = "topic_1P_1RF"
val topic_2P_1RF = "topic_2P_4RF"
val topic_2P_2RF = "topic_2P_2RF"
val topic_2P_4RF = "topic_2P_4RF"

val producerConf = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
Expand All @@ -21,9 +24,5 @@ trait KafkaFixture {
groupId = "kafka-tests"
)

val strToProducedRecord: String => Seq[ProducerRecord[String, String]] = {
v: String =>
Seq(new ProducerRecord[String, String](topic_1P_2RF, Random.nextInt().toString, v))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,31 @@ class KafkaProducerBenchmark extends KafkaFixture {

val producer = KafkaProducer[String, String](producerConf, io)

@Benchmark // kafka producer benchmark with 1 partition and 2 repartition factor
def produce_1P_2RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = topic_1P_2RF, i.toString))
//syntax (P, P, RF) === (Parallelism factor, Partitions, Replication Factor)

@Benchmark
def produce_1P_1RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = topic_1P_1RF, i.toString))
val results = Task.sequence(tasks).runSyncUnsafe()
require(results.filter(_.isDefined).isEmpty)
results
}

@Benchmark
def produce_2P_1RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = topic_2P_1RF, i.toString))
val results = Task.sequence(tasks).runSyncUnsafe()
require(results.filter(_.isDefined).isEmpty)
results
}

@Benchmark
def produce_2P_2RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = topic_2P_2RF, i.toString))
val results = Task.sequence(tasks).runSyncUnsafe()
require(results.filter(_.isDefined).isEmpty)
results
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,4 @@ docker ps
sleep 15

create_topic topic_1P_1RF 1 1
create_topic topic_2P_1RF 1 1
create_topic topic_2P_2RF 1 1
create_topic topic_2P_1RF 2 1

0 comments on commit 87cde0d

Please sign in to comment.