diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/UserAgent.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/UserAgent.java index dad46553..f420b52a 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/core/UserAgent.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/core/UserAgent.java @@ -92,7 +92,9 @@ public static void withPartner(String partner) { public static void withOtherInfo(String key, String value) { matchAlphanum(key); matchAlphanumOrSemVer(value); - otherInfo.add(new Info(key, value)); + synchronized (otherInfo) { + otherInfo.add(new Info(key, value)); + } } private static String osName() { @@ -119,10 +121,13 @@ public static String asString() { segments.add(String.format("databricks-sdk-java/%s", version)); segments.add(String.format("jvm/%s", jvmVersion())); segments.add(String.format("os/%s", osName())); - segments.addAll( - otherInfo.stream() - .map(e -> String.format("%s/%s", e.getKey(), e.getValue())) - .collect(Collectors.toSet())); + // Concurrent iteration over ArrayList must be guarded with synchronized. + synchronized (otherInfo) { + segments.addAll( + otherInfo.stream() + .map(e -> String.format("%s/%s", e.getKey(), e.getValue())) + .collect(Collectors.toSet())); + } return segments.stream().collect(Collectors.joining(" ")); } } diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/DatabricksAuthLoadTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/DatabricksAuthLoadIT.java similarity index 52% rename from databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/DatabricksAuthLoadTest.java rename to databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/DatabricksAuthLoadIT.java index fe0b92af..c7225349 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/DatabricksAuthLoadTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/DatabricksAuthLoadIT.java @@ -2,20 +2,35 @@ import static org.junit.jupiter.api.Assertions.*; +import com.databricks.sdk.core.DatabricksConfig; +import com.databricks.sdk.core.commons.CommonsHttpClient; +import com.databricks.sdk.integration.framework.EnvContext; +import com.databricks.sdk.integration.framework.EnvOrSkip; +import com.databricks.sdk.integration.framework.EnvTest; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; -/* - This test executes the authentication workflow 200 times concurrently and verifies that all 200 runs complete successfully. - It is designed to address a previously observed issue where multiple SDK operations needed to authenticate, causing the OIDC endpoints to rate-limit the requests. - Now that these endpoints are configured to retry upon receiving a 429 error, the test runs successfully. - However, since this test generates a large number of requests, it should be run manually rather than being included in CI processes. -*/ -/* -public class DatabricksAuthLoadTest implements GitHubUtils, ConfigResolving { +/** + * This test executes the authentication workflow 200 times concurrently and verifies that all 200 + * runs complete successfully. It is designed to address a previously observed issue where multiple + * SDK operations needed to authenticate, causing the OIDC endpoints to rate-limit the requests. Now + * that these endpoints are configured to retry upon receiving a 429 error, the test runs + * successfully. However, since this test generates a large number of requests, it should be run + * manually rather than being included in CI processes. + */ +@EnvContext("workspace") +@ExtendWith(EnvTest.class) +public class DatabricksAuthLoadIT { @Test - @Disabled - public void testConcurrentConfigBasicAuthAttrs() throws Exception { + public void testConcurrentConfigBasicAuthAttrs( + @EnvOrSkip("DATABRICKS_HOST") String host, + @EnvOrSkip("DATABRICKS_CLIENT_ID") String clientId, + @EnvOrSkip("DATABRICKS_CLIENT_SECRET") String clientSecret) + throws Exception { int numThreads = 200; ExecutorService executorService = Executors.newFixedThreadPool(numThreads); List> futures = new ArrayList<>(); @@ -27,9 +42,9 @@ public void testConcurrentConfigBasicAuthAttrs() throws Exception { try { DatabricksConfig config = new DatabricksConfig() - .setHost("https://dbc-bb03964f-3f59.cloud.databricks.com") - .setClientId("<>") - .setClientSecret("<>"); + .setHost(host) + .setClientId(clientId) + .setClientSecret(clientSecret); config.setHttpClient(new CommonsHttpClient.Builder().withTimeoutSeconds(30).build()); config.authenticate(); @@ -70,4 +85,3 @@ public void testConcurrentConfigBasicAuthAttrs() throws Exception { assertEquals(0, failureCount); } } -*/ diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/README.md b/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/README.md new file mode 100644 index 00000000..01f77063 --- /dev/null +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/README.md @@ -0,0 +1,13 @@ +# Benchmark Tests + +The tests defined in this module are benchmarks of the Databricks SDK and its internal components. These tests ensure that the SDK works correctly and with reasonable performance under high load and concurrency. These tests are not run in normal CI builds to prevent regularly exhausting the REST API rate limit. + +Load tests of components that don't make any network requests (e.g. `UserAgentLoadTest`) should be colocated with the unit tests for that component. + +## Adding a Benchmark Test + +All test files in this module should be named `*LoadIT.java`. They should be written as integration tests, with the `@EnvContext()` and `@ExtendWith(EnvTest.class)` annotations. + +## Running the Benchmarks + +Use [IntelliJ's built-in test runner](https://www.jetbrains.com/help/idea/performing-tests.html) to run benchmark tests. diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/WorkspaceClientLoadIT.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/WorkspaceClientLoadIT.java new file mode 100644 index 00000000..68782214 --- /dev/null +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/benchmark/WorkspaceClientLoadIT.java @@ -0,0 +1,84 @@ +package com.databricks.sdk.benchmark; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.databricks.sdk.WorkspaceClient; +import com.databricks.sdk.core.DatabricksConfig; +import com.databricks.sdk.integration.framework.EnvContext; +import com.databricks.sdk.integration.framework.EnvOrSkip; +import com.databricks.sdk.integration.framework.EnvTest; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * This test executes a simple operation 150 times concurrently to verify that the end-to-end + * behavior of the SDK is correct under concurrent load. This test is designed to be run manually + * rather than being included in CI processes, as it generates a large number of requests. + */ +@EnvContext("workspace") +@ExtendWith(EnvTest.class) +public class WorkspaceClientLoadIT { + + @Test + public void testConcurrentCurrentUserMe( + @EnvOrSkip("DATABRICKS_HOST") String host, + @EnvOrSkip("DATABRICKS_CLIENT_ID") String clientId, + @EnvOrSkip("DATABRICKS_CLIENT_SECRET") String clientSecret) + throws Exception { + int numThreads = 150; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + List> futures = new ArrayList<>(); + int successCount = 0; + int failureCount = 0; + + Callable task = + () -> { + try { + DatabricksConfig config = + new DatabricksConfig() + .setHost(host) + .setClientId(clientId) + .setClientSecret(clientSecret); + + WorkspaceClient w = new WorkspaceClient(config); + + // This should not throw an exception. + w.currentUser().me(); + + return true; + } catch (Exception e) { + System.err.println( + "DatabricksException occurred in thread " + Thread.currentThread().getName()); + e.printStackTrace(); + return false; + } + }; + + for (int i = 0; i < numThreads; i++) { + futures.add(executorService.submit(task)); + } + + executorService.shutdown(); + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + + for (Future future : futures) { + if (future.get()) { + successCount++; + } else { + failureCount++; + } + } + + // Log the results + System.out.println("Number of successful threads: " + successCount); + System.out.println("Number of failed threads: " + failureCount); + + // Optionally, you can assert that there were no failures + assertEquals(0, failureCount); + } +} diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/core/UserAgentLoadTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/UserAgentLoadTest.java new file mode 100644 index 00000000..c07e2b64 --- /dev/null +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/core/UserAgentLoadTest.java @@ -0,0 +1,64 @@ +package com.databricks.sdk.core; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import org.junit.jupiter.api.Test; + +/** + * Load tests for the UserAgent class. + * + *

This is not part of the benchmark module because it doesn't make any network requests. + */ +public class UserAgentLoadTest { + + @Test + public void testAsStringConcurrent() throws InterruptedException, ExecutionException { + int numThreads = 200; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + List> futures = new ArrayList<>(); + int successCount = 0; + int failureCount = 0; + + // Add some user agent info + Callable task = + () -> { + try { + UserAgent.withOtherInfo("key1", "value1"); + UserAgent.asString(); + return true; + } catch (Exception e) { + System.err.println( + "DatabricksException occurred in thread " + Thread.currentThread().getName()); + e.printStackTrace(); + return false; + } + }; + + for (int i = 0; i < numThreads; i++) { + futures.add(executorService.submit(task)); + } + + executorService.shutdown(); + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + + for (Future future : futures) { + if (future.get()) { + successCount++; + } else { + failureCount++; + } + } + + // Log the results + System.out.println("Number of successful threads: " + successCount); + System.out.println("Number of failed threads: " + failureCount); + + // Optionally, you can assert that there were no failures + assertEquals(0, failureCount); + } +} diff --git a/pom.xml b/pom.xml index 73dd92c7..137ccd3c 100644 --- a/pom.xml +++ b/pom.xml @@ -103,10 +103,31 @@ + + org.apache.maven.plugins + maven-surefire-plugin + 3.1.2 + + + + **/benchmark/** + + + maven-failsafe-plugin 3.2.5 + + + **/benchmark/** + 2