From 605057c658c5b0d0cdb2e27ec22d5b159fcf263b Mon Sep 17 00:00:00 2001 From: Chris Helma <25470211+chelma@users.noreply.github.com> Date: Thu, 4 Apr 2024 15:46:50 -0500 Subject: [PATCH] RFS can now take the source snapshot (#551) * RFS can now take the source snapshot Signed-off-by: Chris Helma * Minor tweaks per PR self-review Signed-off-by: Chris Helma * Updates per PR Comments Signed-off-by: Chris Helma --------- Signed-off-by: Chris Helma --- .gitignore | 12 +- RFS/README.md | 204 ++++++++++++++++-- RFS/build.gradle | 10 +- RFS/docker/TestSource_ES_7_10/Dockerfile | 20 ++ .../TestSource_ES_7_10/container-start.sh | 13 ++ .../java/com/rfs/ReindexFromSnapshot.java | 58 ++++- .../com/rfs/common/ConnectionDetails.java | 16 ++ .../main/java/com/rfs/common/RestClient.java | 46 ++-- .../com/rfs/common/S3SnapshotCreator.java | 156 ++++++++++++++ .../GlobalMetadataCreator_OS_2_11.java | 6 +- .../version_os_2_11/IndexCreator_OS_2_11.java | 6 +- 11 files changed, 497 insertions(+), 50 deletions(-) create mode 100644 RFS/docker/TestSource_ES_7_10/Dockerfile create mode 100755 RFS/docker/TestSource_ES_7_10/container-start.sh create mode 100644 RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java diff --git a/.gitignore b/.gitignore index 65ae7c754..2389bcea0 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,17 @@ __pycache__ *.egg-info* .python-version logs -test/opensearch-cluster-cdk/ +.vscode/ + +# Build files +plugins/opensearch/loggable-transport-netty4/.gradle/ + RFS/.gradle/ +RFS/bin/ RFS/build/ + +TrafficCapture/**/bin/ + +# CDK files from end-to-end testing opensearch-cluster-cdk/ +test/opensearch-cluster-cdk/ diff --git a/RFS/README.md b/RFS/README.md index e295571dd..726ad8a2e 100644 --- a/RFS/README.md +++ b/RFS/README.md @@ -2,7 +2,65 @@ ## How to run -### Set up your ES 6.8 Source Cluster +RFS provides a number of different options for running it. We'll look at the main ones below. + +### Using a local snapshot directory + +In this scenario, you have a local directory on disk containing the snapshot you want to migrate. You'll supply the `--snapshot-dir` arg, but not the ones related to a source cluster (`--source-host`, `--source-username`, `--source-password`) or S3 (`--s3-local-dir`, `--s3-repo-uri`, `--s3-region`). + +``` +TARGET_HOST= +TARGET_USERNAME= +TARGET_PASSWORD= + +gradle build + +gradle run --args="-n global_state_snapshot --snapshot-dir ./test-resources/snapshots/ES_6_8_Single -l /tmp/lucene_files --target-host $TARGET_HOST --target-username $TARGET_USERNAME --target-password $TARGET_PASSWORD -s es_6_8 -t os_2_11 --movement-type everything" +``` + +### Using an existing S3 snapshot + +In this scenario, you have an existing snapshot in S3 you want to migrate. You'll supply the S3-related args (`--s3-local-dir`, `--s3-repo-uri`, `--s3-region`), but not the `--snapshot-dir` one or the ones related to a source cluster (`--source-host`, `--source-username`, `--source-password`). + +``` +S3_REPO_URI= +S3_REGION=us-east-1 + +TARGET_HOST= +TARGET_USERNAME= +TARGET_PASSWORD= + +gradle build + +gradle run --args="-n global_state_snapshot --s3-local-dir /tmp/s3_files --s3-repo-uri $S3_REPO_URI --s3-region $S3_REGION -l /tmp/lucene_files --target-host $TARGET_HOST --target-username $TARGET_USERNAME --target-password $TARGET_PASSWORD -s es_6_8 -t os_2_11 --movement-type everything" +``` + +### Using a source cluster + +In this scenario, you have a source cluster, and don't yet have a snapshot. RFS will first need to make a snapshot of your source cluster, send it to S3, and then begin reindexing. In this scenario, you'll supply the source cluster-related args (`--source-host`, `--source-username`, `--source-password`) and the S3-related args (`--s3-local-dir`, `--s3-repo-uri`, `--s3-region`), but not the `--snapshot-dir` one. + +``` +SOURCE_HOST= +SOURCE_USERNAME= +SOURCE_PASSWORD= + +S3_REPO_URI= +S3_REGION=us-east-1 + +TARGET_HOST= +TARGET_USERNAME= +TARGET_PASSWORD= + +gradle build + +gradle run --args="-n global_state_snapshot --source-host $SOURCE_HOST --source-username $SOURCE_USERNAME --source-password $SOURCE_PASSWORD --s3-local-dir /tmp/s3_files --s3-repo-uri $S3_REPO_URI --s3-region $S3_REGION -l /tmp/lucene_files --target-host $TARGET_HOST --target-username $TARGET_USERNAME --target-password $TARGET_PASSWORD -s es_6_8 -t os_2_11 --movement-type everything" +``` + +### Handling auth + +RFS currently supports both basic auth (username/password) and no auth for both the source and target clusters. To use the no-auth approach, just omit the username/password arguments.
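+
+For example, a run against the local snapshot directory from the first scenario, with no auth on the target, might look like the following (the target host here is an illustrative assumption):
+
+```
+gradle run --args="-n global_state_snapshot --snapshot-dir ./test-resources/snapshots/ES_6_8_Single -l /tmp/lucene_files --target-host http://localhost:9200 -s es_6_8 -t os_2_11 --movement-type everything"
+```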
+ +## How to set up an ES 6.8 Source Cluster w/ an attached debugger ``` git clone git@github.com:elastic/elasticsearch.git @@ -93,24 +151,142 @@ curl -X PUT "localhost:9200/_snapshot/fs_repository/global_state_snapshot?wait_f "ignore_unavailable": true, "include_global_state": true }' -``` -### Set up your OS 2.11 Target Cluster +``` -I've only tested the scripts going from ES 6.8 to OS 2.11. For my test target, I just spun up an Amazon OpenSearch Service 2.11 cluster with a master user/password combo and otherwise default settings. +## How to set up an ES 7.10 Source Cluster running in Docker -### Run the scripts +The `./docker` directory contains a Dockerfile for an ES 7.10 cluster, which you can use for testing. You can run it like so: -I've been running them VS Code integration, but you should be able to do it using their dedicated gradle commands as well. That would look something like: +``` +(cd ./docker/TestSource_ES_7_10; docker build . -t es-w-s3) -``` -SNAPSHOT_DIR=/Users/chelma/workspace/ElasticSearch/elasticsearch/distribution/build/cluster/shared/repo -LUCENE_DIR=/tmp/lucene_files -HOSTNAME= -USERNAME= -PASSWORD= +docker run -d \ +-p 9200:9200 \ +-e discovery.type=single-node \ +--name elastic-source \ +es-w-s3 \ +/bin/sh -c '/usr/local/bin/docker-entrypoint.sh eswrapper & wait -n' -gradle build +curl http://localhost:9200 ``` + +### Providing AWS permissions for S3 snapshot creation + +While the container will have the `repository-s3` plugin installed out-of-the-box, to use it you'll need to provide AWS Credentials. This plugin will accept credentials either [from the Elasticsearch Keystore](https://www.elastic.co/guide/en/elasticsearch/plugins/7.10/repository-s3-client.html) or via the standard ENV variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_SESSION_TOKEN`). The issue w/ either approach in local testing is renewal of the timeboxed creds. One possible solution is to use an IAM User, but that is generally frowned upon. The approach we'll take here is to accept that the test cluster is temporary, so the creds can be as well. Therefore, we can make an AWS IAM Role in our AWS Account with the permissions it needs, assume it locally to generate the credential triple, and pipe that into the container using ENV variables. + +Start by making an AWS IAM Role (e.g. `arn:aws:iam::XXXXXXXXXXXX:role/testing-es-source-cluster-s3-access`) with S3 Full Access permissions in your AWS Account.
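+
+If you don't already have such a role, a minimal sketch of creating one with the AWS CLI might look like the following (the role name and the contents of `trust-policy.json` are assumptions; the trust policy must allow your own identity to assume the role):
+
+```
+aws iam create-role --role-name testing-es-source-cluster-s3-access --assume-role-policy-document file://trust-policy.json
+
+aws iam attach-role-policy --role-name testing-es-source-cluster-s3-access --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
+```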
You can then get credentials with that identity good for up to one hour: + +``` +unset access_key && unset secret_key && unset session_token + +output=$(aws sts assume-role --role-arn "arn:aws:iam::XXXXXXXXXXXX:role/testing-es-source-cluster-s3-access" --role-session-name "ES-Source-Cluster") + +access_key=$(echo $output | jq -r .Credentials.AccessKeyId) +secret_key=$(echo $output | jq -r .Credentials.SecretAccessKey) +session_token=$(echo $output | jq -r .Credentials.SessionToken) +``` + +The one-hour limit is annoying but workable, given that the only thing the creds are needed for is creating the snapshot at the very start of the RFS process. This is primarily driven by the fact that IAM limits session durations to one hour when the role is assumed via another role (i.e. role chaining). If your original creds in the AWS keyring are from an IAM User, etc., then this might not be a restriction for you and you can have up to 12 hours with the assumed creds. Ideas on how to improve this would be greatly appreciated. + +Anyway, you can then launch the container with those temporary credentials like so: + +``` +docker run -d \ +-p 9200:9200 \ +-e discovery.type=single-node \ +-e AWS_ACCESS_KEY_ID=$access_key \ +-e AWS_SECRET_ACCESS_KEY=$secret_key \ +-e AWS_SESSION_TOKEN=$session_token \ +-v ~/.aws:/root/.aws:ro \ +--name elastic-source \ +es-w-s3 +``` + +If you need to renew the creds, just kill the existing source container, re-assume the role, and spin up a new container. + +``` +docker stop elastic-source; docker rm elastic-source +``` + +### Setting up the Cluster w/ some sample docs + +You can set up the cluster w/ some sample docs like so: + +``` +curl -X PUT "localhost:9200/_component_template/posts_template" -H "Content-Type: application/json" -d' +{ + "template": { + "mappings": { + "properties": { + "contents": { "type": "text" }, + "author": { "type": "keyword" }, + "tags": { "type": "keyword" } + } + } + } +}' + +curl -X PUT "localhost:9200/_index_template/posts_index_template" -H "Content-Type: application/json" -d' +{ + "index_patterns": ["posts_*"], + "template": { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 1 + }, + "aliases": { + "current_posts": {} + } + }, + "composed_of": ["posts_template"] +}' + +curl -X PUT "localhost:9200/posts_2023_02_25" + +curl -X POST "localhost:9200/current_posts/_doc" -H "Content-Type: application/json" -d' +{ + "contents": "This is a sample blog post content.", + "author": "Author Name", + "tags": ["Elasticsearch", "Tutorial"] +}' + +curl -X PUT "localhost:9200/posts_2024_01_01" -H "Content-Type: application/json" -d' +{ + "aliases": { + "another_alias": { + "routing": "user123", + "filter": { + "term": { + "author": "Tobias Funke" + } + } + } + } +}' + +curl -X POST "localhost:9200/another_alias/_doc" -H "Content-Type: application/json" -d' +{ + "contents": "How Elasticsearch helped my patients", + "author": "Tobias Funke", + "tags": ["Elasticsearch", "Tutorial"] +}' + +curl -X POST "localhost:9200/another_alias/_doc" -H "Content-Type: application/json" -d' +{ + "contents": "My Time in the Blue Man Group", + "author": "Tobias Funke", + "tags": ["Lifestyle"] +}' + +curl -X POST "localhost:9200/another_alias/_doc" -H "Content-Type: application/json" -d' +{ + "contents": "On the Importance of Word Choice", + "author": "Tobias Funke", + "tags": ["Lifestyle"] +}' +```
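+
+To sanity-check the sample data, you can list the resulting indices and aliases with the standard cat APIs:
+
+```
+curl "localhost:9200/_cat/indices?v"
+curl "localhost:9200/_cat/aliases?v"
+```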
+ +## How to set up an OS 2.11 Target Cluster + +The only target cluster version this has been tested against is OpenSearch 2.11. For my test target, I just spun up an Amazon OpenSearch Service 2.11 cluster with a master user/password combo and otherwise default settings. -gradle run --args='-n global_state_snapshot --snapshot-dir $SNAPSHOT_DIR -l $LUCENE_DIR -h $HOSTNAME -u $USERNAME -p $PASSWORD -s es_6_8 -t os_2_11 --movement-type everything' -``` \ No newline at end of file diff --git a/RFS/build.gradle b/RFS/build.gradle index 6f19a3e75..a59a964af 100644 --- a/RFS/build.gradle +++ b/RFS/build.gradle @@ -12,17 +12,17 @@ repositories { dependencies { implementation 'com.beust:jcommander:1.81' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.2' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.16.2' + implementation 'com.fasterxml.jackson.core:jackson-annotations:2.16.2' + implementation 'com.fasterxml.jackson.core:jackson-core:2.16.2' + implementation 'io.netty:netty-codec-http:4.1.108.Final' implementation 'org.apache.logging.log4j:log4j-api:2.23.1' implementation 'org.apache.logging.log4j:log4j-core:2.23.1' implementation 'org.apache.lucene:lucene-core:8.11.3' implementation 'org.apache.lucene:lucene-analyzers-common:8.11.3' implementation 'org.apache.lucene:lucene-backward-codecs:8.11.3' - implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.2' - implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.16.2' - implementation 'com.fasterxml.jackson.core:jackson-annotations:2.16.2' - implementation 'com.fasterxml.jackson.core:jackson-core:2.16.2' implementation 'software.amazon.awssdk:s3:2.25.16' - implementation 'io.netty:netty-codec-http:4.1.108.Final' } application { diff --git a/RFS/docker/TestSource_ES_7_10/Dockerfile b/RFS/docker/TestSource_ES_7_10/Dockerfile new file mode 100644 index 000000000..6c758f997 --- /dev/null +++ b/RFS/docker/TestSource_ES_7_10/Dockerfile @@ -0,0 +1,20 @@ +FROM docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2 + +# Install the S3 Repo Plugin +RUN echo y | /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3 + +# Install the AWS CLI for testing purposes +RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \ + unzip awscliv2.zip && \ + ./aws/install + +# Install our custom entrypoint script +COPY ./container-start.sh /usr/share/elasticsearch/container-start.sh + +# Configure Elastic +ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml +# Prevents ES from complaining about node count +RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE +ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/ + +CMD /usr/share/elasticsearch/container-start.sh \ No newline at end of file diff --git a/RFS/docker/TestSource_ES_7_10/container-start.sh b/RFS/docker/TestSource_ES_7_10/container-start.sh new file mode 100755 index 000000000..b2507c11c --- /dev/null +++ b/RFS/docker/TestSource_ES_7_10/container-start.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +echo "Setting AWS Creds from ENV Variables" +bin/elasticsearch-keystore create +echo $AWS_ACCESS_KEY_ID | bin/elasticsearch-keystore add s3.client.default.access_key --stdin +echo $AWS_SECRET_ACCESS_KEY | bin/elasticsearch-keystore add s3.client.default.secret_key --stdin + +if [ -n "$AWS_SESSION_TOKEN" ]; then + echo $AWS_SESSION_TOKEN | bin/elasticsearch-keystore add s3.client.default.session_token --stdin +fi + +echo "Starting Elasticsearch" +/usr/local/bin/docker-entrypoint.sh eswrapper \ No newline at end of file diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java 
b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index c637ed025..4c456a84d 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -23,7 +23,7 @@ public class ReindexFromSnapshot { private static final Logger logger = LogManager.getLogger(ReindexFromSnapshot.class); public static class Args { - @Parameter(names = {"-n", "--snapshot-name"}, description = "The name of the snapshot to read", required = true) + @Parameter(names = {"-n", "--snapshot-name"}, description = "The name of the snapshot to migrate", required = true) public String snapshotName; @Parameter(names = {"--snapshot-dir"}, description = "The absolute path to the source snapshot directory on local disk", required = false) @@ -41,14 +41,23 @@ public static class Args { @Parameter(names = {"-l", "--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true) public String luceneDirPath; - @Parameter(names = {"-h", "--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true) + @Parameter(names = {"--source-host"}, description = "The source host and port (e.g. http://localhost:9200)", required = false) + public String sourceHost = null; + + @Parameter(names = {"--source-username"}, description = "The source username; if not provided, will assume no auth on source", required = false) + public String sourceUser = null; + + @Parameter(names = {"--source-password"}, description = "The source password; if not provided, will assume no auth on source", required = false) + public String sourcePass = null; + + @Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true) public String targetHost; - @Parameter(names = {"-u", "--target-username"}, description = "The target username", required = true) - public String targetUser; + @Parameter(names = {"--target-username"}, description = "The target username; if not provided, will assume no auth on target", required = false) + public String targetUser = null; - @Parameter(names = {"-p", "--target-password"}, description = "The target password", required = true) - public String targetPass; + @Parameter(names = {"--target-password"}, description = "The target password; if not provided, will assume no auth on target", required = false) + public String targetPass = null; @Parameter(names = {"-s", "--source-version"}, description = "The source cluster's version (e.g. 
'es_6_8')", required = true, converter = ClusterVersion.ArgsConverter.class) public ClusterVersion sourceVersion; @@ -77,6 +86,9 @@ public static void main(String[] args) { String s3RepoUri = arguments.s3RepoUri; String s3Region = arguments.s3Region; Path luceneDirPath = Paths.get(arguments.luceneDirPath); + String sourceHost = arguments.sourceHost; + String sourceUser = arguments.sourceUser; + String sourcePass = arguments.sourcePass; String targetHost = arguments.targetHost; String targetUser = arguments.targetUser; String targetPass = arguments.targetPass; @@ -87,6 +99,7 @@ public static void main(String[] args) { Logging.setLevel(logLevel); + ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); // Should probably be passed in as an arguments @@ -103,19 +116,50 @@ public static void main(String[] args) { throw new IllegalArgumentException("Unsupported target version: " + sourceVersion); } + /* + * You have three options for providing the snapshot data + * 1. A local snapshot directory + * 2. A source host we'll take the snapshot from + * 3. An S3 URI of an existing snapshot in S3 + * + * If you provide the source host, you still need to provide the S3 URI, etc to write the snapshot to. + */ + if (snapshotDirPath != null && (sourceHost != null || s3RepoUri != null)) { + throw new IllegalArgumentException("If you specify a local directory to take the snapshot from, you cannot specify a source host or S3 URI"); + } else if (sourceHost != null && (s3RepoUri == null || s3Region == null || s3LocalDirPath == null)) { + throw new IllegalArgumentException("If you specify a source host, you must also specify the S3 details as well"); + } + SourceRepo repo; if (snapshotDirPath != null) { repo = new FilesystemRepo(snapshotDirPath); } else if (s3RepoUri != null && s3Region != null && s3LocalDirPath != null) { repo = new S3Repo(s3LocalDirPath, s3RepoUri, s3Region); } else { - throw new IllegalArgumentException("You must specify either a local snapshot directory or an S3 URI"); + throw new IllegalArgumentException("Could not construct a source repo from the available, user-supplied arguments"); } // Set the transformer Transformer transformer = TransformFunctions.getTransformer(sourceVersion, targetVersion, awarenessAttributeDimensionality); try { + + if (sourceHost != null) { + // ========================================================================================================== + // Create the snapshot if necessary + // ========================================================================================================== + logger.info("=================================================================="); + logger.info("Attempting to create the snapshot..."); + S3SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region); + snapshotCreator.registerRepo(); + snapshotCreator.createSnapshot(); + while (!snapshotCreator.isSnapshotFinished()) { + logger.info("Snapshot not finished yet; sleeping for 5 seconds..."); + Thread.sleep(5000); + } + logger.info("Snapshot created successfully"); + } + // ========================================================================================================== // Read the Repo data file // ========================================================================================================== diff --git a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java 
b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java index 5a62c71d1..c9c8b1b44 100644 --- a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java +++ b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java @@ -4,12 +4,28 @@ * Stores the connection details (assuming basic auth) for an Elasticsearch/OpenSearch cluster */ public class ConnectionDetails { + public static enum AuthType { + BASIC, + NONE + } + public final String host; public final String username; public final String password; + public final AuthType authType; public ConnectionDetails(String host, String username, String password) { this.host = host; // http://localhost:9200 + + // If the username is provided, the password must be as well, and vice versa + if ((username == null && password != null) || (username != null && password == null)) { + throw new IllegalArgumentException("Both username and password must be provided, or neither"); + } else if (username != null){ + this.authType = AuthType.BASIC; + } else { + this.authType = AuthType.NONE; + } + this.username = username; this.password = password; } diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index 3a5fd5e43..0a4ad1849 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -14,28 +14,36 @@ public class RestClient { private static final Logger logger = LogManager.getLogger(RestClient.class); + + public class Response { + public final int code; + public final String body; + public final String message; + + public Response(int responseCode, String responseBody, String responseMessage) { + this.code = responseCode; + this.body = responseBody; + this.message = responseMessage; + } + } + public final ConnectionDetails connectionDetails; public RestClient(ConnectionDetails connectionDetails) { this.connectionDetails = connectionDetails; } - public int get(String path, boolean quietLogging) throws Exception { + public Response get(String path, boolean quietLogging) throws Exception { String urlString = connectionDetails.host + "/" + path; URL url = new URL(urlString); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - // Construct the basic auth header value - String auth = connectionDetails.username + ":" + connectionDetails.password; - byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.UTF_8)); - String authHeaderValue = "Basic " + new String(encodedAuth); - // Set the request method conn.setRequestMethod("GET"); - + // Set the necessary headers - conn.setRequestProperty("Authorization", authHeaderValue); + setAuthHeader(conn); // Enable input and output streams conn.setDoOutput(true); @@ -64,26 +72,21 @@ public int get(String path, boolean quietLogging) throws Exception { conn.disconnect(); - return responseCode; + return new Response(responseCode, responseBody, conn.getResponseMessage()); } - public int put(String path, String body, boolean quietLogging) throws Exception { + public Response put(String path, String body, boolean quietLogging) throws Exception { String urlString = connectionDetails.host + "/" + path; URL url = new URL(urlString); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - // Construct the basic auth header value - String auth = connectionDetails.username + ":" + connectionDetails.password; - byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.UTF_8)); - String authHeaderValue = "Basic " + new String(encodedAuth); - // Set 
the request method conn.setRequestMethod("PUT"); // Set the necessary headers conn.setRequestProperty("Content-Type", "application/json"); - conn.setRequestProperty("Authorization", authHeaderValue); + setAuthHeader(conn); // Enable input and output streams conn.setDoOutput(true); @@ -118,6 +121,15 @@ public int put(String path, String body, boolean quietLogging) throws Exception conn.disconnect(); - return responseCode; + return new Response(responseCode, responseBody, conn.getResponseMessage()); + } + + private void setAuthHeader(HttpURLConnection conn) { + if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) { + String auth = connectionDetails.username + ":" + connectionDetails.password; + byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.UTF_8)); + String authHeaderValue = "Basic " + new String(encodedAuth); + conn.setRequestProperty("Authorization", authHeaderValue); + } } } diff --git a/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java b/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java new file mode 100644 index 000000000..09eb2b6e4 --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/S3SnapshotCreator.java @@ -0,0 +1,156 @@ +package com.rfs.common; + +import java.net.HttpURLConnection; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class S3SnapshotCreator { + private static final Logger logger = LogManager.getLogger(S3SnapshotCreator.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ConnectionDetails connectionDetails; + private final String snapshotName; + private final String s3Uri; + private final String s3Region; + + public S3SnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String s3Uri, String s3Region) { + this.snapshotName = snapshotName; + this.connectionDetails = connectionDetails; + this.s3Uri = s3Uri; + this.s3Region = s3Region; + } + + public String getRepoName() { + return "migration_assistant_s3_repo"; + } + + public String getSnapshotName() { + return snapshotName; + } + + /* + * Extracts the bucket name from the S3 URI + * s3://my-bucket-name/my-folder/my-nested-folder => my-bucket-name + */ + public String getBucketName() { + return s3Uri.split("/")[2]; + } + + /* + * Extracts the base path from the S3 URI; could be nested arbitrarily deep + * s3://my-bucket-name/my-folder/my-nested-folder => my-folder/my-nested-folder + */ + public String getBasePath() { + int thirdSlashIndex = s3Uri.indexOf('/', 5); + if (thirdSlashIndex == -1) { + // Nothing after the bucket name + return null; + } + + // Extract everything after the third "/", excluding any final "/" + String rawBasePath = s3Uri.substring(thirdSlashIndex + 1); + if (rawBasePath.endsWith("/")) { + return rawBasePath.substring(0, rawBasePath.length() - 1); + } else { + return rawBasePath; + } + } + + public void registerRepo() throws Exception { + // Assemble the REST path + String targetName = "_snapshot/" + getRepoName(); + + // Assemble the request body + ObjectNode settings = mapper.createObjectNode(); + settings.put("bucket", getBucketName()); + settings.put("region", s3Region); + settings.put("base_path", getBasePath()); + + ObjectNode body = mapper.createObjectNode(); + body.put("type", "s3"); + body.set("settings", settings); + + // Register the repo; it's fine if it already 
exists + RestClient client = new RestClient(connectionDetails); + String bodyString = body.toString(); + RestClient.Response response = client.put(targetName, bodyString, false); + if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) { + logger.info("S3 Repo registration successful"); + } else { + logger.error("S3 Repo registration failed"); + throw new RepoRegistrationFailed(getRepoName()); + } + } + + public void createSnapshot() throws Exception { + // Assemble the REST path + String targetName = "_snapshot/" + getRepoName() + "/" + getSnapshotName(); + + // Assemble the request body + ObjectNode body = mapper.createObjectNode(); + body.put("indices", "_all"); + body.put("ignore_unavailable", true); + body.put("include_global_state", true); + + // Create the snapshot; idempotent operation + RestClient client = new RestClient(connectionDetails); + String bodyString = body.toString(); + RestClient.Response response = client.put(targetName, bodyString, false); + if (response.code == HttpURLConnection.HTTP_OK || response.code == HttpURLConnection.HTTP_CREATED) { + logger.info("Snapshot " + getSnapshotName() + " creation successful"); + } else { + logger.error("Snapshot " + getSnapshotName() + " creation failed"); + throw new SnapshotCreationFailed(getSnapshotName()); + } + } + + public boolean isSnapshotFinished() throws Exception { + // Assemble the REST path + String targetName = "_snapshot/" + getRepoName() + "/" + getSnapshotName(); + + // Check if the snapshot has finished + RestClient client = new RestClient(connectionDetails); + RestClient.Response response = client.get(targetName, false); + if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { + logger.error("Snapshot " + getSnapshotName() + " does not exist"); + throw new SnapshotDoesNotExist(getSnapshotName()); + } + JsonNode responseJson = mapper.readTree(response.body); + JsonNode firstSnapshot = responseJson.path("snapshots").get(0); + JsonNode stateNode = firstSnapshot.path("state"); + String state = stateNode.asText(); + + if (state.equals("SUCCESS")) { + return true; + } else if (state.equals("IN_PROGRESS")) { + return false; + } else { + logger.error("Snapshot " + getSnapshotName() + " has failed with state " + state); + throw new SnapshotCreationFailed(getSnapshotName()); + } + } + + public class RepoRegistrationFailed extends Exception { + public RepoRegistrationFailed(String repoName) { + super("Failed to register S3 repo " + repoName); + } + } + + public class SnapshotCreationFailed extends Exception { + public SnapshotCreationFailed(String snapshotName) { + super("Failed to create snapshot " + snapshotName); + } + } + + public class SnapshotDoesNotExist extends Exception { + public SnapshotDoesNotExist(String snapshotName) { + super("Snapshot " + snapshotName + " does not exist"); + } + } + +} diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java index b1abe010d..16921eb4f 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java @@ -137,11 +137,11 @@ private static void createEntity(String entityName, ObjectNode settings, Connect // Confirm the index doesn't already exist, then create it RestClient client = new RestClient(connectionDetails); - int getResponseCode = client.get(path, true); - if (getResponseCode == HttpURLConnection.HTTP_NOT_FOUND) 
{ + RestClient.Response response = client.get(path, true); + if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { String bodyString = body.toString(); client.put(path, bodyString, false); - } else if (getResponseCode == HttpURLConnection.HTTP_OK) { + } else if (response.code == HttpURLConnection.HTTP_OK) { logger.warn(entityName + " already exists. Skipping creation."); } else { logger.warn("Could not confirm that " + entityName + " does not already exist. Skipping creation."); diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java index 0701c9097..c3de7b5ef 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java @@ -32,11 +32,11 @@ public static void create(String targetName, IndexMetadata.Data indexMetadata, C // Confirm the index doesn't already exist, then create it RestClient client = new RestClient(connectionDetails); - int getResponseCode = client.get(targetName, true); - if (getResponseCode == HttpURLConnection.HTTP_NOT_FOUND) { + RestClient.Response response = client.get(targetName, true); + if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { String bodyString = body.toString(); client.put(targetName, bodyString, false); - } else if (getResponseCode == HttpURLConnection.HTTP_OK) { + } else if (response.code == HttpURLConnection.HTTP_OK) { logger.warn("Index " + targetName + " already exists. Skipping creation."); } else { logger.warn("Could not confirm that index " + targetName + " does not already exist. Skipping creation.");