Skip to content

Commit

Permalink
Added sink and consumer benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
paualarco committed Jun 15, 2020
1 parent 87cde0d commit 41e5887
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 38 deletions.
59 changes: 59 additions & 0 deletions benchmarks/resultats.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 2 iterations, 10 s each
# Measurement: 2 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.kafka.benchmarks.KafkaProducerBenchmark.monixProducer_1P_1RF
# Parameters: (size = 2)

# Run progress: 0.00% complete, ETA 00:01:20
# Fork: 1 of 1
# Warmup Iteration 1: 511.829 ops/s
# Warmup Iteration 2: 568.495 ops/s
Iteration 1: 679.205 ops/s
Iteration 2: 383.040 ops/s


Result "monix.kafka.benchmarks.KafkaProducerBenchmark.monixProducer_1P_1RF":
531.122 ops/s


# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 2 iterations, 10 s each
# Measurement: 2 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.kafka.benchmarks.KafkaProducerBenchmark.monixProducer_2P_1RF
# Parameters: (size = 2)

# Run progress: 50.00% complete, ETA 00:00:41
# Fork: 1 of 1
# Warmup Iteration 1: 388.808 ops/s
# Warmup Iteration 2: 616.387 ops/s
Iteration 1: 453.712 ops/s
Iteration 2: 709.331 ops/s


Result "monix.kafka.benchmarks.KafkaProducerBenchmark.monixProducer_2P_1RF":
581.521 ops/s


# Run complete. Total time: 00:01:22

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark (size) Mode Cnt Score Error Units
KafkaProducerBenchmark.monixProducer_1P_1RF 2 thrpt 2 531.122 ops/s
KafkaProducerBenchmark.monixProducer_2P_1RF 2 thrpt 2 581.521 ops/s
61 changes: 61 additions & 0 deletions benchmarks/sinkResults.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 1 iterations, 10 s each
# Measurement: 3 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.kafka.benchmarks.KafkaProducerSinkBenchmark.monixProducer_2P_1RF

# Run progress: 0.00% complete, ETA 00:01:20
# Fork: 1 of 1
# Warmup Iteration 1: 19325650.744 ops/s
Iteration 1: 22719038.623 ops/s
Iteration 2: 18410283.671 ops/s
Iteration 3: 17055280.316 ops/s


Result "monix.kafka.benchmarks.KafkaProducerSinkBenchmark.monixProducer_2P_1RF":
19394867.537 ±(99.9%) 53955153.846 ops/s [Average]
(min, avg, max) = (17055280.316, 19394867.537, 22719038.623), stdev = 2957464.046
CI (99.9%): [≈ 0, 73350021.383] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_221, Java HotSpot(TM) 64-Bit Server VM, 25.221-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 1 iterations, 10 s each
# Measurement: 3 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: monix.kafka.benchmarks.KafkaProducerSinkBenchmark.monixSink_1P_1RF

# Run progress: 50.00% complete, ETA 00:00:41
# Fork: 1 of 1
# Warmup Iteration 1: 14495673.023 ops/s
Iteration 1: 16978439.192 ops/s
Iteration 2: 14236465.295 ops/s
Iteration 3: 15592697.505 ops/s


Result "monix.kafka.benchmarks.KafkaProducerSinkBenchmark.monixSink_1P_1RF":
15602533.997 ±(99.9%) 25012388.495 ops/s [Average]
(min, avg, max) = (14236465.295, 15602533.997, 16978439.192), stdev = 1371013.414
CI (99.9%): [≈ 0, 40614922.492] (assumes normal distribution)


# Run complete. Total time: 00:01:22

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark Mode Cnt Score Error Units
KafkaProducerSinkBenchmark.monixProducer_2P_1RF thrpt 3 19394867.537 ± 53955153.846 ops/s
KafkaProducerSinkBenchmark.monixSink_1P_1RF thrpt 3 15602533.997 ± 25012388.495 ops/s
16 changes: 16 additions & 0 deletions benchmarks/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyyMMdd-HH:mm:ss.SSSZ} [%thread] %-5level %logger - %msg%n</pattern>
</encoder>
</appender>

<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
<logger name="org.apache.kafka" level="WARN"/>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.I0Itec.zkclient" level="WARN"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR"/>
<logger name="org.apache.kafka.clients.NetworkClient" level="ERROR"/>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package monix.kafka.benchmarks

import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.Implicits.global
import monix.kafka.KafkaConsumerObservable
import org.openjdk.jmh.annotations.{BenchmarkMode, Fork, Measurement, Mode, OutputTimeUnit, Scope, State, Threads, Warmup, _}

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Measurement(iterations = 3)
@Warmup(iterations = 1)
@Fork(2)
@Threads(2)
class KafkaConsumerBenchmark extends KafkaFixture {

var size: Int = 5

produceSinkGrouped(monix_consumer_1P_1RF, size * 2, 5).runSyncUnsafe()
produceSinkGrouped(monix_consumer_2P_1RF, size * 10, 5).runSyncUnsafe()

//syntax (P, P, RF) === (Parallelism factor, Partitions, Replication Factor)
@Benchmark
def monixSink_1P_1RF(): Unit = {
KafkaConsumerObservable[Integer, Integer](consumerConf, List(monix_consumer_1P_1RF))
.take(size - 1)
.lastL
.runSyncUnsafe()
}

@Benchmark
def monixProducer_2P_1RF(): Unit = {
KafkaConsumerObservable[Integer, Integer](consumerConf, List(monix_consumer_2P_1RF))
.take(size - 1)
.lastL
.runSyncUnsafe()
}

}
55 changes: 45 additions & 10 deletions benchmarks/src/main/scala/monix/kafka/benchmarks/KafkaFixture.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
package monix.kafka.benchmarks

import monix.kafka.{KafkaConsumerConfig, KafkaProducer, KafkaProducerConfig}
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.stream.ActorMaterializer
import monix.eval.Task
import monix.kafka.{KafkaConsumerConfig, KafkaProducer, KafkaProducerConfig, KafkaProducerSink}
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord
import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.Suite
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random
trait KafkaFixture {

val topic_1P_1RF = "topic_1P_1RF"
val topic_2P_1RF = "topic_2P_4RF"
val topic_2P_2RF = "topic_2P_2RF"
val topic_2P_4RF = "topic_2P_4RF"
def getTopicNames(testId: String): (String, String) = {
val topic: (String, Int, Int) => String = {
(testId, partitions: Int, replicationFactor: Int) =>
//syntax (P, RF) === (Partitions, Replication Factor)
s"monix_${testId}_${partitions}P_${replicationFactor}RF"
}
(topic(testId, 1, 1), topic(testId, 1, 2))
}

val producerTestId = "producer"
val sinkTestId = "sink"
val consumerTestId = "consumer"

//monix-kafka benchmarks
val (monix_producer_1P_1RF: String, monix_producer_2P_1RF) = getTopicNames(producerTestId)
val (monix_sink_1P_1RF: String, monix_sink_2P_1RF) = getTopicNames(sinkTestId)
val (monix_consumer_1P_1RF: String, monix_consumer_2P_1RF) = getTopicNames(consumerTestId)


val producerConf = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
Expand All @@ -20,9 +37,27 @@ trait KafkaFixture {

val consumerConf = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
clientId = "consumer-monix-kafka-1-0-producer-test",
groupId = "kafka-tests"
clientId = monix_consumer_1P_1RF + "a",
groupId = monix_consumer_1P_1RF + "b",
enableAutoCommit = true,
)

def produceSinkGrouped(topic: String, size: Int, bufferSize: Int ): Task[Unit] = {
Observable
.from(1 to size)
.map(i => new ProducerRecord[Integer, Integer](topic, i))
.bufferTumbling(bufferSize)
.consumeWith(KafkaProducerSink(producerConf, io))
}

/*
implicit val system = ActorSystem("akka-test-benchmark")
implicit val mat = ActorMaterializer()
val producerSettings =
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
*/

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,47 @@ package monix.kafka.benchmarks
import org.openjdk.jmh.annotations.{BenchmarkMode, Fork, Measurement, Mode, OutputTimeUnit, Scope, State, Threads, Warmup}
import java.util.concurrent.TimeUnit

import akka.Done
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import monix.eval.Task
import monix.execution.Scheduler
import monix.kafka.KafkaProducer
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.openjdk.jmh.annotations._
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.{Await, Future}
import scala.util.Try

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Measurement(iterations = 10)
@Warmup(iterations = 10)
@Fork(2)
@Measurement(iterations = 2)
@Warmup(iterations = 2)
@Fork(1)
@Threads(1)
class KafkaProducerBenchmark extends KafkaFixture {

@Param(Array("1000"))
var size: Int = _
@Param(Array("2"))
var size: Int = 2

val producer = KafkaProducer[String, String](producerConf, io)
val producer = KafkaProducer[Integer, Integer](producerConf, io)

//syntax (P, P, RF) === (Parallelism factor, Partitions, Replication Factor)

@Benchmark
def produce_1P_1RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = topic_1P_1RF, i.toString))
val results = Task.sequence(tasks).runSyncUnsafe()
require(results.filter(_.isDefined).isEmpty)
results
}

@Benchmark
def produce_2P_1RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = topic_2P_1RF, i.toString))
def monixProducer_1P_1RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = monix_producer_1P_1RF, i))
val results = Task.sequence(tasks).runSyncUnsafe()
require(results.filter(_.isDefined).isEmpty)
//require(results.filter(_.isDefined).isEmpty)
results
}

@Benchmark
def produce_2P_2RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = topic_2P_2RF, i.toString))
@Benchmark
def monixProducer_2P_1RF(): Seq[Option[RecordMetadata]] = {
val tasks = (0 until size).map(i => producer.send(topic = monix_producer_2P_1RF, i))
val results = Task.sequence(tasks).runSyncUnsafe()
require(results.filter(_.isDefined).isEmpty)
results
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package monix.kafka.benchmarks

import java.util.concurrent.TimeUnit

import org.openjdk.jmh.annotations.{BenchmarkMode, Fork, Measurement, Mode, OutputTimeUnit, Scope, State, Threads, Warmup, _}

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Measurement(iterations = 3)
@Warmup(iterations = 1)
@Fork(1)
@Threads(1)
class KafkaProducerSinkBenchmark extends KafkaFixture {

var size: Int = 20

@Benchmark
def monixSink_1P_1RF(): Unit =
produceSinkGrouped(monix_sink_1P_1RF, size, 5)

@Benchmark
def monixSink_2P_1RF(): Unit =
produceSinkGrouped(monix_sink_1P_1RF, size, 5)

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ import monix.execution.Scheduler
package object benchmarks {

/** I/O scheduler meant for tests. */
implicit lazy val io = Scheduler.io("monix-kafka-benchmark")
lazy val io = Scheduler.io("monix-kafka-benchmark")
}
24 changes: 24 additions & 0 deletions benchmarks/src/test/scala/benchmarks/KafkaConsumerSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package benchmarks

import monix.execution.Scheduler.Implicits.global
import monix.kafka.{KafkaConsumerObservable, KafkaProducer}
import monix.kafka.benchmarks.KafkaFixture
import org.scalatest.{FlatSpec, Matchers}

class KafkaConsumerSpec extends FlatSpec with KafkaFixture with Matchers {

val producer = KafkaProducer[String, String](producerConf, global)

s"Monix ${monix_consumer_1P_1RF}" should "exist" in {
val t = producer.send(topic = monix_consumer_1P_1RF, "test")

t.runSyncUnsafe().isDefined shouldBe true
}

it should "allow " in {
produceSinkGrouped(monix_consumer_1P_1RF, 20, 5).runSyncUnsafe()
val elements = KafkaConsumerObservable[Integer, Integer](consumerConf, List(monix_consumer_1P_1RF)).take(20).toListL.runSyncUnsafe()
elements.size shouldBe 20
}

}
Loading

0 comments on commit 41e5887

Please sign in to comment.