From eeedd64ac9a8cdbc5922c03b411dbd3ae973d588 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 5 Jan 2023 14:22:47 -0800 Subject: [PATCH] Introduce Default Replication Worker Performance Test Harness (#20956) Introduce a performance test harness for the default replication worker to make it easy for devs to test the effect of changes on platform throughput. The current setup is designed to be run manually. In the future, we can look into integrating this report into our build pipelines. For now, this is good enough as I wanted to start somewhere. The general idea is to use JMH to run the test n times (currently 4 times). The dev can then look at the logs to see throughput and how it varies. As of this PR, we see general platform throughput of ~ 20 - 25 MB/s. --- airbyte-commons-worker/build.gradle | 3 + .../general/DefaultReplicationWorker.java | 2 +- .../general/EmptyAirbyteDestination.java | 56 ++++++++++ .../workers/general/LimitedAirbyteSource.java | 57 ++++++++++ .../ReplicationWorkerPerformanceTest.java | 105 ++++++++++++++++++ .../workers/general/StubAirbyteMapper.java | 26 +++++ deps.toml | 3 + spotbugs-exclude-filter-file.xml | 3 + 8 files changed, 254 insertions(+), 1 deletion(-) create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java diff --git a/airbyte-commons-worker/build.gradle b/airbyte-commons-worker/build.gradle index 8051e8038d0d2..337afacd66c28 100644 --- a/airbyte-commons-worker/build.gradle +++ b/airbyte-commons-worker/build.gradle @@ -31,6 +31,7 @@ dependencies { testAnnotationProcessor platform(libs.micronaut.bom) testAnnotationProcessor 
libs.bundles.micronaut.test.annotation.processor + testAnnotationProcessor libs.jmh.annotations testImplementation libs.bundles.micronaut.test testImplementation 'com.jayway.jsonpath:json-path:2.7.0' @@ -38,6 +39,8 @@ dependencies { testImplementation libs.postgresql testImplementation libs.platform.testcontainers testImplementation libs.platform.testcontainers.postgresql + testImplementation libs.jmh.core + testImplementation libs.jmh.annotations testImplementation project(':airbyte-commons-docker') } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 4b669bd9e71e9..f699ff07e3815 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -309,7 +309,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou return () -> { MDC.setContextMap(mdc); LOGGER.info("Replication thread started."); - Long recordsRead = 0L; + long recordsRead = 0L; final Map, Integer>> validationErrors = new HashMap<>(); final Map> streamToSelectedFields = new HashMap<>(); if (fieldSelectionEnabled) { diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java new file mode 100644 index 0000000000000..f35c97a3027e1 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.workers.general; + +import io.airbyte.config.WorkerDestinationConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.workers.internal.AirbyteDestination; +import java.nio.file.Path; +import java.util.Optional; + +/** + * Empty Airbyte Destination. Does nothing with messages. Intended for performance testing. + */ +public class EmptyAirbyteDestination implements AirbyteDestination { + + @Override + public void start(WorkerDestinationConfig destinationConfig, Path jobRoot) throws Exception { + + } + + @Override + public void accept(AirbyteMessage message) throws Exception { + + } + + @Override + public void notifyEndOfInput() throws Exception { + + } + + @Override + public boolean isFinished() { + return true; + } + + @Override + public int getExitValue() { + return 0; + } + + @Override + public Optional attemptRead() { + return Optional.empty(); + } + + @Override + public void close() throws Exception {} + + @Override + public void cancel() throws Exception { + + } + +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java new file mode 100644 index 0000000000000..6a6ad64540e76 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.general; + +import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.workers.internal.AirbyteSource; +import io.airbyte.workers.test_utils.AirbyteMessageUtils; +import java.nio.file.Path; +import java.util.Optional; + +/** + * Basic Airbyte Source that emits {@link LimitedAirbyteSource#TOTAL_RECORDS} before finishing. + * Intended for performance testing. 
+ */ +public class LimitedAirbyteSource implements AirbyteSource { + + private static final int TOTAL_RECORDS = 1_000_000; + + private int currentRecords = 0; + + @Override + public void start(WorkerSourceConfig sourceConfig, Path jobRoot) throws Exception { + + } + + @Override + public boolean isFinished() { + return currentRecords == TOTAL_RECORDS; + } + + @Override + public int getExitValue() { + return 0; + } + + @Override + public Optional attemptRead() { + currentRecords++; + return Optional.of(AirbyteMessageUtils.createRecordMessage("s1", "data", + "This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance." + + "Random append to prevent dead code generation: " + currentRecords)); + } + + @Override + public void close() throws Exception { + + } + + @Override + public void cancel() throws Exception { + + } + +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java new file mode 100644 index 0000000000000..13728602785c7 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.workers.general; + +import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; +import io.airbyte.config.ReplicationOutput; +import io.airbyte.config.StandardSyncInput; +import io.airbyte.metrics.lib.NotImplementedMetricClient; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; +import io.airbyte.workers.RecordSchemaValidator; +import io.airbyte.workers.WorkerMetricReporter; +import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.internal.NamespacingMapper; +import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Warmup; + +@Slf4j +public class ReplicationWorkerPerformanceTest { + + /** + * Hook up the DefaultReplicationWorker to a test harness with an insanely quick Source + * {@link LimitedAirbyteSource} and Destination {@link EmptyAirbyteDestination}. + *

+ * The harness uses the Java Microbenchmark Harness (JMH) to run the E2E sync a configured number of times. It then + * reports a time distribution for the time taken to run the E2E sync. + *

+ * Because the reported time does not explicitly include throughput numbers, throughput logging has + * been added. This class is intended to help devs understand the impact of changes on throughput. + *

+ * To use this, simply run the main method, make yourself a cup of coffee for 5 mins, then look at the + * logs. + */ + @Benchmark + // SampleTime = the time taken to run the benchmarked method. Use this because we only care about + // the time taken to sync the entire dataset. + @BenchmarkMode(Mode.SampleTime) + // Warming up the JVM stabilises results but takes longer. Skip this for now since we don't need + // that fine a result. + @Warmup(iterations = 0) + // How many runs to do. + @Fork(1) + // Within each run, how many iterations to do. + @Measurement(iterations = 2) + public void executeOneSync() throws InterruptedException { + final var perSource = new LimitedAirbyteSource(); + final var perDestination = new EmptyAirbyteDestination(); + final var messageTracker = new AirbyteMessageTracker(); + final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01"); + final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", ""); + final var validator = new RecordSchemaValidator(Map.of( + new AirbyteStreamNameNamespacePair("s1", null), + CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING)))); + + final var worker = new DefaultReplicationWorker("1", 0, + perSource, + dstNamespaceMapper, + perDestination, + messageTracker, + validator, + metricReporter, + false); + final AtomicReference output = new AtomicReference<>(); + final Thread workerThread = new Thread(() -> { + try { + output.set(worker.run(new StandardSyncInput().withCatalog(new ConfiguredAirbyteCatalog() + .withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("s1"))))), + Path.of("/"))); + } catch (final WorkerException e) { + throw new RuntimeException(e); + } + }); + + workerThread.start(); + workerThread.join(); + final var summary = output.get().getReplicationAttemptSummary(); + final var mbRead = 
summary.getBytesSynced() / 1_000_000; + final var timeTakenSec = (summary.getEndTime() - summary.getStartTime()) / 1000.0; + log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead / timeTakenSec); + } + + public static void main(String[] args) throws IOException { + // Run this main class to start benchmarking. + org.openjdk.jmh.Main.main(args); + } + +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java new file mode 100644 index 0000000000000..d33634e5d9bcb --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.general; + +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.workers.internal.AirbyteMapper; + +/** + * Stub mapper testing what happens without any mapping. 
+ */ +public class StubAirbyteMapper implements AirbyteMapper { + + @Override + public ConfiguredAirbyteCatalog mapCatalog(ConfiguredAirbyteCatalog catalog) { + return null; + } + + @Override + public AirbyteMessage mapMessage(AirbyteMessage message) { + return message; + } + +} diff --git a/deps.toml b/deps.toml index 7a54da9f967fa..e3eff5d61cb4f 100644 --- a/deps.toml +++ b/deps.toml @@ -15,6 +15,7 @@ fasterxml_version = "2.13.3" flyway = "7.14.0" glassfish_version = "2.31" hikaricp = "5.0.1" +jmh = "1.36" jooq = "3.13.4" junit-jupiter = "5.9.0" log4j = "2.17.2" @@ -68,6 +69,8 @@ jackson-datatype = { module = "com.fasterxml.jackson.datatype:jackson-datatype-j java-dogstatsd-client = { module = "com.datadoghq:java-dogstatsd-client", version = "4.1.0" } javax-databind = { module = "javax.xml.bind:jaxb-api", version = "2.4.0-b180830.0359" } jcl-over-slf4j = { module = "org.slf4j:jcl-over-slf4j", version.ref = "slf4j" } +jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" } +jmh-annotations = { module = "org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" } jooq = { module = "org.jooq:jooq", version.ref = "jooq" } jooq-codegen = { module = "org.jooq:jooq-codegen", version.ref = "jooq" } jooq-meta = { module = "org.jooq:jooq-meta", version.ref = "jooq" } diff --git a/spotbugs-exclude-filter-file.xml b/spotbugs-exclude-filter-file.xml index c5da06291781e..129c0baf73c8d 100644 --- a/spotbugs-exclude-filter-file.xml +++ b/spotbugs-exclude-filter-file.xml @@ -8,6 +8,9 @@ + + +