From ab4140cefe1e57592a36a313fc9282b95c56472a Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 11 Oct 2023 09:27:04 -0400 Subject: [PATCH 1/2] Add the means to extract the contextual properties from HttpChannel, TcpChannel and TrasportChannel without excessive typecasting Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + .../http/netty4/Netty4HttpChannel.java | 16 ++++++++++++++++ .../transport/netty4/Netty4TcpChannel.java | 12 ++++++++++++ .../java/org/opensearch/http/HttpChannel.java | 14 ++++++++++++++ .../tracing/channels/TraceableHttpChannel.java | 5 +++++ .../channels/TraceableTcpTransportChannel.java | 5 +++++ .../transport/TaskTransportChannel.java | 5 +++++ .../org/opensearch/transport/TcpChannel.java | 15 +++++++++++++++ .../transport/TcpTransportChannel.java | 4 ++++ .../opensearch/transport/TransportChannel.java | 15 +++++++++++++++ 10 files changed, 92 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e107d2c04539..8df01835e66e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -138,6 +138,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add support to reload repository metadata inplace ([#9569](https://github.com/opensearch-project/OpenSearch/pull/9569)) - [Metrics Framework] Add Metrics framework. ([#10241](https://github.com/opensearch-project/OpenSearch/pull/10241)) - Updating the separator for RemoteStoreLockManager since underscore is allowed in base64UUID url charset ([#10379](https://github.com/opensearch-project/OpenSearch/pull/10379)) +- Add the means to extract the contextual properties from HttpChannel, TcpCChannel and TrasportChannel without excessive typecasting ([#10562](https://github.com/opensearch-project/OpenSearch/pull/10562)) ### Deprecated diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java index a83330356e35e..bc9615e0e3691 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java @@ -98,6 +98,22 @@ public Channel getNettyChannel() { return channel; } + @SuppressWarnings("unchecked") + @Override + public T get(String name, Class clazz) { + Object handler = getNettyChannel().pipeline().get(name); + + if (handler == null && inboundPipeline() != null) { + handler = inboundPipeline().get(name); + } + + if (handler != null && clazz.isInstance(handler) == true) { + return (T) handler; + } + + return null; + } + @Override public String toString() { return "Netty4HttpChannel{" + "localAddress=" + getLocalAddress() + ", remoteAddress=" + getRemoteAddress() + '}'; diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java index 5db1f7c333157..53334926110f4 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java @@ -164,6 +164,18 @@ public void sendMessage(BytesReference reference, ActionListener listener) } } + @SuppressWarnings("unchecked") + @Override + public T get(String name, Class clazz) { + final Object handler = getNettyChannel().pipeline().get(name); + + if (handler != null && clazz.isInstance(handler) == true) { + return (T) handler; + } + + return null; + } + public Channel getNettyChannel() { return channel; } diff --git a/server/src/main/java/org/opensearch/http/HttpChannel.java b/server/src/main/java/org/opensearch/http/HttpChannel.java index 6dcdaf9034413..9505c3894b6b8 100644 --- a/server/src/main/java/org/opensearch/http/HttpChannel.java +++ b/server/src/main/java/org/opensearch/http/HttpChannel.java @@ -72,4 +72,18 @@ default void handleException(Exception ex) {} */ InetSocketAddress getRemoteAddress(); + /** + * Returns the contextual property associated with this specific HTTP channel (the + * implementation of how such properties are managed depends on the the particular + * transport engine). + * + * @param name the name of the property + * @param clazz the expected type of the property + * + * @return the value of the property. + * {@code null} if there's no such property or the expected type is not compatible. + */ + default T get(String name, Class clazz) { + return null; + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java index 03848e8e58207..19e594cf97a13 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java @@ -92,4 +92,9 @@ public InetSocketAddress getLocalAddress() { public InetSocketAddress getRemoteAddress() { return delegate.getRemoteAddress(); } + + @Override + public T get(String name, Class clazz) { + return delegate.get(name, clazz); + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java index 333e06eb037cb..ca9308a01c6f1 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -109,4 +109,9 @@ public void sendResponse(Exception exception) throws IOException { public Version getVersion() { return delegate.getVersion(); } + + @Override + public T get(String name, Class clazz) { + return delegate.get(name, clazz); + } } diff --git a/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java b/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java index 052611317f174..611cae840a3f7 100644 --- a/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java @@ -89,4 +89,9 @@ public Version getVersion() { public TransportChannel getChannel() { return channel; } + + @Override + public T get(String name, Class clazz) { + return getChannel().get(name, clazz); + } } diff --git a/server/src/main/java/org/opensearch/transport/TcpChannel.java b/server/src/main/java/org/opensearch/transport/TcpChannel.java index eac137ec30f1a..e447a038c6ff7 100644 --- a/server/src/main/java/org/opensearch/transport/TcpChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpChannel.java @@ -96,6 +96,21 @@ public interface TcpChannel extends CloseableChannel { */ ChannelStats getChannelStats(); + /** + * Returns the contextual property associated with this specific TCP channel (the + * implementation of how such properties are managed depends on the the particular + * transport engine). + * + * @param name the name of the property + * @param clazz the expected type of the property + * + * @return the value of the property. + * {@code null} if there's no such property or the expected type is not compatible. + */ + default T get(String name, Class clazz) { + return null; + } + /** * Channel statistics * diff --git a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java index a7bedcf93e129..e6884e115b5fe 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java @@ -130,4 +130,8 @@ public Version getVersion() { return version; } + @Override + public T get(String name, Class clazz) { + return getChannel().get(name, clazz); + } } diff --git a/server/src/main/java/org/opensearch/transport/TransportChannel.java b/server/src/main/java/org/opensearch/transport/TransportChannel.java index 3c582127f28e8..00f2ec9d43b7d 100644 --- a/server/src/main/java/org/opensearch/transport/TransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TransportChannel.java @@ -78,4 +78,19 @@ static void sendErrorResponse(TransportChannel channel, String actionName, Trans ); } } + + /** + * Returns the contextual property associated with this specific transport channel (the + * implementation of how such properties are managed depends on the the particular + * transport engine). + * + * @param name the name of the property + * @param clazz the expected type of the property + * + * @return the value of the property. + * {@code null} if there's no such property or the expected type is not compatible. + */ + default T get(String name, Class clazz) { + return null; + } } From 1c184e229f191fdebc803899b42ba5dfc5319b55 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 11 Oct 2023 12:16:47 -0400 Subject: [PATCH 2/2] Address code review comments Signed-off-by: Andriy Redko --- .../org/opensearch/http/netty4/Netty4HttpChannel.java | 7 ++++--- .../org/opensearch/transport/netty4/Netty4TcpChannel.java | 7 ++++--- server/src/main/java/org/opensearch/http/HttpChannel.java | 8 ++++---- .../telemetry/tracing/channels/TraceableHttpChannel.java | 3 ++- .../tracing/channels/TraceableTcpTransportChannel.java | 3 ++- .../org/opensearch/transport/TaskTransportChannel.java | 3 ++- .../main/java/org/opensearch/transport/TcpChannel.java | 8 ++++---- .../org/opensearch/transport/TcpTransportChannel.java | 3 ++- .../java/org/opensearch/transport/TransportChannel.java | 6 +++--- 9 files changed, 27 insertions(+), 21 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java index bc9615e0e3691..6475a0b744c60 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java @@ -40,6 +40,7 @@ import org.opensearch.transport.netty4.Netty4TcpChannel; import java.net.InetSocketAddress; +import java.util.Optional; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; @@ -100,7 +101,7 @@ public Channel getNettyChannel() { @SuppressWarnings("unchecked") @Override - public T get(String name, Class clazz) { + public Optional get(String name, Class clazz) { Object handler = getNettyChannel().pipeline().get(name); if (handler == null && inboundPipeline() != null) { @@ -108,10 +109,10 @@ public T get(String name, Class clazz) { } if (handler != null && clazz.isInstance(handler) == true) { - return (T) handler; + return Optional.of((T) handler); } - return null; + return Optional.empty(); } @Override diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java index 53334926110f4..79a5bf9e95121 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4TcpChannel.java @@ -41,6 +41,7 @@ import org.opensearch.transport.TransportException; import java.net.InetSocketAddress; +import java.util.Optional; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -166,14 +167,14 @@ public void sendMessage(BytesReference reference, ActionListener listener) @SuppressWarnings("unchecked") @Override - public T get(String name, Class clazz) { + public Optional get(String name, Class clazz) { final Object handler = getNettyChannel().pipeline().get(name); if (handler != null && clazz.isInstance(handler) == true) { - return (T) handler; + return Optional.of((T) handler); } - return null; + return Optional.empty(); } public Channel getNettyChannel() { diff --git a/server/src/main/java/org/opensearch/http/HttpChannel.java b/server/src/main/java/org/opensearch/http/HttpChannel.java index 9505c3894b6b8..679a5d73c7837 100644 --- a/server/src/main/java/org/opensearch/http/HttpChannel.java +++ b/server/src/main/java/org/opensearch/http/HttpChannel.java @@ -36,6 +36,7 @@ import org.opensearch.core.action.ActionListener; import java.net.InetSocketAddress; +import java.util.Optional; /** * Represents an HTTP comms channel @@ -80,10 +81,9 @@ default void handleException(Exception ex) {} * @param name the name of the property * @param clazz the expected type of the property * - * @return the value of the property. - * {@code null} if there's no such property or the expected type is not compatible. + * @return the value of the property */ - default T get(String name, Class clazz) { - return null; + default Optional get(String name, Class clazz) { + return Optional.empty(); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java index 19e594cf97a13..0a9757310fe8b 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableHttpChannel.java @@ -18,6 +18,7 @@ import java.net.InetSocketAddress; import java.util.Objects; +import java.util.Optional; /** * Tracer wrapped {@link HttpChannel} @@ -94,7 +95,7 @@ public InetSocketAddress getRemoteAddress() { } @Override - public T get(String name, Class clazz) { + public Optional get(String name, Class clazz) { return delegate.get(name, clazz); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java index ca9308a01c6f1..bd60c35c3baac 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -20,6 +20,7 @@ import org.opensearch.transport.TransportChannel; import java.io.IOException; +import java.util.Optional; /** * Tracer wrapped {@link TransportChannel} @@ -111,7 +112,7 @@ public Version getVersion() { } @Override - public T get(String name, Class clazz) { + public Optional get(String name, Class clazz) { return delegate.get(name, clazz); } } diff --git a/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java b/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java index 611cae840a3f7..4dab0039ec878 100644 --- a/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java @@ -37,6 +37,7 @@ import org.opensearch.core.transport.TransportResponse; import java.io.IOException; +import java.util.Optional; /** * Transport channel for tasks @@ -91,7 +92,7 @@ public TransportChannel getChannel() { } @Override - public T get(String name, Class clazz) { + public Optional get(String name, Class clazz) { return getChannel().get(name, clazz); } } diff --git a/server/src/main/java/org/opensearch/transport/TcpChannel.java b/server/src/main/java/org/opensearch/transport/TcpChannel.java index e447a038c6ff7..f98b65d0a4df1 100644 --- a/server/src/main/java/org/opensearch/transport/TcpChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpChannel.java @@ -38,6 +38,7 @@ import org.opensearch.core.common.bytes.BytesReference; import java.net.InetSocketAddress; +import java.util.Optional; /** * This is a tcp channel representing a single channel connection to another node. It is the base channel @@ -104,11 +105,10 @@ public interface TcpChannel extends CloseableChannel { * @param name the name of the property * @param clazz the expected type of the property * - * @return the value of the property. - * {@code null} if there's no such property or the expected type is not compatible. + * @return the value of the property */ - default T get(String name, Class clazz) { - return null; + default Optional get(String name, Class clazz) { + return Optional.empty(); } /** diff --git a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java index e6884e115b5fe..81de0af07ea7c 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java @@ -38,6 +38,7 @@ import org.opensearch.search.query.QuerySearchResult; import java.io.IOException; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -131,7 +132,7 @@ public Version getVersion() { } @Override - public T get(String name, Class clazz) { + public Optional get(String name, Class clazz) { return getChannel().get(name, clazz); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportChannel.java b/server/src/main/java/org/opensearch/transport/TransportChannel.java index 00f2ec9d43b7d..7423d59103302 100644 --- a/server/src/main/java/org/opensearch/transport/TransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TransportChannel.java @@ -39,6 +39,7 @@ import org.opensearch.core.transport.TransportResponse; import java.io.IOException; +import java.util.Optional; /** * A transport channel allows to send a response to a request on the channel. @@ -88,9 +89,8 @@ static void sendErrorResponse(TransportChannel channel, String actionName, Trans * @param clazz the expected type of the property * * @return the value of the property. - * {@code null} if there's no such property or the expected type is not compatible. */ - default T get(String name, Class clazz) { - return null; + default Optional get(String name, Class clazz) { + return Optional.empty(); } }