From c55e1b47bffd40defc86686e9c6961d32dd087c2 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 11 Oct 2023 14:03:49 -0400 Subject: [PATCH] Add the means to extract the contextual properties from HttpChannel, TcpChannel and TrasportChannel without excessive typecasting (#10562) * Add the means to extract the contextual properties from HttpChannel, TcpChannel and TrasportChannel without excessive typecasting Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + .../http/netty4/Netty4HttpChannel.java | 17 +++++++++++++++++ .../transport/netty4/Netty4TcpChannel.java | 13 +++++++++++++ .../java/org/opensearch/http/HttpChannel.java | 14 ++++++++++++++ .../tracing/channels/TraceableHttpChannel.java | 6 ++++++ .../channels/TraceableTcpTransportChannel.java | 6 ++++++ .../transport/TaskTransportChannel.java | 6 ++++++ .../org/opensearch/transport/TcpChannel.java | 15 +++++++++++++++ .../transport/TcpTransportChannel.java | 5 +++++ .../opensearch/transport/TransportChannel.java | 15 +++++++++++++++ 10 files changed, 98 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 563f3e43e9707..5e31085eb3935 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -139,6 +139,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add support to reload repository metadata inplace ([#9569](https://github.com/opensearch-project/OpenSearch/pull/9569)) - [Metrics Framework] Add Metrics framework. ([#10241](https://github.com/opensearch-project/OpenSearch/pull/10241)) - Updating the separator for RemoteStoreLockManager since underscore is allowed in base64UUID url charset ([#10379](https://github.com/opensearch-project/OpenSearch/pull/10379)) +- Add the means to extract the contextual properties from HttpChannel, TcpCChannel and TrasportChannel without excessive typecasting ([#10562](https://github.com/opensearch-project/OpenSearch/pull/10562)) ### Deprecated diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java index a83330356e35e..6475a0b744c60 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java @@ -40,6 +40,7 @@ import org.opensearch.transport.netty4.Netty4TcpChannel; import java.net.InetSocketAddress; +import java.util.Optional; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; @@ -98,6 +99,22 @@ public Channel getNettyChannel() { return channel; } + @SuppressWarnings("unchecked") + @Override + public Optional get(String name, Class clazz) { + Object handler = getNettyChannel().pipeline().get(name); + + if (handler == null && inboundPipeline() != null) { + handler = inboundPipeline().get(name); + } + + if (handler != null && clazz.isInstance(handler) == true) { + return Optional.of((T) handler); + } + + return Optional.empty(); + } + @Override public String toString() { return "Netty4HttpChannel{" + "localAddress=" + getLocalAddress() + ", remoteAddress=" + getRemoteAddress() + '}'; diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java index 5db1f7c333157..79a5bf9e95121 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java @@ -41,6 +41,7 @@ import org.opensearch.transport.TransportException; import java.net.InetSocketAddress; +import java.util.Optional; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -164,6 +165,18 @@ public void sendMessage(BytesReference reference, ActionListener listener) } } + @SuppressWarnings("unchecked") + @Override + public Optional get(String name, Class clazz) { + final Object handler = getNettyChannel().pipeline().get(name); + + if (handler != null && clazz.isInstance(handler) == true) { + return Optional.of((T) handler); + } + + return Optional.empty(); + } + public Channel getNettyChannel() { return channel; } diff --git a/server/src/main/java/org/opensearch/http/HttpChannel.java b/server/src/main/java/org/opensearch/http/HttpChannel.java index 6dcdaf9034413..679a5d73c7837 100644 --- a/server/src/main/java/org/opensearch/http/HttpChannel.java +++ b/server/src/main/java/org/opensearch/http/HttpChannel.java @@ -36,6 +36,7 @@ import org.opensearch.core.action.ActionListener; import java.net.InetSocketAddress; +import java.util.Optional; /** * Represents an HTTP comms channel @@ -72,4 +73,17 @@ default void handleException(Exception ex) {} */ InetSocketAddress getRemoteAddress(); + /** + * Returns the contextual property associated with this specific HTTP channel (the + * implementation of how such properties are managed depends on the the particular + * transport engine). + * + * @param name the name of the property + * @param clazz the expected type of the property + * + * @return the value of the property + */ + default Optional get(String name, Class clazz) { + return Optional.empty(); + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java index 03848e8e58207..0a9757310fe8b 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java @@ -18,6 +18,7 @@ import java.net.InetSocketAddress; import java.util.Objects; +import java.util.Optional; /** * Tracer wrapped {@link HttpChannel} @@ -92,4 +93,9 @@ public InetSocketAddress getLocalAddress() { public InetSocketAddress getRemoteAddress() { return delegate.getRemoteAddress(); } + + @Override + public Optional get(String name, Class clazz) { + return delegate.get(name, clazz); + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java index 333e06eb037cb..bd60c35c3baac 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -20,6 +20,7 @@ import org.opensearch.transport.TransportChannel; import java.io.IOException; +import java.util.Optional; /** * Tracer wrapped {@link TransportChannel} @@ -109,4 +110,9 @@ public void sendResponse(Exception exception) throws IOException { public Version getVersion() { return delegate.getVersion(); } + + @Override + public Optional get(String name, Class clazz) { + return delegate.get(name, clazz); + } } diff --git a/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java b/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java index 052611317f174..4dab0039ec878 100644 --- a/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java @@ -37,6 +37,7 @@ import org.opensearch.core.transport.TransportResponse; import java.io.IOException; +import java.util.Optional; /** * Transport channel for tasks @@ -89,4 +90,9 @@ public Version getVersion() { public TransportChannel getChannel() { return channel; } + + @Override + public Optional get(String name, Class clazz) { + return getChannel().get(name, clazz); + } } diff --git a/server/src/main/java/org/opensearch/transport/TcpChannel.java b/server/src/main/java/org/opensearch/transport/TcpChannel.java index eac137ec30f1a..f98b65d0a4df1 100644 --- a/server/src/main/java/org/opensearch/transport/TcpChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpChannel.java @@ -38,6 +38,7 @@ import org.opensearch.core.common.bytes.BytesReference; import java.net.InetSocketAddress; +import java.util.Optional; /** * This is a tcp channel representing a single channel connection to another node. It is the base channel @@ -96,6 +97,20 @@ public interface TcpChannel extends CloseableChannel { */ ChannelStats getChannelStats(); + /** + * Returns the contextual property associated with this specific TCP channel (the + * implementation of how such properties are managed depends on the the particular + * transport engine). + * + * @param name the name of the property + * @param clazz the expected type of the property + * + * @return the value of the property + */ + default Optional get(String name, Class clazz) { + return Optional.empty(); + } + /** * Channel statistics * diff --git a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java index a7bedcf93e129..81de0af07ea7c 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java @@ -38,6 +38,7 @@ import org.opensearch.search.query.QuerySearchResult; import java.io.IOException; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -130,4 +131,8 @@ public Version getVersion() { return version; } + @Override + public Optional get(String name, Class clazz) { + return getChannel().get(name, clazz); + } } diff --git a/server/src/main/java/org/opensearch/transport/TransportChannel.java b/server/src/main/java/org/opensearch/transport/TransportChannel.java index 3c582127f28e8..7423d59103302 100644 --- a/server/src/main/java/org/opensearch/transport/TransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TransportChannel.java @@ -39,6 +39,7 @@ import org.opensearch.core.transport.TransportResponse; import java.io.IOException; +import java.util.Optional; /** * A transport channel allows to send a response to a request on the channel. @@ -78,4 +79,18 @@ static void sendErrorResponse(TransportChannel channel, String actionName, Trans ); } } + + /** + * Returns the contextual property associated with this specific transport channel (the + * implementation of how such properties are managed depends on the the particular + * transport engine). + * + * @param name the name of the property + * @param clazz the expected type of the property + * + * @return the value of the property. + */ + default Optional get(String name, Class clazz) { + return Optional.empty(); + } }