Skip to content

Commit

Permalink
Add an OpensearchClient for ES 6.x (opensearch-project#1230)
Browse files Browse the repository at this point in the history
Create OpenSearchClient for 6.x

---------

Signed-off-by: Mikayla Thompson <thomika@amazon.com>
Co-authored-by: Peter Nied <peternied@hotmail.com>
  • Loading branch information
mikaylathompson and peternied authored Jan 14, 2025
1 parent de5ef3a commit 5ca767b
Show file tree
Hide file tree
Showing 22 changed files with 703 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.List;

import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.S3SnapshotCreator;
import org.opensearch.migrations.bulkload.common.SnapshotCreator;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
Expand Down Expand Up @@ -135,7 +135,8 @@ public static void main(String[] args) throws Exception {
private ICreateSnapshotContext context;

public void run() {
var client = new OpenSearchClient(arguments.sourceArgs.toConnectionContext());
var clientFactory = new OpenSearchClientFactory(arguments.sourceArgs.toConnectionContext());
var client = clientFactory.determineVersionAndCreate();
SnapshotCreator snapshotCreator;
if (arguments.fileSystemRepoPath != null) {
snapshotCreator = new FileSystemSnapshotCreator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.S3SnapshotCreator;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
Expand All @@ -29,6 +30,21 @@

@Slf4j
public class TestCreateSnapshot {
private static final byte[] ROOT_RESPONSE_ES_7_10_2 = "{\"version\": {\"number\": \"7.10.2\"}}".getBytes(StandardCharsets.UTF_8);
private static final Map<String, String> ROOT_RESPONSE_HEADERS = Map.of(
"Content-Type",
"application/json",
"Content-Length",
"" + ROOT_RESPONSE_ES_7_10_2.length
);
private static final byte[] CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED = "{\"persistent\":{\"compatibility\":{\"override_main_response_version\":\"false\"}}}".getBytes(StandardCharsets.UTF_8);
private static final Map<String, String> CLUSTER_SETTINGS_COMPATIBILITY_HEADERS = Map.of(
"Content-Type",
"application/json",
"Content-Length",
"" + CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED.length
);

final SnapshotTestContext snapshotContext = SnapshotTestContext.factory().noOtelTracking();
final byte[] payloadBytes = "Success".getBytes(StandardCharsets.UTF_8);
final Map<String, String> headers = Map.of(
Expand All @@ -46,17 +62,24 @@ public void testRepoRegisterAndSnapshotCreateRequests() throws Exception {
try (var destinationServer = SimpleNettyHttpServer.makeNettyServer(false,
Duration.ofMinutes(10),
fl -> {
if (Objects.equals(fl.uri(), "/")) {
return new SimpleHttpResponse(ROOT_RESPONSE_HEADERS, ROOT_RESPONSE_ES_7_10_2, "OK", 200);
} else if (Objects.equals(fl.uri(), "/_cluster/settings")) {
return new SimpleHttpResponse(CLUSTER_SETTINGS_COMPATIBILITY_HEADERS, CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED,
"OK", 200);
}
capturedRequestList.add(new AbstractMap.SimpleEntry<>(fl, fl.content().toString(StandardCharsets.UTF_8)));
return new SimpleHttpResponse(headers, payloadBytes, "OK", 200);
}))
{
final var endpoint = destinationServer.localhostEndpoint().toString();

var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
var sourceClientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
.host(endpoint)
.insecure(true)
.build()
.toConnectionContext());
var sourceClient = sourceClientFactory.determineVersionAndCreate();
var snapshotCreator = new S3SnapshotCreator(
snapshotName,
sourceClient,
Expand Down Expand Up @@ -111,7 +134,12 @@ public void testSnapshotCreateWithIndexAllowlist() throws Exception {
try (var destinationServer = SimpleNettyHttpServer.makeNettyServer(false,
Duration.ofMinutes(10),
fl -> {
if (fl.uri().equals("/_snapshot/migration_assistant_repo/" + snapshotName)) {
if (Objects.equals(fl.uri(), "/")) {
return new SimpleHttpResponse(ROOT_RESPONSE_HEADERS, ROOT_RESPONSE_ES_7_10_2, "OK", 200);
} else if (Objects.equals(fl.uri(), "/_cluster/settings")) {
return new SimpleHttpResponse(CLUSTER_SETTINGS_COMPATIBILITY_HEADERS, CLUSTER_SETTINGS_COMPATIBILITY_OVERRIDE_DISABLED,
"OK", 200);
} else if (fl.uri().equals("/_snapshot/migration_assistant_repo/" + snapshotName)) {
createSnapshotRequest.set(fl);
createSnapshotRequestContent.set(fl.content().toString(StandardCharsets.UTF_8));
}
Expand All @@ -120,11 +148,12 @@ public void testSnapshotCreateWithIndexAllowlist() throws Exception {
{
final var endpoint = destinationServer.localhostEndpoint().toString();

var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
var sourceClientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
.host(endpoint)
.insecure(true)
.build()
.toConnectionContext());
var sourceClient = sourceClientFactory.determineVersionAndCreate();
var snapshotCreator = new S3SnapshotCreator(
snapshotName,
sourceClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.text.NumberFormat;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.data.WorkloadGenerator;
import org.opensearch.migrations.utils.ProcessHelpers;

Expand Down Expand Up @@ -35,7 +35,8 @@ public static void main(String[] args) {

public void run(DataGeneratorArgs arguments) {
var connectionContext = arguments.targetArgs.toConnectionContext();
var client = new OpenSearchClient(connectionContext);
var clientFactory = new OpenSearchClientFactory(connectionContext);
var client = clientFactory.determineVersionAndCreate();

var startTimeMillis = System.currentTimeMillis();
var workloadGenerator = new WorkloadGenerator(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ void generateData_OS_2_14() throws Exception {
}
}

@Test
void generateData_ES_6_8() throws Exception {
try (var targetCluster = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23)) {
generateData(targetCluster);
}
}

@SneakyThrows
void generateData(final SearchClusterContainer targetCluster) {
// ACTION: Set up the target clusters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.LuceneDocumentsReader;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.S3Repo;
import org.opensearch.migrations.bulkload.common.S3Uri;
import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker;
Expand Down Expand Up @@ -278,7 +279,8 @@ public static void main(String[] args) throws Exception {
workerId)
) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
var clientFactory = new OpenSearchClientFactory(connectionContext);
OpenSearchClient targetClient = clientFactory.determineVersionAndCreate();
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
arguments.numDocsPerBulkRequest,
arguments.numBytesPerBulkRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
Expand Down Expand Up @@ -105,11 +105,12 @@ private void migrationDocumentsWithClusters(

// === ACTION: Take a snapshot ===
var snapshotName = "my_snap";
var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
.host(sourceCluster.getUrl())
.insecure(true)
.build()
.toConnectionContext());
var sourceClientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
.host(sourceCluster.getUrl())
.insecure(true)
.build()
.toConnectionContext());
var sourceClient = sourceClientFactory.determineVersionAndCreate();
var snapshotCreator = new FileSystemSnapshotCreator(
snapshotName,
sourceClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.data.WorkloadGenerator;
Expand Down Expand Up @@ -72,11 +72,11 @@ public void testDocumentMigration(
).join();

// Populate the source cluster with data
var client = new OpenSearchClient(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
);
var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext());
var client = clientFactory.determineVersionAndCreate();
var generator = new WorkloadGenerator(client);
generator.generate(new WorkloadOptions());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.function.Function;

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.data.WorkloadGenerator;
Expand Down Expand Up @@ -124,11 +124,12 @@ private void testProcess(int expectedExitCode, Function<RunData, Integer> proces
).join();

// Populate the source cluster with data
var client = new OpenSearchClient(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
);
var client = clientFactory.determineVersionAndCreate();
var generator = new WorkloadGenerator(client);
generator.generate(new WorkloadOptions());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.opensearch.migrations.bulkload.common.DocumentReindexer;
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.LuceneDocumentsReader;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.RfsLuceneDocument;
import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker;
Expand Down Expand Up @@ -274,13 +274,14 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
UUID.randomUUID().toString(),
Clock.offset(Clock.systemUTC(), Duration.ofMillis(nextClockShift))
)) {
return RfsMigrateDocuments.run(
readerFactory,
new DocumentReindexer(new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
.host(targetAddress)
.compressionEnabled(compressionEnabled)
.build()
.toConnectionContext()), 1000, Long.MAX_VALUE, 1, defaultDocTransformer),
.toConnectionContext());
return RfsMigrateDocuments.run(
readerFactory,
new DocumentReindexer(clientFactory.determineVersionAndCreate(), 1000, Long.MAX_VALUE, 1, defaultDocTransformer),
new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClientFactory;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
Expand Down Expand Up @@ -58,11 +59,12 @@ protected void startClusters() {
@SneakyThrows
protected String createSnapshot(String snapshotName) {
var snapshotContext = SnapshotTestContext.factory().noOtelTracking();
var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
.host(sourceCluster.getUrl())
.insecure(true)
.build()
.toConnectionContext());
var sourceClient = clientFactory.determineVersionAndCreate();
var snapshotCreator = new org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator(
snapshotName,
sourceClient,
Expand Down Expand Up @@ -115,11 +117,12 @@ protected MigrationItemResult executeMigration(MigrateOrEvaluateArgs arguments,
* @return An OpenSearch client.
*/
protected OpenSearchClient createClient(SearchClusterContainer cluster) {
return new OpenSearchClient(ConnectionContextTestParams.builder()
var clientFactory = new OpenSearchClientFactory(ConnectionContextTestParams.builder()
.host(cluster.getUrl())
.insecure(true)
.build()
.toConnectionContext());
return clientFactory.determineVersionAndCreate();
}

protected SnapshotTestContext createSnapshotContext() {
Expand Down
Loading

0 comments on commit 5ca767b

Please sign in to comment.