Skip to content

Commit

Permalink
Admission Controller Module Transport Interceptor Initial Commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
  • Loading branch information
Ajay Kumar Movva committed Sep 4, 2023
1 parent d375e4c commit 25c16b6
Show file tree
Hide file tree
Showing 28 changed files with 1,692 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107))
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466))
- Admission Controller Module Transport Interceptor Initial Commit ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
34 changes: 34 additions & 0 deletions modules/throttling/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

opensearchplugin {
description 'Plugin intercepting requests and throttle based on resource consumption'
classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.throttling.admissioncontroller.AdmissionControlService;
import org.opensearch.throttling.admissioncontroller.transport.AdmissionControlTransportInterceptor;
import org.opensearch.transport.TransportInterceptor;

import java.util.ArrayList;
import java.util.List;

/**
* This plugin is used to register handlers to intercept both rest and transport requests.
*/
public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin {

AdmissionControlService admissionControlService;

/**
* Default Constructor for plugin
*/
public OpenSearchThrottlingModulePlugin() {}

/**
* Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
* transport (inter-node) requests. This must not return <code>null</code>
*
* @param namedWriteableRegistry registry of all named writeables registered
* @param threadContext a {@link ThreadContext} of the current nodes or clients that can be used to set additional
* headers in the interceptors
* @return list of transport interceptors
*/
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
this.admissionControlService = AdmissionControlService.getInstance();
List<TransportInterceptor> interceptors = new ArrayList<>(0);
// TODO Will throw exception in next PR's. This needs to ensure the service is up before adding into transport interceptor.
if (this.admissionControlService != null) {
interceptors.add(new AdmissionControlTransportInterceptor(this.admissionControlService));
}
return interceptors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package contains classes related to throttling plugins
*/
package org.opensearch.throttling;
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControlService;
import org.opensearch.throttling.admissioncontroller.transport.AdmissionControlTransportInterceptor;
import org.opensearch.transport.TransportInterceptor;

import java.util.List;

import static org.mockito.Mockito.mock;

public class OpenSearchThrottlingModulePluginTests extends OpenSearchTestCase {
OpenSearchThrottlingModulePlugin openSearchThrottlingModulePlugin;
private ClusterService clusterService;
private ThreadPool threadPool;

@Override
public void setUp() throws Exception {
threadPool = new TestThreadPool("admission_controller_settings_test");
clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
super.setUp();
}

@Override
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdownNow();
}

public void testGetTransportInterceptors() {
openSearchThrottlingModulePlugin = new OpenSearchThrottlingModulePlugin();
List<TransportInterceptor> interceptors = openSearchThrottlingModulePlugin.getTransportInterceptors(
mock(NamedWriteableRegistry.class),
threadPool.getThreadContext()
);
assertEquals(interceptors.size(), 0);
AdmissionControlService admissionControlService = AdmissionControlService.newAdmissionControllerService(
Settings.EMPTY,
clusterService.getClusterSettings(),
threadPool
);
interceptors = openSearchThrottlingModulePlugin.getTransportInterceptors(
mock(NamedWriteableRegistry.class),
threadPool.getThreadContext()
);
assertEquals(interceptors.size(), 1);
assertEquals(interceptors.get(0).getClass(), AdmissionControlTransportInterceptor.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControlSettings;
import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings;
import org.opensearch.transport.ProxyConnectionStrategy;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.RemoteConnectionStrategy;
Expand Down Expand Up @@ -237,6 +239,10 @@ public void apply(Settings value, Settings current, Settings previous) {
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
AdmissionControlSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING,
IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING,
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControlService;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
Expand Down Expand Up @@ -763,6 +764,12 @@ protected Node(
fileCacheCleaner
);

final AdmissionControlService admissionControlService = AdmissionControlService.newAdmissionControllerService(
settings,
clusterService.getClusterSettings(),
threadPool
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down Expand Up @@ -1110,6 +1117,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling.admissioncontroller;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.controllers.AdmissionController;
import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.throttling.admissioncontroller.AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER;

/**
* Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
*/
public class AdmissionControlService {
private final ThreadPool threadPool;
public final AdmissionControlSettings admissionControlSettings;
private final ConcurrentMap<String, AdmissionController> ADMISSION_CONTROLLERS;
private static AdmissionControlService admissionControlService = null;
private static final Logger logger = LogManager.getLogger(AdmissionControlService.class);
private final ClusterSettings clusterSettings;
private final Settings settings;

/**
*
* @param settings Immutable settings instance
* @param clusterSettings ClusterSettings Instance
* @param threadPool ThreadPool Instance
*/
public AdmissionControlService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.threadPool = threadPool;
this.admissionControlSettings = new AdmissionControlSettings(clusterSettings, settings, this);
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
this.clusterSettings = clusterSettings;
this.settings = settings;
this.initialise();
}

/**
* Initialise and Register all the admissionControllers
*/
public void initialise() {
// Initialise different type of admission controllers
registerAdmissionController(IO_BASED_ADMISSION_CONTROLLER);
}

/**
* Handler to trigger registered admissionController
*/
public boolean applyTransportLayerAdmissionController(String action) {
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.acquire(admissionControlSettings); });
return true;
}

/**
*
* @return singleton admissionControllerService Instance
*/
public static AdmissionControlService getInstance() {
return admissionControlService;
}

/**
*
* @param settings Immutable settings instance
* @param clusterSettings ClusterSettings Instance
* @param threadPool ThreadPool Instance
* @return singleton admissionControllerService Instance
*/
public static synchronized AdmissionControlService newAdmissionControllerService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool
) {
if (admissionControlService == null) {
admissionControlService = new AdmissionControlService(settings, clusterSettings, threadPool);
}
return admissionControlService;
}

/**
*
* @return true if the admissionController Feature is enabled
*/
public Boolean isTransportLayerAdmissionControllerEnabled() {
return this.admissionControlSettings.isTransportLayerAdmissionControllerEnabled();
}

/**
*
* @return true if the admissionController Feature is enabled
*/
public Boolean isTransportLayerAdmissionControllerEnforced() {
return this.admissionControlSettings.isTransportLayerAdmissionControllerEnforced();
}

/**
*
* @param admissionControllerName admissionControllerName to register into the service.
*/
public void registerAdmissionController(String admissionControllerName) {
AdmissionController admissionController = this.controllerFactory(admissionControllerName);
if (admissionController != null) {
this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController);
}
}

/**
* @return AdmissionController Instance
*/
private AdmissionController controllerFactory(String admissionControllerName) {
switch (admissionControllerName) {
case IO_BASED_ADMISSION_CONTROLLER:
return new IOBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings);
default:
return null;
}
}

/**
*
* @return list of the registered admissionControllers
*/
public List<AdmissionController> getListAdmissionControllers() {
return new ArrayList<>(this.ADMISSION_CONTROLLERS.values());
}

/**
*
* @param controllerName name of the admissionController
* @return instance of the AdmissionController Instance
*/
public AdmissionController getAdmissionController(String controllerName) {
if (this.ADMISSION_CONTROLLERS.containsKey(controllerName)) {
return this.ADMISSION_CONTROLLERS.get(controllerName);
}
return null;
}
}
Loading

0 comments on commit 25c16b6

Please sign in to comment.