From 5ca767b229fdf3da2f0354ba4842a9a1a7d64aa5 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Tue, 14 Jan 2025 15:29:42 -0700 Subject: [PATCH] Add an OpensearchClient for ES 6.x (#1230) Create OpenSearchClient for 6.x --------- Signed-off-by: Mikayla Thompson Co-authored-by: Peter Nied --- .../opensearch/migrations/CreateSnapshot.java | 5 +- .../migrations/TestCreateSnapshot.java | 37 ++- .../opensearch/migrations/DataGenerator.java | 5 +- .../migrations/DataGeneratorEndToEnd.java | 7 + .../migrations/RfsMigrateDocuments.java | 4 +- .../migrations/bulkload/EndToEndTest.java | 13 +- .../ParallelDocumentMigrationsTest.java | 12 +- .../bulkload/ProcessLifecycleTest.java | 11 +- .../migrations/bulkload/SourceTestBase.java | 11 +- .../migrations/BaseMigrationTest.java | 7 +- .../bulkload/common/OpenSearchClient.java | 160 ++----------- .../common/OpenSearchClientFactory.java | 213 ++++++++++++++++++ .../OpenSearchClient_ES_6_8.java | 35 +++ .../OpenSearchClient_OS_2_11.java | 25 ++ .../version_os_2_11/RemoteWriter_OS_2_11.java | 4 +- .../version_universal/RemoteReader.java | 5 +- .../version_universal/RemoteReaderClient.java | 16 +- .../cluster/ClusterProviderRegistry.java | 7 +- .../common/OpenSearchClientFactoryTest.java | 183 +++++++++++++++ .../bulkload/common/OpenSearchClientTest.java | 131 +---------- .../framework/SimpleRestoreFromSnapshot.java | 10 +- .../OpenSearchClient_ES_6_8_Test.java | 118 ++++++++++ 22 files changed, 703 insertions(+), 316 deletions(-) create mode 100644 RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactory.java create mode 100644 RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchClient_ES_6_8.java create mode 100644 RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/OpenSearchClient_OS_2_11.java create mode 100644 RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactoryTest.java create mode 100644 RFS/src/test/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchClient_ES_6_8_Test.java diff --git a/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java b/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java index 40982a4b3..2c70dfabb 100644 --- a/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/org/opensearch/migrations/CreateSnapshot.java @@ -3,7 +3,7 @@ import java.util.List; import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.S3SnapshotCreator; import org.opensearch.migrations.bulkload.common.SnapshotCreator; import org.opensearch.migrations.bulkload.common.http.ConnectionContext; @@ -135,7 +135,8 @@ public static void main(String[] args) throws Exception { private ICreateSnapshotContext context; public void run() { - var client = new OpenSearchClient(arguments.sourceArgs.toConnectionContext()); + var clientFactory = new OpenSearchClientFactory(arguments.sourceArgs.toConnectionContext()); + var client = clientFactory.determineVersionAndCreate(); SnapshotCreator snapshotCreator; if (arguments.fileSystemRepoPath != null) { snapshotCreator = new FileSystemSnapshotCreator( diff --git a/CreateSnapshot/src/test/java/org/opensearch/migrations/TestCreateSnapshot.java 
b/CreateSnapshot/src/test/java/org/opensearch/migrations/TestCreateSnapshot.java index 36757a224..cd0be8d40 100644 --- a/CreateSnapshot/src/test/java/org/opensearch/migrations/TestCreateSnapshot.java +++ b/CreateSnapshot/src/test/java/org/opensearch/migrations/TestCreateSnapshot.java @@ -6,9 +6,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.S3SnapshotCreator; import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.worker.SnapshotRunner; @@ -29,6 +30,21 @@ @Slf4j public class TestCreateSnapshot { + private static final byte[] ROOT_RESPONSE_ES_7_10_2 = "{\"version\": {\"number\": \"7.10.2\"}}".getBytes(StandardCharsets.UTF_8); + private static final Map ROOT_RESPONSE_HEADERS = Map.of( + "Content-Type", + "application/json", + "Content-Length", + "" + ROOT_RESPONSE_ES_7_10_2.length + ); + private static final byte[] CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED = "{\"persistent\":{\"compatibility\":{\"override_main_response_version\":\"false\"}}}".getBytes(StandardCharsets.UTF_8); + private static final Map CLUSTER_SETTINGS_COMPATIBILITY_HEADERS = Map.of( + "Content-Type", + "application/json", + "Content-Length", + "" + CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED.length + ); + final SnapshotTestContext snapshotContext = SnapshotTestContext.factory().noOtelTracking(); final byte[] payloadBytes = "Success".getBytes(StandardCharsets.UTF_8); final Map headers = Map.of( @@ -46,17 +62,24 @@ public void testRepoRegisterAndSnapshotCreateRequests() throws Exception { try (var destinationServer = SimpleNettyHttpServer.makeNettyServer(false, Duration.ofMinutes(10), fl -> { + if (Objects.equals(fl.uri(), "/")) { + return new SimpleHttpResponse(ROOT_RESPONSE_HEADERS, ROOT_RESPONSE_ES_7_10_2, "OK", 200); + } else if (Objects.equals(fl.uri(), "/_cluster/settings")) { + return new SimpleHttpResponse(CLUSTER_SETTINGS_COMPATIBILITY_HEADERS, CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED, + "OK", 200); + } capturedRequestList.add(new AbstractMap.SimpleEntry<>(fl, fl.content().toString(StandardCharsets.UTF_8))); return new SimpleHttpResponse(headers, payloadBytes, "OK", 200); })) { final var endpoint = destinationServer.localhostEndpoint().toString(); - var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder() + var sourceClientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder() .host(endpoint) .insecure(true) .build() .toConnectionContext()); + var sourceClient = sourceClientFactory.determineVersionAndCreate(); var snapshotCreator = new S3SnapshotCreator( snapshotName, sourceClient, @@ -111,7 +134,12 @@ public void testSnapshotCreateWithIndexAllowlist() throws Exception { try (var destinationServer = SimpleNettyHttpServer.makeNettyServer(false, Duration.ofMinutes(10), fl -> { - if (fl.uri().equals("/_snapshot/migration_assistant_repo/" + snapshotName)) { + if (Objects.equals(fl.uri(), "/")) { + return new SimpleHttpResponse(ROOT_RESPONSE_HEADERS, ROOT_RESPONSE_ES_7_10_2, "OK", 200); + } else if (Objects.equals(fl.uri(), "/_cluster/settings")) { + return new SimpleHttpResponse(CLUSTER_SETTINGS_COMPATIBILITY_HEADERS, CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED, + "OK", 200); + } else if 
(fl.uri().equals("/_snapshot/migration_assistant_repo/" + snapshotName)) { createSnapshotRequest.set(fl); createSnapshotRequestContent.set(fl.content().toString(StandardCharsets.UTF_8)); } @@ -120,11 +148,12 @@ public void testSnapshotCreateWithIndexAllowlist() throws Exception { { final var endpoint = destinationServer.localhostEndpoint().toString(); - var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder() + var sourceClientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder() .host(endpoint) .insecure(true) .build() .toConnectionContext()); + var sourceClient = sourceClientFactory.determineVersionAndCreate(); var snapshotCreator = new S3SnapshotCreator( snapshotName, sourceClient, diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java index ed5ff2402..d3a3d0fca 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java @@ -2,7 +2,7 @@ import java.text.NumberFormat; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.data.WorkloadGenerator; import org.opensearch.migrations.utils.ProcessHelpers; @@ -35,7 +35,8 @@ public static void main(String[] args) { public void run(DataGeneratorArgs arguments) { var connectionContext = arguments.targetArgs.toConnectionContext(); - var client = new OpenSearchClient(connectionContext); + var clientFactory = new OpenSearchClientFactory(connectionContext); + var client = clientFactory.determineVersionAndCreate(); var startTimeMillis = System.currentTimeMillis(); var workloadGenerator = new WorkloadGenerator(client); diff --git a/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java index 3b6cd58db..f4a36d605 100644 --- a/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java +++ b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java @@ -30,6 +30,13 @@ void generateData_OS_2_14() throws Exception { } } + @Test + void generateData_ES_6_8() throws Exception { + try (var targetCluster = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23)) { + generateData(targetCluster); + } + } + @SneakyThrows void generateData(final SearchClusterContainer targetCluster) { // ACTION: Set up the target clusters diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index 723f5f04c..47d0102b9 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -13,6 +13,7 @@ import org.opensearch.migrations.bulkload.common.FileSystemRepo; import org.opensearch.migrations.bulkload.common.LuceneDocumentsReader; import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.S3Repo; import org.opensearch.migrations.bulkload.common.S3Uri; import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker; @@ -278,7 
+279,8 @@ public static void main(String[] args) throws Exception { workerId) ) { MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main - OpenSearchClient targetClient = new OpenSearchClient(connectionContext); + var clientFactory = new OpenSearchClientFactory(connectionContext); + OpenSearchClient targetClient = clientFactory.determineVersionAndCreate(); DocumentReindexer reindexer = new DocumentReindexer(targetClient, arguments.numDocsPerBulkRequest, arguments.numBytesPerBulkRequest, diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java index 1f1b65694..d2794b7ee 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/EndToEndTest.java @@ -9,7 +9,7 @@ import org.opensearch.migrations.bulkload.common.FileSystemRepo; import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.RestClient; import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; @@ -105,11 +105,12 @@ private void migrationDocumentsWithClusters( // === ACTION: Take a snapshot === var snapshotName = "my_snap"; - var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder() - .host(sourceCluster.getUrl()) - .insecure(true) - .build() - .toConnectionContext()); + var sourceClientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder() + .host(sourceCluster.getUrl()) + .insecure(true) + .build() + .toConnectionContext()); + var sourceClient = sourceClientFactory.determineVersionAndCreate(); var snapshotCreator = new FileSystemSnapshotCreator( snapshotName, sourceClient, diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java index df097f3bb..1bad466b6 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java @@ -13,7 +13,7 @@ import org.opensearch.migrations.CreateSnapshot; import org.opensearch.migrations.bulkload.common.FileSystemRepo; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; import org.opensearch.migrations.data.WorkloadGenerator; @@ -72,11 +72,11 @@ public void testDocumentMigration( ).join(); // Populate the source cluster with data - var client = new OpenSearchClient(ConnectionContextTestParams.builder() - .host(esSourceContainer.getUrl()) - .build() - .toConnectionContext() - ); + var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + 
.toConnectionContext()); + var client = clientFactory.determineVersionAndCreate(); var generator = new WorkloadGenerator(client); generator.generate(new WorkloadOptions()); diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index 38488f0b8..ea3acfae7 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -8,7 +8,7 @@ import java.util.function.Function; import org.opensearch.migrations.CreateSnapshot; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; import org.opensearch.migrations.data.WorkloadGenerator; @@ -124,11 +124,12 @@ private void testProcess(int expectedExitCode, Function proces ).join(); // Populate the source cluster with data - var client = new OpenSearchClient(ConnectionContextTestParams.builder() - .host(esSourceContainer.getUrl()) - .build() - .toConnectionContext() + var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + .toConnectionContext() ); + var client = clientFactory.determineVersionAndCreate(); var generator = new WorkloadGenerator(client); generator.generate(new WorkloadOptions()); diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java index 49f9c54a0..96e1a8433 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java @@ -26,7 +26,7 @@ import org.opensearch.migrations.bulkload.common.DocumentReindexer; import org.opensearch.migrations.bulkload.common.FileSystemRepo; import org.opensearch.migrations.bulkload.common.LuceneDocumentsReader; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.RestClient; import org.opensearch.migrations.bulkload.common.RfsLuceneDocument; import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker; @@ -274,13 +274,14 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker( UUID.randomUUID().toString(), Clock.offset(Clock.systemUTC(), Duration.ofMillis(nextClockShift)) )) { - return RfsMigrateDocuments.run( - readerFactory, - new DocumentReindexer(new OpenSearchClient(ConnectionContextTestParams.builder() + var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder() .host(targetAddress) .compressionEnabled(compressionEnabled) .build() - .toConnectionContext()), 1000, Long.MAX_VALUE, 1, defaultDocTransformer), + .toConnectionContext()); + return RfsMigrateDocuments.run( + readerFactory, + new DocumentReindexer(clientFactory.determineVersionAndCreate(), 1000, Long.MAX_VALUE, 1, defaultDocTransformer), new OpenSearchWorkCoordinator( new 
CoordinateWorkHttpClient(ConnectionContextTestParams.builder() .host(targetAddress) diff --git a/MetadataMigration/src/test/java/org/opensearch/migrations/BaseMigrationTest.java b/MetadataMigration/src/test/java/org/opensearch/migrations/BaseMigrationTest.java index 03451edf8..0322e268c 100644 --- a/MetadataMigration/src/test/java/org/opensearch/migrations/BaseMigrationTest.java +++ b/MetadataMigration/src/test/java/org/opensearch/migrations/BaseMigrationTest.java @@ -6,6 +6,7 @@ import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator; import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; import org.opensearch.migrations.bulkload.http.ClusterOperations; @@ -58,11 +59,12 @@ protected void startClusters() { @SneakyThrows protected String createSnapshot(String snapshotName) { var snapshotContext = SnapshotTestContext.factory().noOtelTracking(); - var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder() + var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder() .host(sourceCluster.getUrl()) .insecure(true) .build() .toConnectionContext()); + var sourceClient = clientFactory.determineVersionAndCreate(); var snapshotCreator = new org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator( snapshotName, sourceClient, @@ -115,11 +117,12 @@ protected MigrationItemResult executeMigration(MigrateOrEvaluateArgs arguments, * @return An OpenSearch client. */ protected OpenSearchClient createClient(SearchClusterContainer cluster) { - return new OpenSearchClient(ConnectionContextTestParams.builder() + var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder() .host(cluster.getUrl()) .insecure(true) .build() .toConnectionContext()); + return clientFactory.determineVersionAndCreate(); } protected SnapshotTestContext createSnapshotContext() { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java index e1d68950d..08eaf1f62 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java @@ -4,7 +4,6 @@ import java.net.HttpURLConnection; import java.time.Duration; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -12,9 +11,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.opensearch.migrations.Flavor; import org.opensearch.migrations.Version; -import org.opensearch.migrations.VersionMatchers; import org.opensearch.migrations.bulkload.common.http.ConnectionContext; import org.opensearch.migrations.bulkload.common.http.HttpResponse; import org.opensearch.migrations.bulkload.tracing.IRfsContexts; @@ -22,7 +19,6 @@ import org.opensearch.migrations.reindexer.FailedRequestsLogger; import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; @@ -30,23 +26,16 @@ import reactor.util.retry.Retry; @Slf4j -public class OpenSearchClient { 
+public abstract class OpenSearchClient { protected static final ObjectMapper objectMapper = new ObjectMapper(); - /** Amazon OpenSearch Serverless cluster don't have a version number, but - * its closely aligned with the latest open-source OpenSearch 2.X */ - private static final Version AMAZON_SERVERLESS_VERSION = Version.builder() - .flavor(Flavor.AMAZON_SERVERLESS_OPENSEARCH) - .major(2) - .build(); - private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 3; private static final Duration DEFAULT_BACKOFF = Duration.ofSeconds(1); private static final Duration DEFAULT_MAX_BACKOFF = Duration.ofSeconds(10); private static final Retry SNAPSHOT_RETRY_STRATEGY = Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF) .maxBackoff(DEFAULT_MAX_BACKOFF); - protected static final Retry CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY = + public static final Retry CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY = Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF) .maxBackoff(DEFAULT_MAX_BACKOFF); private static final Retry CREATE_ITEM_EXISTS_RETRY_STRATEGY = @@ -68,143 +57,20 @@ public class OpenSearchClient { protected final RestClient client; protected final FailedRequestsLogger failedRequestsLogger; + private final Version version; - public OpenSearchClient(ConnectionContext connectionContext) { - this(new RestClient(connectionContext), new FailedRequestsLogger()); + protected OpenSearchClient(ConnectionContext connectionContext, Version version) { + this(new RestClient(connectionContext), new FailedRequestsLogger(), version); } - protected OpenSearchClient(RestClient client, FailedRequestsLogger failedRequestsLogger) { + protected OpenSearchClient(RestClient client, FailedRequestsLogger failedRequestsLogger, Version version) { this.client = client; this.failedRequestsLogger = failedRequestsLogger; + this.version = version; } public Version getClusterVersion() { - var versionFromRootApi = client.getAsync("", null) - .flatMap(resp -> { - if (resp.statusCode == 200) { - return versionFromResponse(resp); - } - // If the root API doesn't exist, the cluster is OpenSearch Serverless - if (resp.statusCode == 404) { - return Mono.just(AMAZON_SERVERLESS_VERSION); - } - return Mono.error(new UnexpectedStatusCode(resp)); - }) - .doOnError(e -> log.error(e.getMessage())) - .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) - .block(); - - // Compatibility mode is only enabled on OpenSearch clusters responding with the version of 7.10.2 - if (!VersionMatchers.isES_7_10.test(versionFromRootApi)) { - return versionFromRootApi; - } - return client.getAsync("_cluster/settings", null) - .flatMap(this::checkCompatibilityModeFromResponse) - .doOnError(e -> log.error(e.getMessage())) - .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) - .flatMap(hasCompatibilityModeEnabled -> { - log.atInfo().setMessage("Checking CompatibilityMode, was enabled? 
{}").addArgument(hasCompatibilityModeEnabled).log(); - if (Boolean.FALSE.equals(hasCompatibilityModeEnabled)) { - return Mono.just(versionFromRootApi); - } - return client.getAsync("_nodes/_all/nodes,version?format=json", null) - .flatMap(this::getVersionFromNodes) - .doOnError(e -> log.error(e.getMessage())) - .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY); - }) - .onErrorResume(e -> { - log.atWarn() - .setCause(e) - .setMessage("Unable to CompatibilityMode or determine the version from a plugin, falling back to version {}") - .addArgument(versionFromRootApi).log(); - return Mono.just(versionFromRootApi); - }) - .block(); - } - - private Mono versionFromResponse(HttpResponse resp) { - try { - var body = objectMapper.readTree(resp.body); - var versionNode = body.get("version"); - - var versionNumberString = versionNode.get("number").asText(); - var parts = versionNumberString.split("\\."); - var versionBuilder = Version.builder() - .major(Integer.parseInt(parts[0])) - .minor(Integer.parseInt(parts[1])) - .patch(parts.length > 2 ? Integer.parseInt(parts[2]) : 0); - - var distroNode = versionNode.get("distribution"); - if (distroNode != null && distroNode.asText().equalsIgnoreCase("opensearch")) { - versionBuilder.flavor(getLikelyOpenSearchFlavor()); - } else { - versionBuilder.flavor(Flavor.ELASTICSEARCH); - } - return Mono.just(versionBuilder.build()); - } catch (Exception e) { - log.error("Unable to parse version from response", e); - return Mono.error(new OperationFailed("Unable to parse version from response: " + e.getMessage(), resp)); - } - } - - Mono checkCompatibilityModeFromResponse(HttpResponse resp) { - if (resp.statusCode != 200) { - return Mono.error(new UnexpectedStatusCode(resp)); - } - try { - var body = Optional.of(objectMapper.readTree(resp.body)); - var persistentlyInCompatibilityMode = inCompatibilityMode(body.map(n -> n.get("persistent"))); - var transientlyInCompatibilityMode = inCompatibilityMode(body.map(n -> n.get("transient"))); - return Mono.just(persistentlyInCompatibilityMode || transientlyInCompatibilityMode); - } catch (Exception e) { - log.error("Unable to determine if the cluster is in compatibility mode", e); - return Mono.error(new OperationFailed("Unable to determine if the cluster is in compatibility mode from response: " + e.getMessage(), resp)); - } - } - - private boolean inCompatibilityMode(Optional node) { - return node.filter(n -> !n.isNull()) - .map(n -> n.get("compatibility")) - .filter(n -> !n.isNull()) - .map(n -> n.get("override_main_response_version")) - .filter(n -> !n.isNull()) - .map(n -> n.asBoolean()) - .orElse(false); - } - - private Mono getVersionFromNodes(HttpResponse resp) { - if (resp.statusCode != 200) { - return Mono.error(new UnexpectedStatusCode(resp)); - } - var foundVersions = new HashSet(); - try { - - var nodes = objectMapper.readTree(resp.body) - .get("nodes"); - nodes.fields().forEachRemaining(node -> { - var versionNumber = node.getValue().get("version").asText(); - var nodeVersion = Version.fromString(getLikelyOpenSearchFlavor() + " " + versionNumber); - foundVersions.add(nodeVersion); - }); - - if (foundVersions.isEmpty()) { - return Mono.error(new OperationFailed("Unable to find any version numbers", resp)); - } - - if (foundVersions.size() == 1) { - return Mono.just(foundVersions.stream().findFirst().get()); - } - - return Mono.error(new OperationFailed("Multiple version numbers discovered on nodes, " + foundVersions, resp)); - - } catch (Exception e) { - log.error("Unable to check node versions", e); - return 
Mono.error(new OperationFailed("Unable to check node versions: " + e.getMessage(), resp)); - } - } - - private Flavor getLikelyOpenSearchFlavor() { - return client.getConnectionContext().isAwsSpecificAuthentication() ? Flavor.AMAZON_MANAGED_OPENSEARCH : Flavor.OPENSEARCH; + return version; } /* @@ -269,6 +135,8 @@ public boolean hasIndex(String indexName) { return hasObjectCheck(indexName, null); } + protected abstract String getCreateIndexPath(String indexName); + /* * Create an index if it does not already exist. Returns an Optional; if the index was created, it * will be the created object and empty otherwise. @@ -278,7 +146,7 @@ public Optional createIndex( ObjectNode settings, IRfsContexts.ICheckedIdempotentPutRequestContext context ) { - String targetPath = indexName; + var targetPath = getCreateIndexPath(indexName); return createObjectIdempotent(targetPath, settings, context); } @@ -442,7 +310,9 @@ public Optional getSnapshotStatus( } } - Retry getBulkRetryStrategy() { + protected abstract String getBulkRequestPath(String indexName); + + protected Retry getBulkRetryStrategy() { return BULK_RETRY_STRATEGY; } @@ -451,7 +321,7 @@ public Mono sendBulkRequest(String indexName, List { final var docsMap = docs.stream().collect(Collectors.toMap(d -> d.getDocId(), d -> d)); return Mono.defer(() -> { - final String targetPath = indexName + "/_bulk"; + final String targetPath = getBulkRequestPath(indexName); log.atTrace().setMessage("Creating bulk body with document ids {}").addArgument(docsMap::keySet).log(); var body = BulkDocSection.convertToBulkRequestBody(docsMap.values()); var additionalHeaders = new HashMap>(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactory.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactory.java new file mode 100644 index 000000000..ec9639823 --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactory.java @@ -0,0 +1,213 @@ +package org.opensearch.migrations.bulkload.common; + + +import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; +import java.util.Optional; + +import org.opensearch.migrations.Flavor; +import org.opensearch.migrations.Version; +import org.opensearch.migrations.VersionMatchers; +import org.opensearch.migrations.bulkload.common.http.ConnectionContext; +import org.opensearch.migrations.bulkload.common.http.HttpResponse; +import org.opensearch.migrations.bulkload.version_es_6_8.OpenSearchClient_ES_6_8; +import org.opensearch.migrations.bulkload.version_os_2_11.OpenSearchClient_OS_2_11; +import org.opensearch.migrations.reindexer.FailedRequestsLogger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +@Getter +@Slf4j +public class OpenSearchClientFactory { + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private ConnectionContext connectionContext; + private Version version; + RestClient client; + + public OpenSearchClientFactory(ConnectionContext connectionContext) { + if (connectionContext == null) { + throw new IllegalArgumentException("Connection context was not provided in constructor."); + } + this.connectionContext = connectionContext; + this.client = new RestClient(connectionContext); + } + + public OpenSearchClient determineVersionAndCreate() { + if (version == null) { + version = getClusterVersion(); + } + var 
clientClass = getOpenSearchClientClass(version); + try { + return clientClass.getConstructor(ConnectionContext.class, Version.class) + .newInstance(connectionContext, version); + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("Failed to instantiate OpenSearchClient", e); + } + } + + public OpenSearchClient determineVersionAndCreate(RestClient restClient, FailedRequestsLogger failedRequestsLogger) { + if (version == null) { + version = getClusterVersion(); + } + var clientClass = getOpenSearchClientClass(version); + try { + return clientClass.getConstructor(RestClient.class, FailedRequestsLogger.class, Version.class) + .newInstance(restClient, failedRequestsLogger, version); + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("Failed to instantiate OpenSearchClient", e); + } + } + + private Class getOpenSearchClientClass(Version version) { + if (VersionMatchers.isOS_1_X.or(VersionMatchers.isOS_2_X).or(VersionMatchers.isES_7_X).test(version)) { + return OpenSearchClient_OS_2_11.class; + } else if (VersionMatchers.isES_6_X.test(version)) { + return OpenSearchClient_ES_6_8.class; + } else { + throw new IllegalArgumentException("Unsupported version: " + version); + } + } + + /** Amazon OpenSearch Serverless cluster don't have a version number, but + * it is closely aligned with the latest open-source OpenSearch 2.X */ + private static final Version AMAZON_SERVERLESS_VERSION = Version.builder() + .flavor(Flavor.AMAZON_SERVERLESS_OPENSEARCH) + .major(2) + .build(); + + public Version getClusterVersion() { + var versionFromRootApi = client.getAsync("", null) + .flatMap(resp -> { + if (resp.statusCode == 200) { + return versionFromResponse(resp); + } + // If the root API doesn't exist, the cluster is OpenSearch Serverless + if (resp.statusCode == 404) { + return Mono.just(AMAZON_SERVERLESS_VERSION); + } + return Mono.error(new OpenSearchClient.UnexpectedStatusCode(resp)); + }) + .doOnError(e -> log.error(e.getMessage())) + .retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) + .block(); + + // Compatibility mode is only enabled on OpenSearch clusters responding with the version of 7.10.2 + if (!VersionMatchers.isES_7_10.test(versionFromRootApi)) { + return versionFromRootApi; + } + return client.getAsync("_cluster/settings", null) + .flatMap(this::checkCompatibilityModeFromResponse) + .doOnError(e -> log.error(e.getMessage())) + .retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) + .flatMap(hasCompatibilityModeEnabled -> { + log.atInfo().setMessage("Checking CompatibilityMode, was enabled? 
{}").addArgument(hasCompatibilityModeEnabled).log(); + if (Boolean.FALSE.equals(hasCompatibilityModeEnabled)) { + return Mono.just(versionFromRootApi); + } + return client.getAsync("_nodes/_all/nodes,version?format=json", null) + .flatMap(this::getVersionFromNodes) + .doOnError(e -> log.error(e.getMessage())) + .retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY); + }) + .onErrorResume(e -> { + log.atWarn() + .setCause(e) + .setMessage("Unable to CompatibilityMode or determine the version from a plugin, falling back to version {}") + .addArgument(versionFromRootApi).log(); + return Mono.just(versionFromRootApi); + }) + .block(); + } + + private Mono versionFromResponse(HttpResponse resp) { + try { + var body = objectMapper.readTree(resp.body); + var versionNode = body.get("version"); + + var versionNumberString = versionNode.get("number").asText(); + var parts = versionNumberString.split("\\."); + var versionBuilder = Version.builder() + .major(Integer.parseInt(parts[0])) + .minor(Integer.parseInt(parts[1])) + .patch(parts.length > 2 ? Integer.parseInt(parts[2]) : 0); + + var distroNode = versionNode.get("distribution"); + if (distroNode != null && distroNode.asText().equalsIgnoreCase("opensearch")) { + versionBuilder.flavor(getLikelyOpenSearchFlavor()); + } else { + versionBuilder.flavor(Flavor.ELASTICSEARCH); + } + return Mono.just(versionBuilder.build()); + } catch (Exception e) { + log.error("Unable to parse version from response", e); + return Mono.error(new OpenSearchClient.OperationFailed("Unable to parse version from response: " + e.getMessage(), resp)); + } + } + + Mono checkCompatibilityModeFromResponse(HttpResponse resp) { + if (resp.statusCode != 200) { + return Mono.error(new OpenSearchClient.UnexpectedStatusCode(resp)); + } + try { + var body = Optional.of(objectMapper.readTree(resp.body)); + var persistentlyInCompatibilityMode = inCompatibilityMode(body.map(n -> n.get("persistent"))); + var transientlyInCompatibilityMode = inCompatibilityMode(body.map(n -> n.get("transient"))); + return Mono.just(persistentlyInCompatibilityMode || transientlyInCompatibilityMode); + } catch (Exception e) { + log.error("Unable to determine if the cluster is in compatibility mode", e); + return Mono.error(new OpenSearchClient.OperationFailed("Unable to determine if the cluster is in compatibility mode from response: " + e.getMessage(), resp)); + } + } + + private boolean inCompatibilityMode(Optional node) { + return node.filter(n -> !n.isNull()) + .map(n -> n.get("compatibility")) + .filter(n -> !n.isNull()) + .map(n -> n.get("override_main_response_version")) + .filter(n -> !n.isNull()) + .map(n -> n.asBoolean()) + .orElse(false); + } + + private Mono getVersionFromNodes(HttpResponse resp) { + if (resp.statusCode != 200) { + return Mono.error(new OpenSearchClient.UnexpectedStatusCode(resp)); + } + var foundVersions = new HashSet(); + try { + + var nodes = objectMapper.readTree(resp.body) + .get("nodes"); + nodes.fields().forEachRemaining(node -> { + var versionNumber = node.getValue().get("version").asText(); + var nodeVersion = Version.fromString(getLikelyOpenSearchFlavor() + " " + versionNumber); + foundVersions.add(nodeVersion); + }); + + if (foundVersions.isEmpty()) { + return Mono.error(new OpenSearchClient.OperationFailed("Unable to find any version numbers", resp)); + } + + if (foundVersions.size() == 1) { + return Mono.just(foundVersions.stream().findFirst().get()); + } + + return Mono.error(new OpenSearchClient.OperationFailed("Multiple version numbers discovered on nodes, 
" + foundVersions, resp)); + + } catch (Exception e) { + log.error("Unable to check node versions", e); + return Mono.error(new OpenSearchClient.OperationFailed("Unable to check node versions: " + e.getMessage(), resp)); + } + } + + private Flavor getLikelyOpenSearchFlavor() { + return client.getConnectionContext().isAwsSpecificAuthentication() ? Flavor.AMAZON_MANAGED_OPENSEARCH : Flavor.OPENSEARCH; + } + + +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchClient_ES_6_8.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchClient_ES_6_8.java new file mode 100644 index 000000000..1d40e0df1 --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchClient_ES_6_8.java @@ -0,0 +1,35 @@ +package org.opensearch.migrations.bulkload.version_es_6_8; + +import org.opensearch.migrations.Flavor; +import org.opensearch.migrations.Version; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.RestClient; +import org.opensearch.migrations.bulkload.common.http.ConnectionContext; +import org.opensearch.migrations.reindexer.FailedRequestsLogger; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class OpenSearchClient_ES_6_8 extends OpenSearchClient { + public OpenSearchClient_ES_6_8(ConnectionContext connectionContext, Version version) { + super(connectionContext, version); + if (version.getFlavor() != Flavor.ELASTICSEARCH && version.getMajor() != 6) { + log.atWarn().setMessage("OpenSearchClient_ES_6_8 created for cluster with version {}").addArgument(version.toString()).log(); + } + } + + public OpenSearchClient_ES_6_8(RestClient client, FailedRequestsLogger failedRequestsLogger, Version version) { + super(client, failedRequestsLogger, version); + if (version.getFlavor() != Flavor.ELASTICSEARCH && version.getMajor() != 6) { + log.atWarn().setMessage("OpenSearchClient_ES_6_8 created for cluster with version {}").addArgument(version.toString()).log(); + } + } + + protected String getCreateIndexPath(String indexName) { + return indexName + "?include_type_name=false"; + } + + protected String getBulkRequestPath(String indexName) { + return indexName + "/_doc/_bulk"; + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/OpenSearchClient_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/OpenSearchClient_OS_2_11.java new file mode 100644 index 000000000..83c92c71f --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/OpenSearchClient_OS_2_11.java @@ -0,0 +1,25 @@ +package org.opensearch.migrations.bulkload.version_os_2_11; + +import org.opensearch.migrations.Version; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.RestClient; +import org.opensearch.migrations.bulkload.common.http.ConnectionContext; +import org.opensearch.migrations.reindexer.FailedRequestsLogger; + +public class OpenSearchClient_OS_2_11 extends OpenSearchClient { + public OpenSearchClient_OS_2_11(ConnectionContext connectionContext, Version version) { + super(connectionContext, version); + } + + public OpenSearchClient_OS_2_11(RestClient client, FailedRequestsLogger failedRequestsLogger, Version version) { + super(client, failedRequestsLogger, version); + } + + protected String getCreateIndexPath(String indexName) { + return indexName; + } + + protected String getBulkRequestPath(String indexName) 
{ + return indexName + "/_bulk"; + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/RemoteWriter_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/RemoteWriter_OS_2_11.java index c562454a7..74a00c9b5 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/RemoteWriter_OS_2_11.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/RemoteWriter_OS_2_11.java @@ -3,6 +3,7 @@ import org.opensearch.migrations.Version; import org.opensearch.migrations.VersionMatchers; import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.http.ConnectionContext; import org.opensearch.migrations.bulkload.models.DataFilterArgs; import org.opensearch.migrations.cluster.ClusterWriter; @@ -76,7 +77,8 @@ public String toString() { private OpenSearchClient getClient() { if (client == null) { - client = new OpenSearchClient(getConnection()); + var clientFactory = new OpenSearchClientFactory(getConnection()); + client = clientFactory.determineVersionAndCreate(); } return client; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java index 7f52443dc..af94a1b37 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java @@ -2,7 +2,7 @@ import org.opensearch.migrations.Version; import org.opensearch.migrations.VersionMatchers; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.http.ConnectionContext; import org.opensearch.migrations.bulkload.models.GlobalMetadata.Factory; import org.opensearch.migrations.bulkload.models.IndexMetadata; @@ -44,7 +44,8 @@ public IndexMetadata.Factory getIndexMetadata() { public Version getVersion() { if (version == null) { // Use a throw away client that will work on any version of the service - version = new OpenSearchClient(getConnection()).getClusterVersion(); + var clientFactory = new OpenSearchClientFactory(connection); + version = clientFactory.getClusterVersion(); } return version; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java index 9ed64b302..a2178ab04 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java @@ -18,7 +18,17 @@ public class RemoteReaderClient extends OpenSearchClient { public RemoteReaderClient(ConnectionContext connection) { - super(connection); + super(connection, null); + } + + @Override + protected String getCreateIndexPath(String indexName) { + return indexName; + } + + @Override + protected String getBulkRequestPath(String indexName) { + return indexName + "/_bulk"; } protected Map getTemplateEndpoints() { @@ -36,7 +46,7 @@ public ObjectNode getClusterData() { .flatMap(this::getJsonForTemplateApis) .map(json -> Map.entry(entry.getKey(), json)) .doOnError(e -> log.error("Error fetching template 
{}: {}", entry.getKey(), e.getMessage())) - .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) + .retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) ) .collectMap(Entry::getKey, Entry::getValue) .block(); @@ -72,7 +82,7 @@ public ObjectNode getIndexes() { .getAsync(endpoint, null) .flatMap(this::getJsonForIndexApis) .doOnError(e -> log.error(e.getMessage())) - .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) + .retryWhen(OpenSearchClient.CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) ) .collectList() .block(); diff --git a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java index 3dcb69e77..d336bdfbd 100644 --- a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java +++ b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java @@ -5,7 +5,7 @@ import java.util.stream.Stream; import org.opensearch.migrations.Version; -import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory; import org.opensearch.migrations.bulkload.common.SourceRepo; import org.opensearch.migrations.bulkload.common.http.ConnectionContext; import org.opensearch.migrations.bulkload.models.DataFilterArgs; @@ -58,7 +58,8 @@ public ClusterSnapshotReader getSnapshotReader(Version version, SourceRepo repo) * @return The remote resource provider */ public ClusterReader getRemoteReader(ConnectionContext connection) { - var client = new OpenSearchClient(connection); + var clientFactory = new OpenSearchClientFactory(connection); + var client = clientFactory.determineVersionAndCreate(); var version = client.getClusterVersion(); var remoteProvider = getRemoteProviders(connection) @@ -79,7 +80,7 @@ public ClusterReader getRemoteReader(ConnectionContext connection) { */ public ClusterWriter getRemoteWriter(ConnectionContext connection, Version versionOverride, DataFilterArgs dataFilterArgs) { var version = Optional.ofNullable(versionOverride) - .orElseGet(() -> new OpenSearchClient(connection).getClusterVersion()); + .orElseGet(() -> new OpenSearchClientFactory(connection).getClusterVersion()); var remoteProvider = getRemoteProviders(connection) .filter(p -> p.compatibleWith(version)) diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactoryTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactoryTest.java new file mode 100644 index 000000000..f640729a9 --- /dev/null +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientFactoryTest.java @@ -0,0 +1,183 @@ +package org.opensearch.migrations.bulkload.common; + +import java.net.URI; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.opensearch.migrations.Version; +import org.opensearch.migrations.bulkload.common.http.ConnectionContext; +import org.opensearch.migrations.bulkload.common.http.HttpResponse; +import org.opensearch.migrations.reindexer.FailedRequestsLogger; + +import com.fasterxml.jackson.core.StreamReadFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; 
+import org.mockito.Mock.Strictness; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class OpenSearchClientFactoryTest { + private static final String NODES_RESPONSE_OS_2_13_0 = "{\r\n" + // + " \"_nodes\": {\r\n" + // + " \"total\": 1,\r\n" + // + " \"successful\": 1,\r\n" + // + " \"failed\": 0\r\n" + // + " },\r\n" + // + " \"cluster_name\": \"336984078605:target-domain\",\r\n" + // + " \"nodes\": {\r\n" + // + " \"HDzrwdO8TneRQaxzx94uKA\": {\r\n" + // + " \"name\": \"74c8fa743d5e3626e3903c3b1d5450e0\",\r\n" + // + " \"version\": \"2.13.0\",\r\n" + // + " \"build_type\": \"tar\",\r\n" + // + " \"build_hash\": \"unknown\",\r\n" + // + " \"roles\": [\r\n" + // + " \"data\",\r\n" + // + " \"ingest\",\r\n" + // + " \"master\",\r\n" + // + " \"remote_cluster_client\"\r\n" + // + " ]\r\n" + // + " }\r\n" + // + " }\r\n" + // + "}"; + private static final String CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_ENABLED = "{\"persistent\":{\"compatibility\":{\"override_main_response_version\":\"true\"}}}"; + private static final String CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED = "{\"persistent\":{\"compatibility\":{\"override_main_response_version\":\"false\"}}}"; + private static final String ROOT_RESPONSE_OS_1_0_0 = "{\"version\":{\"distribution\":\"opensearch\",\"number\":\"1.0.0\"}}"; + private static final String ROOT_RESPONSE_ES_7_10_2 = "{\"version\": {\"number\": \"7.10.2\"}}"; + private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder() + .enable(StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION) + .build(); + + @Mock(strictness = Strictness.LENIENT) + RestClient restClient; + + @Mock(strictness = Strictness.LENIENT) + ConnectionContext connectionContext; + + @Mock + FailedRequestsLogger failedRequestLogger; + + OpenSearchClientFactory openSearchClientFactory; + + @BeforeEach + void beforeTest() { + when(connectionContext.getUri()).thenReturn(URI.create("http://localhost/")); + when(connectionContext.isAwsSpecificAuthentication()).thenReturn(false); + when(restClient.getConnectionContext()).thenReturn(connectionContext); + openSearchClientFactory = spy(new OpenSearchClientFactory(connectionContext)); + openSearchClientFactory.client = restClient; + } + + @Test + void testGetClusterVersion_ES_7_10() { + setupOkResponse(restClient, "", ROOT_RESPONSE_ES_7_10_2); + setupOkResponse(restClient, "_cluster/settings", CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED); + + var version = openSearchClientFactory.getClusterVersion(); + + assertThat(version, equalTo(Version.fromString("ES 7.10.2"))); + verify(restClient).getAsync("", null); + verify(restClient).getAsync("_cluster/settings", null); + verifyNoMoreInteractions(restClient); + } + + @Test + void testGetClusterVersion_OS_CompatibilityModeEnabled() { + when(connectionContext.isAwsSpecificAuthentication()).thenReturn(true); + setupOkResponse(restClient, "", ROOT_RESPONSE_ES_7_10_2); + setupOkResponse(restClient, "_cluster/settings", CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_ENABLED); + setupOkResponse(restClient, "_nodes/_all/nodes,version?format=json", NODES_RESPONSE_OS_2_13_0); + + var version = 
openSearchClientFactory.getClusterVersion(); + + assertThat(version, equalTo(Version.fromString("AOS 2.13.0"))); + verify(restClient).getAsync("", null); + verify(restClient).getAsync("_cluster/settings", null); + verify(restClient).getAsync("_nodes/_all/nodes,version?format=json", null); + } + + @Test + void testGetClusterVersion_OS_CompatibilityModeDisableEnabled() { + setupOkResponse(restClient, "", ROOT_RESPONSE_OS_1_0_0); + setupOkResponse(restClient, "_cluster/settings", CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED); + + var version = openSearchClientFactory.getClusterVersion(); + + assertThat(version, equalTo(Version.fromString("OS 1.0.0"))); + verify(restClient).getConnectionContext(); + verify(restClient).getAsync("", null); + verifyNoMoreInteractions(restClient); + } + + @Test + void testGetClusterVersion_OS_CompatibilityModeFailure_UseFallback() { + setupOkResponse(restClient, "", ROOT_RESPONSE_ES_7_10_2); + + var versionResponse = new HttpResponse(403, "Forbidden", Map.of(), ""); + when(restClient.getAsync("_cluster/settings", null)).thenReturn(Mono.just(versionResponse)); + + var version = openSearchClientFactory.getClusterVersion(); + + assertThat(version, equalTo(Version.fromString("ES 7.10.2"))); + verify(restClient).getAsync("", null); + verify(restClient).getAsync("_cluster/settings", null); + verifyNoMoreInteractions(restClient); + } + + private void setupOkResponse(RestClient restClient, String url, String body) { + var versionResponse = new HttpResponse(200, "OK", Map.of(), body); + when(restClient.getAsync(url, null)).thenReturn(Mono.just(versionResponse)); + } + + @Test + void testGetClusterVersion_OS_Serverless() { + var versionResponse = new HttpResponse(404, "Not Found", Map.of(), ""); + when(restClient.getAsync("", null)).thenReturn(Mono.just(versionResponse)); + + var version = openSearchClientFactory.getClusterVersion(); + + assertThat(version, equalTo(Version.fromString("AOSS 2.x.x"))); + verify(restClient, times(1)).getAsync("", null); + verifyNoMoreInteractions(restClient); + } + + @Test + void testCheckCompatibilityModeFromResponse() { + Function createCompatibilitySection = (Boolean value) -> + OBJECT_MAPPER.createObjectNode() + .set("compatibility", OBJECT_MAPPER.createObjectNode() + .put("override_main_response_version", value)); + + BiFunction createSettingsResponse = (Boolean persistentVal, Boolean transientVal) -> { + var body = OBJECT_MAPPER.createObjectNode() + .set("persistent", createCompatibilitySection.apply(persistentVal)) + .set("transient", createCompatibilitySection.apply(transientVal)) + .toPrettyString(); + return new HttpResponse(200, "OK", null, body); + }; + + var bothTrue = openSearchClientFactory.checkCompatibilityModeFromResponse(createSettingsResponse.apply(true, true)); + assertThat(bothTrue.block(), equalTo(true)); + var persistentTrue = openSearchClientFactory.checkCompatibilityModeFromResponse(createSettingsResponse.apply(true, false)); + assertThat(persistentTrue.block(), equalTo(true)); + var transientTrue = openSearchClientFactory.checkCompatibilityModeFromResponse(createSettingsResponse.apply(false, true)); + assertThat(transientTrue.block(), equalTo(true)); + var neitherTrue = openSearchClientFactory.checkCompatibilityModeFromResponse(createSettingsResponse.apply(false, false)); + assertThat(neitherTrue.block(), equalTo(false)); + } +} diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientTest.java 
index 4d6e0e16c..c26f5a5f1 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/OpenSearchClientTest.java @@ -1,11 +1,10 @@ package org.opensearch.migrations.bulkload.common; +import java.net.URI; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.BiFunction; -import java.util.function.Function; import org.opensearch.migrations.Version; import org.opensearch.migrations.bulkload.common.http.ConnectionContext; @@ -14,10 +13,10 @@ import org.opensearch.migrations.bulkload.http.BulkRequestGenerator.BulkItemResponseEntry; import org.opensearch.migrations.bulkload.tracing.IRfsContexts; import org.opensearch.migrations.bulkload.tracing.IRfsContexts.ICheckedIdempotentPutRequestContext; +import org.opensearch.migrations.bulkload.version_os_2_11.OpenSearchClient_OS_2_11; import org.opensearch.migrations.reindexer.FailedRequestsLogger; import com.fasterxml.jackson.core.StreamReadFeature; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -53,32 +52,6 @@ @ExtendWith(MockitoExtension.class) class OpenSearchClientTest { - private static final String NODES_RESPONSE_OS_2_13_0 = "{\r\n" + // - " \"_nodes\": {\r\n" + // - " \"total\": 1,\r\n" + // - " \"successful\": 1,\r\n" + // - " \"failed\": 0\r\n" + // - " },\r\n" + // - " \"cluster_name\": \"336984078605:target-domain\",\r\n" + // - " \"nodes\": {\r\n" + // - " \"HDzrwdO8TneRQaxzx94uKA\": {\r\n" + // - " \"name\": \"74c8fa743d5e3626e3903c3b1d5450e0\",\r\n" + // - " \"version\": \"2.13.0\",\r\n" + // - " \"build_type\": \"tar\",\r\n" + // - " \"build_hash\": \"unknown\",\r\n" + // - " \"roles\": [\r\n" + // - " \"data\",\r\n" + // - " \"ingest\",\r\n" + // - " \"master\",\r\n" + // - " \"remote_cluster_client\"\r\n" + // - " ]\r\n" + // - " }\r\n" + // - " }\r\n" + // - "}"; - private static final String CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_ENABLED = "{\"persistent\":{\"compatibility\":{\"override_main_response_version\":\"true\"}}}"; - private static final String CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED = "{\"persistent\":{\"compatibility\":{\"override_main_response_version\":\"false\"}}}"; - private static final String ROOT_RESPONSE_OS_1_0_0 = "{\"version\":{\"distribution\":\"opensearch\",\"number\":\"1.0.0\"}}"; - private static final String ROOT_RESPONSE_ES_7_10_2 = "{\"version\": {\"number\": \"7.10.2\"}}"; private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder() .enable(StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION) .build(); @@ -86,7 +59,7 @@ class OpenSearchClientTest { @Mock(strictness = Strictness.LENIENT) RestClient restClient; - @Mock + @Mock(strictness = Strictness.LENIENT) ConnectionContext connectionContext; @Mock @@ -96,8 +69,9 @@ class OpenSearchClientTest { @BeforeEach void beforeTest() { - doReturn(connectionContext).when(restClient).getConnectionContext(); - openSearchClient = spy(new OpenSearchClient(restClient, failedRequestLogger)); + when(connectionContext.getUri()).thenReturn(URI.create("http://localhost/")); + when(restClient.getConnectionContext()).thenReturn(connectionContext); + openSearchClient = spy(new OpenSearchClient_OS_2_11(restClient, failedRequestLogger, Version.fromString("OS 2.11"))); } @Test @@ -172,79 +146,11 @@ void 
         assertThat(exception.getMessage(), containsString("illegal_argument_exception"));
     }
 
-    @Test
-    void testGetClusterVersion_ES_7_10() {
-        setupOkResponse(restClient, "", ROOT_RESPONSE_ES_7_10_2);
-        setupOkResponse(restClient, "_cluster/settings", CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED);
-
-        var version = openSearchClient.getClusterVersion();
-
-        assertThat(version, equalTo(Version.fromString("ES 7.10.2")));
-        verify(restClient).getAsync("", null);
-        verify(restClient).getAsync("_cluster/settings", null);
-        verifyNoMoreInteractions(restClient);
-    }
-
-    @Test
-    void testGetClusterVersion_OS_CompatibilityModeEnabled() {
-        when(connectionContext.isAwsSpecificAuthentication()).thenReturn(true);
-        setupOkResponse(restClient, "", ROOT_RESPONSE_ES_7_10_2);
-        setupOkResponse(restClient, "_cluster/settings", CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_ENABLED);
-        setupOkResponse(restClient, "_nodes/_all/nodes,version?format=json", NODES_RESPONSE_OS_2_13_0);
-
-        var version = openSearchClient.getClusterVersion();
-
-        assertThat(version, equalTo(Version.fromString("AOS 2.13.0")));
-        verify(restClient).getAsync("", null);
-        verify(restClient).getAsync("_cluster/settings", null);
-        verify(restClient).getAsync("_nodes/_all/nodes,version?format=json", null);
-    }
-
-    @Test
-    void testGetClusterVersion_OS_CompatibilityModeDisableEnabled() {
-        setupOkResponse(restClient, "", ROOT_RESPONSE_OS_1_0_0);
-        setupOkResponse(restClient, "_cluster/settings", CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED);
-
-        var version = openSearchClient.getClusterVersion();
-
-        assertThat(version, equalTo(Version.fromString("OS 1.0.0")));
-        verify(restClient).getConnectionContext();
-        verify(restClient).getAsync("", null);
-        verifyNoMoreInteractions(restClient);
-    }
-
-    @Test
-    void testGetClusterVersion_OS_CompatibilityModeFailure_UseFallback() {
-        setupOkResponse(restClient, "", ROOT_RESPONSE_ES_7_10_2);
-
-        var versionResponse = new HttpResponse(403, "Forbidden", Map.of(), "");
-        when(restClient.getAsync("_cluster/settings", null)).thenReturn(Mono.just(versionResponse));
-
-        var version = openSearchClient.getClusterVersion();
-
-        assertThat(version, equalTo(Version.fromString("ES 7.10.2")));
-        verify(restClient).getAsync("", null);
-        verify(restClient).getAsync("_cluster/settings", null);
-        verifyNoMoreInteractions(restClient);
-    }
-
     private void setupOkResponse(RestClient restClient, String url, String body) {
         var versionResponse = new HttpResponse(200, "OK", Map.of(), body);
         when(restClient.getAsync(url, null)).thenReturn(Mono.just(versionResponse));
     }
 
-    @Test
-    void testGetClusterVersion_OS_Serverless() {
-        var versionResponse = new HttpResponse(404, "Not Found", Map.of(), "");
-        when(restClient.getAsync("", null)).thenReturn(Mono.just(versionResponse));
-
-        var version = openSearchClient.getClusterVersion();
-
-        assertThat(version, equalTo(Version.fromString("AOSS 2.x.x")));
-        verify(restClient, times(1)).getAsync("", null);
-        verifyNoMoreInteractions(restClient);
-    }
-
     @Test
     void testBulkRequest_succeedAfterRetries() {
         var docId1 = "tt1979320";
         var docId2 = "tt0816711";
@@ -399,29 +305,4 @@ void testNonBulkRequest_doesNotAddGzipHeaders() {
         verify(restClient).putAsync(any(), any(), any());
         verifyNoMoreInteractions(restClient);
     }
-
-    @Test
-    void testCheckCompatibilityModeFromResponse() {
-        Function<Boolean, ObjectNode> createCompatibilitySection = (Boolean value) ->
-            OBJECT_MAPPER.createObjectNode()
-                .set("compatibility", OBJECT_MAPPER.createObjectNode()
-                    .put("override_main_response_version", value));
-
-        BiFunction<Boolean, Boolean, HttpResponse> createSettingsResponse = (Boolean persistentVal, Boolean transientVal) -> {
-            var body = OBJECT_MAPPER.createObjectNode()
-                .set("persistent", createCompatibilitySection.apply(persistentVal))
-                .set("transient", createCompatibilitySection.apply(transientVal))
-                .toPrettyString();
-            return new HttpResponse(200, "OK", null, body);
-        };
-
-        var bothTrue = openSearchClient.checkCompatibilityModeFromResponse(createSettingsResponse.apply(true, true));
-        assertThat(bothTrue.block(), equalTo(true));
-        var persistentTrue = openSearchClient.checkCompatibilityModeFromResponse(createSettingsResponse.apply(true, false));
-        assertThat(persistentTrue.block(), equalTo(true));
-        var transientTrue = openSearchClient.checkCompatibilityModeFromResponse(createSettingsResponse.apply(false, true));
-        assertThat(transientTrue.block(), equalTo(true));
-        var neitherTrue = openSearchClient.checkCompatibilityModeFromResponse(createSettingsResponse.apply(false, false));
-        assertThat(neitherTrue.block(), equalTo(false));
-    }
 }
diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/framework/SimpleRestoreFromSnapshot.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/framework/SimpleRestoreFromSnapshot.java
index 2d3867465..ceb71b67a 100644
--- a/RFS/src/test/java/org/opensearch/migrations/bulkload/framework/SimpleRestoreFromSnapshot.java
+++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/framework/SimpleRestoreFromSnapshot.java
@@ -4,6 +4,7 @@
 import java.util.List;
 
 import org.opensearch.migrations.bulkload.common.OpenSearchClient;
+import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
 import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
 import org.opensearch.migrations.bulkload.models.IndexMetadata;
 import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;
@@ -28,10 +29,11 @@ public default void fullMigrationViaLocalSnapshot(
             tempSnapshotName,
             unpackedShardDataDir
         );
-        final var targetClusterClient = new OpenSearchClient(ConnectionContextTestParams.builder()
-            .host(targetClusterUrl)
-            .build()
-            .toConnectionContext());
+        var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
+            .host(targetClusterUrl)
+            .build()
+            .toConnectionContext());
+        final var targetClusterClient = clientFactory.determineVersionAndCreate();
 
         // TODO: This should update the following metdata:
         // - Global cluster state
diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchClient_ES_6_8_Test.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchClient_ES_6_8_Test.java
new file mode 100644
index 000000000..72016ec83
--- /dev/null
+++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/version_es_6_8/OpenSearchClient_ES_6_8_Test.java
@@ -0,0 +1,118 @@
+package org.opensearch.migrations.bulkload.version_es_6_8;
+
+import java.util.List;
+
+import org.opensearch.migrations.Version;
+import org.opensearch.migrations.bulkload.common.BulkDocSection;
+import org.opensearch.migrations.bulkload.common.OpenSearchClient;
+import org.opensearch.migrations.bulkload.common.RestClient;
+import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
+import org.opensearch.migrations.bulkload.common.http.HttpResponse;
+import org.opensearch.migrations.bulkload.http.BulkRequestGenerator;
+import org.opensearch.migrations.bulkload.http.BulkRequestGenerator.BulkItemResponseEntry;
+import org.opensearch.migrations.bulkload.tracing.IRfsContexts;
+import org.opensearch.migrations.bulkload.tracing.IRfsContexts.ICheckedIdempotentPutRequestContext;
+import org.opensearch.migrations.reindexer.FailedRequestsLogger;
+
+import com.fasterxml.jackson.core.StreamReadFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mock.Strictness;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import reactor.core.publisher.Mono;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+import static org.opensearch.migrations.bulkload.http.BulkRequestGenerator.itemEntry;
+
+@ExtendWith(MockitoExtension.class)
+class OpenSearchClient_ES_6_8_Test {
+    private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder()
+        .enable(StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION)
+        .build();
+
+    @Mock(strictness = Strictness.LENIENT)
+    RestClient restClient;
+
+    @Mock
+    ConnectionContext connectionContext;
+
+    @Mock
+    FailedRequestsLogger failedRequestLogger;
+
+    OpenSearchClient openSearchClient;
+
+    @BeforeEach
+    void beforeTest() {
+        openSearchClient = spy(new OpenSearchClient_ES_6_8(restClient, failedRequestLogger, Version.fromString("ES_6_8")));
+    }
+
+    @SneakyThrows
+    @Test
+    void testCreateIndexWithIncludeTypeNameFalse() {
+        // Setup
+        var checkIfExistsResponse = new HttpResponse(404, "", null, "does not exist");
+        var createdItemRawJson = "{\"created\":\"yup!\"}";
+        var createItemResponse = new HttpResponse(200, "", null, createdItemRawJson);
+
+        when(restClient.getAsync(any(), any())).thenReturn(Mono.just(checkIfExistsResponse));
+        when(restClient.putAsync(any(), any(), any())).thenReturn(Mono.just(createItemResponse));
+
+        // Action
+        var rawJson = "{ }";
+        var body = (ObjectNode) OBJECT_MAPPER.readTree(rawJson);
+        var result = openSearchClient.createIndex("indexName", body, mock(ICheckedIdempotentPutRequestContext.class));
+
+        // Assertions
+        assertThat(result.get().toPrettyString(), containsString(rawJson));
+        // On success, the interface sends back the JSON that was passed in
+        assertThat(result.get().toPrettyString(), not(containsString(createdItemRawJson)));
+
+        Mockito.verify(restClient).putAsync("indexName?include_type_name=false", "{}", null);
+    }
+
+    @Test
+    void testBulkRequestHasType_doc() {
+        var docId1 = "tt1979320";
+        var docId2 = "tt0816711";
+
+        var successResponse = bulkItemResponse(false, List.of(itemEntry(docId1), itemEntry(docId2)));
+        var bulkDocs = List.of(createBulkDoc(docId1), createBulkDoc(docId2));
+        when(restClient.postAsync(any(), any(), any(), any())).thenReturn(Mono.just(successResponse));
+        when(restClient.supportsGzipCompression()).thenReturn(false);
+
+        var result = openSearchClient.sendBulkRequest("indexName", bulkDocs, mock(IRfsContexts.IRequestContext.class)).block();
+
+        Mockito.verify(restClient).postAsync(eq("indexName/_doc/_bulk"), any(), any(), any());
+
+        verifyNoInteractions(failedRequestLogger);
+    }
+
+    private HttpResponse bulkItemResponse(boolean hasErrors, List<BulkItemResponseEntry> entries) {
+        var responseBody = BulkRequestGenerator.bulkItemResponse(hasErrors, entries);
+        return new HttpResponse(200, "", null, responseBody);
+    }
+
+    private BulkDocSection createBulkDoc(String docId) {
+        var bulkDoc = mock(BulkDocSection.class, withSettings().strictness(org.mockito.quality.Strictness.LENIENT));
+        when(bulkDoc.getDocId()).thenReturn(docId);
+        when(bulkDoc.asBulkIndexString()).thenReturn("BULK-INDEX\nBULK_BODY");
+        return bulkDoc;
+    }
+
+}