Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Integrated CreateComponent extensionPoint #3265

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(waitForTimedOut);
}

@Override
public String toString() {
return "ClusterStateResponse{" + "clusterState=" + clusterState + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;

/**
* PluginSettings Response for Extensibility
*
* @opensearch.internal
*/
public class ClusterSettingsResponse extends TransportResponse {
private final Settings clusterSettings;

public ClusterSettingsResponse(ClusterService clusterService) {
this.clusterSettings = clusterService.getSettings();
}

public ClusterSettingsResponse(StreamInput in) throws IOException {
super(in);
this.clusterSettings = Settings.readSettingsFromStream(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Settings.writeSettingsToStream(clusterSettings, out);
}

@Override
public String toString() {
return "ClusterSettingsResponse{" + "clusterSettings=" + clusterSettings + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterSettingsResponse that = (ClusterSettingsResponse) o;
return Objects.equals(clusterSettings, that.clusterSettings);
}

@Override
public int hashCode() {
return Objects.hash(clusterSettings);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;

/**
* LocalNode Response for Extensibility
*
* @opensearch.internal
*/
public class LocalNodeResponse extends TransportResponse {
private final DiscoveryNode localNode;

public LocalNodeResponse(ClusterService clusterService) {
this.localNode = clusterService.localNode();
}

public LocalNodeResponse(StreamInput in) throws IOException {
super(in);
this.localNode = new DiscoveryNode(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
this.localNode.writeTo(out);
}

@Override
public String toString() {
return "LocalNodeResponse{" + "localNode=" + localNode + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LocalNodeResponse that = (LocalNodeResponse) o;
return Objects.equals(localNode, that.localNode);
}

@Override
public int hashCode() {
return Objects.hash(localNode);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Objects;

/**
* CLusterService Request for Extensibility
*
* @opensearch.internal
*/
public class ExtensionRequest extends TransportRequest {
private static final Logger logger = LogManager.getLogger(ExtensionRequest.class);
private ExtensionsOrchestrator.RequestType requestType;

public ExtensionRequest(ExtensionsOrchestrator.RequestType requestType) {
this.requestType = requestType;
}

public ExtensionRequest(StreamInput in) throws IOException {
super(in);
this.requestType = in.readEnum(ExtensionsOrchestrator.RequestType.class);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeEnum(requestType);
}

public ExtensionsOrchestrator.RequestType getRequestType() {
return this.requestType;
}

public String toString() {
return "ExtensionRequest{" + "requestType=" + requestType + '}';
}

@Override
public boolean equals(Object o) {

if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExtensionRequest that = (ExtensionRequest) o;
return Objects.equals(requestType, that.requestType);
}

@Override
public int hashCode() {
return Objects.hash(requestType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.*;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
Expand All @@ -49,6 +52,7 @@
import org.opensearch.plugins.PluginsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

Expand All @@ -61,20 +65,39 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule
public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions";
public static final String INDICES_EXTENSION_POINT_ACTION_NAME = "indices:internal/extensions";
public static final String INDICES_EXTENSION_NAME_ACTION_NAME = "indices:internal/name";
public static final String REQUEST_EXTENSION_CLUSTER_STATE = "internal:discovery/clusterstate";
public static final String REQUEST_EXTENSION_LOCAL_NODE = "internal:discovery/localnode";
public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings";

private static final Logger logger = LogManager.getLogger(ExtensionsOrchestrator.class);

/**
* Enum for Extension Requests
*
* @opensearch.internal
*/
public static enum RequestType {
REQUEST_EXTENSION_CLUSTER_STATE,
REQUEST_EXTENSION_LOCAL_NODE,
REQUEST_EXTENSION_CLUSTER_SETTINGS,
CREATE_COMPONENT,
ON_INDEX_MODULE,
GET_SETTINGS
};

private final Path extensionsPath;
final Set<DiscoveryExtension> extensionsSet;
Set<DiscoveryExtension> extensionsInitializedSet;
TransportService transportService;
ClusterService clusterService;

public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOException {
logger.info("ExtensionsOrchestrator initialized");
this.extensionsPath = extensionsPath;
this.transportService = null;
this.extensionsSet = new HashSet<DiscoveryExtension>();
this.extensionsInitializedSet = new HashSet<DiscoveryExtension>();

this.clusterService = null;
/*
* Now Discover extensions
*/
Expand All @@ -86,6 +109,34 @@ public void setTransportService(TransportService transportService) {
this.transportService = transportService;
}

public void setClusterService(ClusterService clusterService) {
this.clusterService = clusterService;
transportService.registerRequestHandler(
REQUEST_EXTENSION_CLUSTER_STATE,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_LOCAL_NODE,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_CLUSTER_SETTINGS,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
}

@Override
public PluginsAndModules info() {
return null;
Expand Down Expand Up @@ -187,14 +238,33 @@ public String executor() {
transportService.sendRequest(
extensionNode,
REQUEST_EXTENSION_ACTION_NAME,
new PluginRequest(extensionNode, new ArrayList<DiscoveryExtension>(extensionsSet)),
new PluginRequest(transportService.getLocalNode(), new ArrayList<DiscoveryExtension>(extensionsSet)),
pluginResponseHandler
);
} catch (Exception e) {
logger.error(e.toString());
}
}

TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) {
// Read enum
if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_STATE) {
ClusterStateResponse clusterStateResponse = new ClusterStateResponse(
clusterService.getClusterName(),
clusterService.state(),
false
);
return clusterStateResponse;
} else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_LOCAL_NODE) {
LocalNodeResponse localNodeResponse = new LocalNodeResponse(clusterService);
return localNodeResponse;
} else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS) {
ClusterSettingsResponse clusterSettingsResponse = new ClusterSettingsResponse(clusterService);
return clusterSettingsResponse;
}
return null;
}

public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
for (DiscoveryNode extensionNode : extensionsSet) {
onIndexModule(indexModule, extensionNode);
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ protected Node(
* This seems like a chicken and egg problem.
*/
this.extensionsOrchestrator.setTransportService(transportService);
this.extensionsOrchestrator.setClusterService(clusterService);
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(
Expand Down Expand Up @@ -1085,7 +1086,6 @@ public Node start() throws NodeValidationException {
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
extensionsOrchestrator.extensionsInitialize();

// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
Expand Down Expand Up @@ -1130,6 +1130,7 @@ public Node start() throws NodeValidationException {
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();
extensionsOrchestrator.extensionsInitialize();
discovery.startInitialJoin();
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings());
configureNodeAndClusterIdStateListener(clusterService);
Expand Down