Skip to content

Commit

Permalink
Introduce ssl settings to reindex from remote (#38292)
Browse files Browse the repository at this point in the history
Adds reindex.ssl.* settings for reindex from remote.

This uses the ssl-config/ internal library to parse and load SSL
configuration and files. This is applied when using the low level
rest client to connect to a remote ES node

Backport of: #37527
Relates: #37287
Resolves: #29755
  • Loading branch information
tvernum authored Feb 4, 2019
1 parent 502c3d8 commit ad38b09
Show file tree
Hide file tree
Showing 25 changed files with 646 additions and 38 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ allprojects {
}

/* Sets up the dependencies that we build as part of this project but
register as thought they were external to resolve internally. We register
register as though they were external to resolve internally. We register
them as external dependencies so the build plugin that we use can be used
to build elasticsearch plugins outside of the elasticsearch source tree. */
ext.projectSubstitutions = [
Expand All @@ -217,6 +217,7 @@ allprojects {
"org.elasticsearch:elasticsearch-core:${version}": ':libs:core',
"org.elasticsearch:elasticsearch-x-content:${version}": ':libs:x-content',
"org.elasticsearch:elasticsearch-secure-sm:${version}": ':libs:secure-sm',
"org.elasticsearch:elasticsearch-ssl-config:${version}": ':libs:elasticsearch-ssl-config',
"org.elasticsearch.client:elasticsearch-rest-client:${version}": ':client:rest',
"org.elasticsearch.client:elasticsearch-rest-client-sniffer:${version}": ':client:sniffer',
"org.elasticsearch.client:elasticsearch-rest-high-level-client:${version}": ':client:rest-high-level',
Expand Down
6 changes: 6 additions & 0 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ unitTest {

dependencies {
compile "org.elasticsearch.client:elasticsearch-rest-client:${version}"
compile "org.elasticsearch:elasticsearch-ssl-config:${version}"
// for http - testing reindex from remote
testCompile project(path: ':modules:transport-netty4', configuration: 'runtime')
// for parent/child testing
Expand All @@ -71,6 +72,11 @@ thirdPartyAudit.ignoreMissingClasses (
'org.apache.log.Logger',
)

forbiddenPatterns {
// PKCS#12 file are not UTF-8
exclude '**/*.p12'
}

// Support for testing reindex-from-remote against old Elaticsearch versions
configurations {
oldesFixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.action.bulk.Retry;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -83,13 +84,15 @@
* Abstract base for scrolling across a search and executing bulk actions on all results. All package private methods are package private so
* their tests can use them. Most methods run in the listener thread pool because the are meant to be fast and don't expect to block.
*/
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>> {
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>,
Action extends TransportAction<Request, ?>> {

protected final Logger logger;
protected final BulkByScrollTask task;
protected final WorkerBulkByScrollTaskState worker;
protected final ThreadPool threadPool;
protected final ScriptService scriptService;

protected final Action mainAction;
/**
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
* requests of this mainRequest.
Expand All @@ -113,7 +116,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu

public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ScriptService scriptService,
ThreadPool threadPool, Action mainAction, Request mainRequest,
ActionListener<BulkByScrollResponse> listener) {

this.task = task;
Expand All @@ -125,7 +128,7 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourc
this.logger = logger;
this.client = client;
this.threadPool = threadPool;
this.scriptService = scriptService;
this.mainAction = mainAction;
this.mainRequest = mainRequest;
this.listener = listener;
BackoffPolicy backoffPolicy = buildBackoffPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@
/**
* Implementation of delete-by-query using scrolling and bulk.
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {

private final boolean useSeqNoForCAS;

public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, request, scriptService, listener);
logger, client, threadPool, action, request, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
}


@Override
protected boolean accept(ScrollableHitSource.Hit doc) {
// Delete-by-query does not require the source to delete a document
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,32 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

Expand Down Expand Up @@ -69,8 +80,19 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestRethrottleAction(settings, restController, nodesInCluster));
}

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService));
}

@Override
public List<Setting<?>> getSettings() {
return singletonList(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
final List<Setting<?>> settings = new ArrayList<>();
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.reindex;

import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureSetting;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.ssl.SslConfiguration;
import org.elasticsearch.common.ssl.SslConfigurationKeys;
import org.elasticsearch.common.ssl.SslConfigurationLoader;
import org.elasticsearch.env.Environment;
import org.elasticsearch.watcher.FileChangesListener;
import org.elasticsearch.watcher.FileWatcher;
import org.elasticsearch.watcher.ResourceWatcherService;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.common.settings.Setting.listSetting;
import static org.elasticsearch.common.settings.Setting.simpleString;

/**
* Loads "reindex.ssl.*" configuration from Settings, and makes the applicable configuration (trust manager / key manager / hostname
* verification / cipher-suites) available for reindex-from-remote.
*/
class ReindexSslConfig {

private static final Map<String, Setting<?>> SETTINGS = new HashMap<>();
private static final Map<String, Setting<SecureString>> SECURE_SETTINGS = new HashMap<>();

static {
Setting.Property[] defaultProperties = new Setting.Property[] { Setting.Property.NodeScope, Setting.Property.Filtered };
Setting.Property[] deprecatedProperties = new Setting.Property[] { Setting.Property.Deprecated, Setting.Property.NodeScope,
Setting.Property.Filtered };
for (String key : SslConfigurationKeys.getStringKeys()) {
String settingName = "reindex.ssl." + key;
final Setting.Property[] properties = SslConfigurationKeys.isDeprecated(key) ? deprecatedProperties : defaultProperties;
SETTINGS.put(settingName, simpleString(settingName, properties));
}
for (String key : SslConfigurationKeys.getListKeys()) {
String settingName = "reindex.ssl." + key;
final Setting.Property[] properties = SslConfigurationKeys.isDeprecated(key) ? deprecatedProperties : defaultProperties;
SETTINGS.put(settingName, listSetting(settingName, Collections.emptyList(), Function.identity(), properties));
}
for (String key : SslConfigurationKeys.getSecureStringKeys()) {
String settingName = "reindex.ssl." + key;
SECURE_SETTINGS.put(settingName, SecureSetting.secureString(settingName, null));
}
}

private final SslConfiguration configuration;
private volatile SSLContext context;

public static List<Setting<?>> getSettings() {
List<Setting<?>> settings = new ArrayList<>();
settings.addAll(SETTINGS.values());
settings.addAll(SECURE_SETTINGS.values());
return settings;
}

ReindexSslConfig(Settings settings, Environment environment, ResourceWatcherService resourceWatcher) {
final SslConfigurationLoader loader = new SslConfigurationLoader("reindex.ssl.") {

@Override
protected String getSettingAsString(String key) {
return settings.get(key);
}

@Override
protected char[] getSecureSetting(String key) {
final Setting<SecureString> setting = SECURE_SETTINGS.get(key);
if (setting == null) {
throw new IllegalArgumentException("The secure setting [" + key + "] is not registered");
}
return setting.get(settings).getChars();
}

@Override
protected List<String> getSettingAsList(String key) throws Exception {
return settings.getAsList(key);
}
};
configuration = loader.load(environment.configFile());
reload();

final FileChangesListener listener = new FileChangesListener() {
@Override
public void onFileCreated(Path file) {
onFileChanged(file);
}

@Override
public void onFileDeleted(Path file) {
onFileChanged(file);
}

@Override
public void onFileChanged(Path file) {
ReindexSslConfig.this.reload();
}
};
for (Path file : configuration.getDependentFiles()) {
try {
final FileWatcher watcher = new FileWatcher(file);
watcher.addListener(listener);
resourceWatcher.add(watcher, ResourceWatcherService.Frequency.HIGH);
} catch (IOException e) {
throw new UncheckedIOException("cannot watch file [" + file + "]", e);
}
}
}

private void reload() {
this.context = configuration.createSslContext();
}

/**
* Encapsulate the loaded SSL configuration as a HTTP-client {@link SSLIOSessionStrategy}.
* The returned strategy is immutable, but successive calls will return different objects that may have different
* configurations if the underlying key/certificate files are modified.
*/
SSLIOSessionStrategy getStrategy() {
final HostnameVerifier hostnameVerifier = configuration.getVerificationMode().isHostnameVerificationEnabled()
? new DefaultHostnameVerifier()
: new NoopHostnameVerifier();
final String[] protocols = configuration.getSupportedProtocols().toArray(Strings.EMPTY_ARRAY);
final String[] cipherSuites = configuration.getCipherSuites().toArray(Strings.EMPTY_ARRAY);
return new SSLIOSessionStrategy(context, protocols, cipherSuites, hostnameVerifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<Bu
ClusterState state = clusterService.state();
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state,
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService, state,
listener).start();
}
);
Expand Down
Loading

0 comments on commit ad38b09

Please sign in to comment.