Skip to content

Commit

Permalink
Add the means to extract the contextual properties from HttpChannel, …
Browse files Browse the repository at this point in the history
…TcpChannel and TrasportChannel without excessive typecasting (#10562)

* Add the means to extract the contextual properties from HttpChannel, TcpChannel and TrasportChannel without excessive typecasting

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Address code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

---------

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta authored Oct 11, 2023
1 parent 489de2a commit c55e1b4
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add support to reload repository metadata inplace ([#9569](https://github.com/opensearch-project/OpenSearch/pull/9569))
- [Metrics Framework] Add Metrics framework. ([#10241](https://github.com/opensearch-project/OpenSearch/pull/10241))
- Updating the separator for RemoteStoreLockManager since underscore is allowed in base64UUID url charset ([#10379](https://github.com/opensearch-project/OpenSearch/pull/10379))
- Add the means to extract the contextual properties from HttpChannel, TcpCChannel and TrasportChannel without excessive typecasting ([#10562](https://github.com/opensearch-project/OpenSearch/pull/10562))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.transport.netty4.Netty4TcpChannel;

import java.net.InetSocketAddress;
import java.util.Optional;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
Expand Down Expand Up @@ -98,6 +99,22 @@ public Channel getNettyChannel() {
return channel;
}

@SuppressWarnings("unchecked")
@Override
public <T> Optional<T> get(String name, Class<T> clazz) {
Object handler = getNettyChannel().pipeline().get(name);

if (handler == null && inboundPipeline() != null) {
handler = inboundPipeline().get(name);
}

if (handler != null && clazz.isInstance(handler) == true) {
return Optional.of((T) handler);
}

return Optional.empty();
}

@Override
public String toString() {
return "Netty4HttpChannel{" + "localAddress=" + getLocalAddress() + ", remoteAddress=" + getRemoteAddress() + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.transport.TransportException;

import java.net.InetSocketAddress;
import java.util.Optional;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -164,6 +165,18 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
}
}

@SuppressWarnings("unchecked")
@Override
public <T> Optional<T> get(String name, Class<T> clazz) {
final Object handler = getNettyChannel().pipeline().get(name);

if (handler != null && clazz.isInstance(handler) == true) {
return Optional.of((T) handler);
}

return Optional.empty();
}

public Channel getNettyChannel() {
return channel;
}
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/http/HttpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.core.action.ActionListener;

import java.net.InetSocketAddress;
import java.util.Optional;

/**
* Represents an HTTP comms channel
Expand Down Expand Up @@ -72,4 +73,17 @@ default void handleException(Exception ex) {}
*/
InetSocketAddress getRemoteAddress();

/**
* Returns the contextual property associated with this specific HTTP channel (the
* implementation of how such properties are managed depends on the the particular
* transport engine).
*
* @param name the name of the property
* @param clazz the expected type of the property
*
* @return the value of the property
*/
default <T> Optional<T> get(String name, Class<T> clazz) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.Optional;

/**
* Tracer wrapped {@link HttpChannel}
Expand Down Expand Up @@ -92,4 +93,9 @@ public InetSocketAddress getLocalAddress() {
public InetSocketAddress getRemoteAddress() {
return delegate.getRemoteAddress();
}

@Override
public <T> Optional<T> get(String name, Class<T> clazz) {
return delegate.get(name, clazz);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.transport.TransportChannel;

import java.io.IOException;
import java.util.Optional;

/**
* Tracer wrapped {@link TransportChannel}
Expand Down Expand Up @@ -109,4 +110,9 @@ public void sendResponse(Exception exception) throws IOException {
public Version getVersion() {
return delegate.getVersion();
}

@Override
public <T> Optional<T> get(String name, Class<T> clazz) {
return delegate.get(name, clazz);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.core.transport.TransportResponse;

import java.io.IOException;
import java.util.Optional;

/**
* Transport channel for tasks
Expand Down Expand Up @@ -89,4 +90,9 @@ public Version getVersion() {
public TransportChannel getChannel() {
return channel;
}

@Override
public <T> Optional<T> get(String name, Class<T> clazz) {
return getChannel().get(name, clazz);
}
}
15 changes: 15 additions & 0 deletions server/src/main/java/org/opensearch/transport/TcpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.core.common.bytes.BytesReference;

import java.net.InetSocketAddress;
import java.util.Optional;

/**
* This is a tcp channel representing a single channel connection to another node. It is the base channel
Expand Down Expand Up @@ -96,6 +97,20 @@ public interface TcpChannel extends CloseableChannel {
*/
ChannelStats getChannelStats();

/**
* Returns the contextual property associated with this specific TCP channel (the
* implementation of how such properties are managed depends on the the particular
* transport engine).
*
* @param name the name of the property
* @param clazz the expected type of the property
*
* @return the value of the property
*/
default <T> Optional<T> get(String name, Class<T> clazz) {
return Optional.empty();
}

/**
* Channel statistics
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.search.query.QuerySearchResult;

import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -130,4 +131,8 @@ public Version getVersion() {
return version;
}

@Override
public <T> Optional<T> get(String name, Class<T> clazz) {
return getChannel().get(name, clazz);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.core.transport.TransportResponse;

import java.io.IOException;
import java.util.Optional;

/**
* A transport channel allows to send a response to a request on the channel.
Expand Down Expand Up @@ -78,4 +79,18 @@ static void sendErrorResponse(TransportChannel channel, String actionName, Trans
);
}
}

/**
* Returns the contextual property associated with this specific transport channel (the
* implementation of how such properties are managed depends on the the particular
* transport engine).
*
* @param name the name of the property
* @param clazz the expected type of the property
*
* @return the value of the property.
*/
default <T> Optional<T> get(String name, Class<T> clazz) {
return Optional.empty();
}
}

0 comments on commit c55e1b4

Please sign in to comment.