Skip to content

Commit

Permalink
Admission Controller Module Rest and Transport Interceptor Commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
  • Loading branch information
Ajay Kumar Movva committed Aug 12, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent db768d1 commit d9d0686
Showing 14 changed files with 390 additions and 7 deletions.
17 changes: 17 additions & 0 deletions modules/throttling/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apply plugin: 'opensearch.java-rest-test'

opensearchplugin {
description 'Plugin intercepting requests and throttle based on resource consumption'
classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin'
extendedPlugins = ['transport-netty4']
}

dependencies {
api project(path: ':modules:reindex')
implementation project(path: ':modules:transport-netty4')
compileOnly project(':modules:transport-netty4')
}

testClusters.all {
module ':modules:reindex'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling;

import io.netty.channel.ChannelHandler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerRestHandler;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerTransportInterceptor;
import org.opensearch.transport.Netty4HandlerExtension;
import org.opensearch.transport.TransportInterceptor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin, Netty4HandlerExtension {

private static final Map<String, ChannelHandler> HANDLERS = new HashMap<String, ChannelHandler>();

@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
List<TransportInterceptor> interceptors = new ArrayList<>(0);
interceptors.add(new AdmissionControllerTransportInterceptor(null));
return interceptors;
}

@Override
public Map<String, ChannelHandler> getHandlers() {
if (HANDLERS.isEmpty()) {
HANDLERS.put("opensearch-throttling-plugin:AdmissionControlRestHandler", new AdmissionControllerRestHandler());
}
return HANDLERS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling.admissioncontroller;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@ChannelHandler.Sharable
public class AdmissionControllerRestHandler extends ChannelDuplexHandler {
private static final Logger LOGGER = LogManager.getLogger(AdmissionControllerRestHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
String uri = getUri(fullHttpRequest);
applyAdmissionControl(uri);
ctx.fireChannelRead(msg);
}

private String getUri(FullHttpRequest fullHttpRequest) {
return fullHttpRequest.uri();
}

private void applyAdmissionControl(String requestURI) {
// apply admission controller
// LOGGER.info("Apply Admission Controller Triggered URI: " + requestURI);
}

private void releaseAdmissionControl(ChannelHandlerContext ctx) {
// release the acquired objects
// LOGGER.info("Released Admission Controller Handler");
}

private long getContentLength(FullHttpRequest fullHttpRequest) {
String contentLengthHeader = fullHttpRequest.headers().get(HttpHeaderNames.CONTENT_LENGTH);
return contentLengthHeader == null ? 0 : Long.parseLong(contentLengthHeader);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
releaseAdmissionControl(ctx);
super.close(ctx, promise);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
releaseAdmissionControl(ctx);
super.write(ctx, msg, promise);
}
}
39 changes: 39 additions & 0 deletions modules/throttling/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

grant {
// needed to generate runtime classes
permission java.lang.RuntimePermission "createClassLoader";

// needed to find the classloader to load allowlisted classes from
permission java.lang.RuntimePermission "getClassLoader";
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
org.opensearch.throttling.OpenSearchThrottlingModulePlugin
Original file line number Diff line number Diff line change
@@ -93,12 +93,17 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NettyAllocator;
import org.opensearch.transport.NettyByteBufSizer;
import org.opensearch.transport.NettySettings;
import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.transport.netty4.Netty4Utils;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
@@ -182,6 +187,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {

private volatile ServerBootstrap serverBootstrap;
private volatile SharedGroupFactory.SharedGroup sharedGroup;
private List<ChannelHandler> channelHandlers;

public Netty4HttpServerTransport(
Settings settings,
@@ -223,6 +229,25 @@ public Netty4HttpServerTransport(
);
}

public Netty4HttpServerTransport(
Settings settings,
NetworkService networkService,
BigArrays bigArrays,
ThreadPool threadPool,
NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher,
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory,
Map<String, ChannelHandler> channelHandlers
) {
this(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, sharedGroupFactory);
this.channelHandlers = NettySettings.HANDLER_ORDERING.get(settings)
.stream()
.map(channelHandlers::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

public Settings settings() {
return this.settings;
}
@@ -418,7 +443,6 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E
final ChannelPipeline pipeline = ctx.pipeline();
pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
pipeline.replace(this, "decoder_compress", new HttpContentDecompressor());

pipeline.addAfter("decoder_compress", "aggregator", aggregator);
if (handlingSettings.isCompression()) {
pipeline.addAfter(
@@ -430,7 +454,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E
pipeline.addBefore("handler", "request_creator", requestCreator);
pipeline.addBefore("handler", "response_creator", responseCreator);
pipeline.addBefore("handler", "pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));

transport.channelHandlers.forEach(
handler -> ch.pipeline().addBefore("request_creator", handler.getClass().getSimpleName(), handler)
);
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
});
@@ -497,7 +523,6 @@ protected void initChannel(Channel childChannel) throws Exception {
childChannel.pipeline()
.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}

childChannel.pipeline()
.addLast("aggregator", aggregator)
.addLast("request_creator", requestCreator)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

import io.netty.channel.ChannelHandler;

import java.util.Collections;
import java.util.Map;

public interface Netty4HandlerExtension {
default Map<String, ChannelHandler> getHandlers() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@

package org.opensearch.transport;

import io.netty.channel.ChannelHandler;
import org.opensearch.Version;
import org.opensearch.common.SetOnce;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
@@ -46,23 +47,43 @@
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.netty4.Netty4HttpServerTransport;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.netty4.Netty4Transport;

import java.util.Arrays;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Arrays;
import java.util.Map;
import java.util.Collections;
import java.util.HashMap;
import java.util.function.Supplier;

public class Netty4ModulePlugin extends Plugin implements NetworkPlugin {
public class Netty4ModulePlugin extends Plugin implements NetworkPlugin, ExtensiblePlugin {

public static final String NETTY_TRANSPORT_NAME = "netty4";
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";

private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();
private static final List<Netty4HandlerExtension> EXTENSIONS = new ArrayList<>();

@Override
public void loadExtensions(ExtensionLoader loader) {
Set<String> uniqueNames = new HashSet<>();
for (Netty4HandlerExtension extension : loader.loadExtensions(Netty4HandlerExtension.class)) {
String name = extension.getClass().getName();
if (uniqueNames.contains(name)) {
continue;
}
Netty4ModulePlugin.EXTENSIONS.add(extension);
uniqueNames.add(name);
}
assert 1 == Netty4ModulePlugin.EXTENSIONS.size() : "More than 1 extensions are not supported";
}

@Override
public List<Setting<?>> getSettings() {
@@ -124,6 +145,8 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings
) {
Map<String, ChannelHandler> channelHandlers = new HashMap<>();
Netty4ModulePlugin.EXTENSIONS.forEach(extension -> channelHandlers.putAll(extension.getHandlers()));
return Collections.singletonMap(
NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(
@@ -134,7 +157,8 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
xContentRegistry,
dispatcher,
clusterSettings,
getSharedGroupFactory(settings)
getSharedGroupFactory(settings),
channelHandlers
)
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

import org.opensearch.common.settings.Setting;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public final class NettySettings {
private NettySettings() {}

// TODO: Evaluate which one is better, Moving this to yml(AMI Config) or keeping it here.
public static final Setting<List<String>> HANDLER_ORDERING = Setting.listSetting(
"opensearch.netty.plugin.handler.ordering",
Arrays.asList("opensearch-throttling-plugin:AdmissionControlRestHandler"),
Function.identity(),
Setting.Property.NodeScope
);
}
Loading

0 comments on commit d9d0686

Please sign in to comment.