Skip to content

Commit

Permalink
[Extensions] Removing support for onIndexModule extension point (#7674)
Browse files Browse the repository at this point in the history
* Removing support for half baked IndexModule extension point

Signed-off-by: Sarat Vemulapalli <vemulapallisarat@gmail.com>

* Adding spotless changes

Signed-off-by: Sarat Vemulapalli <vemulapallisarat@gmail.com>

* Removing support for half baked IndexModule extension point

Signed-off-by: Sarat Vemulapalli <vemulapallisarat@gmail.com>

---------

Signed-off-by: Sarat Vemulapalli <vemulapallisarat@gmail.com>
  • Loading branch information
saratvemulapalli authored May 23, 2023
1 parent df57051 commit fb0ff06
Show file tree
Hide file tree
Showing 6 changed files with 0 additions and 196 deletions.
137 changes: 0 additions & 137 deletions server/src/main/java/org/opensearch/extensions/ExtensionsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand All @@ -37,7 +36,6 @@
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -58,12 +56,6 @@
import org.opensearch.extensions.rest.RestActionsRequestHandler;
import org.opensearch.extensions.settings.CustomSettingsRequestHandler;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportException;
Expand All @@ -80,8 +72,6 @@
*/
public class ExtensionsManager {
public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions";
public static final String INDICES_EXTENSION_POINT_ACTION_NAME = "indices:internal/extensions";
public static final String INDICES_EXTENSION_NAME_ACTION_NAME = "indices:internal/name";
public static final String REQUEST_EXTENSION_CLUSTER_STATE = "internal:discovery/clusterstate";
public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings";
public static final String REQUEST_EXTENSION_ENVIRONMENT_SETTINGS = "internal:discovery/enviornmentsettings";
Expand Down Expand Up @@ -466,125 +456,6 @@ TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) thro
}
}

public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
for (DiscoveryNode extensionNode : extensionIdMap.values()) {
onIndexModule(indexModule, extensionNode);
}
}

private void onIndexModule(IndexModule indexModule, DiscoveryNode extensionNode) throws UnknownHostException {
logger.info("onIndexModule index:" + indexModule.getIndex());
final CompletableFuture<IndicesModuleResponse> inProgressFuture = new CompletableFuture<>();
final CompletableFuture<AcknowledgedResponse> inProgressIndexNameFuture = new CompletableFuture<>();
final TransportResponseHandler<AcknowledgedResponse> acknowledgedResponseHandler = new TransportResponseHandler<
AcknowledgedResponse>() {
@Override
public void handleResponse(AcknowledgedResponse response) {
logger.info("ACK Response" + response);
inProgressIndexNameFuture.complete(response);
}

@Override
public void handleException(TransportException exp) {
inProgressIndexNameFuture.completeExceptionally(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}

};

final TransportResponseHandler<IndicesModuleResponse> indicesModuleResponseHandler = new TransportResponseHandler<
IndicesModuleResponse>() {

@Override
public IndicesModuleResponse read(StreamInput in) throws IOException {
return new IndicesModuleResponse(in);
}

@Override
public void handleResponse(IndicesModuleResponse response) {
logger.info("received {}", response);
if (response.getIndexEventListener() == true) {
indexModule.addIndexEventListener(new IndexEventListener() {
@Override
public void beforeIndexRemoved(
IndexService indexService,
IndicesClusterStateService.AllocatedIndices.IndexRemovalReason reason
) {
logger.info("Index Event Listener is called");
String indexName = indexService.index().getName();
logger.info("Index Name" + indexName.toString());
try {
logger.info("Sending extension request type: " + INDICES_EXTENSION_NAME_ACTION_NAME);
transportService.sendRequest(
extensionNode,
INDICES_EXTENSION_NAME_ACTION_NAME,
new IndicesModuleRequest(indexModule),
acknowledgedResponseHandler
);
inProgressIndexNameFuture.orTimeout(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS).join();
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.info("No response from extension to request.");
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}
}
});
}
inProgressFuture.complete(response);
}

@Override
public void handleException(TransportException exp) {
logger.error(new ParameterizedMessage("IndicesModuleRequest failed"), exp);
inProgressFuture.completeExceptionally(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};

try {
logger.info("Sending extension request type: " + INDICES_EXTENSION_POINT_ACTION_NAME);
transportService.sendRequest(
extensionNode,
INDICES_EXTENSION_POINT_ACTION_NAME,
new IndicesModuleRequest(indexModule),
indicesModuleResponseHandler
);
inProgressFuture.orTimeout(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS).join();
logger.info("Received response from Extension");
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.info("No response from extension to request.");
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}
}

private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOException {
Yaml yaml = new Yaml();
try (InputStream inputStream = Files.newInputStream(filePath)) {
Expand Down Expand Up @@ -655,14 +526,6 @@ static String getRequestExtensionActionName() {
return REQUEST_EXTENSION_ACTION_NAME;
}

static String getIndicesExtensionPointActionName() {
return INDICES_EXTENSION_POINT_ACTION_NAME;
}

static String getIndicesExtensionNameActionName() {
return INDICES_EXTENSION_NAME_ACTION_NAME;
}

static String getRequestExtensionClusterState() {
return REQUEST_EXTENSION_CLUSTER_STATE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.extensions;

import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Optional;

Expand All @@ -22,7 +21,6 @@
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.index.IndexModule;
import org.opensearch.transport.TransportService;

/**
Expand Down Expand Up @@ -70,11 +68,6 @@ public void initialize() {
// no-op
}

@Override
public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
// no-op
}

@Override
public Optional<DiscoveryExtensionNode> lookupInitializedExtensionById(final String extensionId) {
// no-op not found
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -289,7 +288,6 @@ public class IndicesService extends AbstractLifecycleComponent
*/
private final Settings settings;
private final PluginsService pluginsService;
private final ExtensionsManager extensionsManager;
private final NodeEnvironment nodeEnv;
private final NamedXContentRegistry xContentRegistry;
private final TimeValue shardsClosedTimeout;
Expand Down Expand Up @@ -342,7 +340,6 @@ protected void doStart() {
public IndicesService(
Settings settings,
PluginsService pluginsService,
ExtensionsManager extensionsManager,
NodeEnvironment nodeEnv,
NamedXContentRegistry xContentRegistry,
AnalysisRegistry analysisRegistry,
Expand All @@ -368,7 +365,6 @@ public IndicesService(
this.settings = settings;
this.threadPool = threadPool;
this.pluginsService = pluginsService;
this.extensionsManager = extensionsManager;
this.nodeEnv = nodeEnv;
this.xContentRegistry = xContentRegistry;
this.valuesSourceRegistry = valuesSourceRegistry;
Expand Down Expand Up @@ -810,7 +806,6 @@ private synchronized IndexService createIndexService(
indexModule.addIndexOperationListener(operationListener);
}
pluginsService.onIndexModule(indexModule);
extensionsManager.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
Expand Down
1 change: 0 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,6 @@ protected Node(
final IndicesService indicesService = new IndicesService(
settings,
pluginsService,
extensionsManager,
nodeEnvironment,
xContentRegistry,
analysisModule.getAnalysisRegistry(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.EnvironmentSettingsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
Expand All @@ -65,29 +64,20 @@
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.env.Environment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.extensions.proto.ExtensionRequestProto;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
import org.opensearch.identity.IdentityService;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.rest.RestController;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.client.NoOpNodeClient;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NodeNotConnectedException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -838,40 +828,6 @@ public void testRegisterHandler() throws Exception {

}

public void testOnIndexModule() throws Exception {
Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8);
ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir);
initialize(extensionsManager);

Environment environment = TestEnvironment.newEnvironment(settings);
AnalysisRegistry emptyAnalysisRegistry = new AnalysisRegistry(
environment,
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap()
);

IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
IndexModule indexModule = new IndexModule(
indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
new EngineConfigFactory(indexSettings),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Collections.emptyMap()
);
expectThrows(NodeNotConnectedException.class, () -> extensionsManager.onIndexModule(indexModule));

}

public void testIncompatibleExtensionRegistration() throws IOException, IllegalAccessException {

try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -1807,7 +1806,6 @@ public void onFailure(final Exception e) {
indicesService = new IndicesService(
settings,
mock(PluginsService.class),
mock(ExtensionsManager.class),
nodeEnv,
namedXContentRegistry,
new AnalysisRegistry(
Expand Down

0 comments on commit fb0ff06

Please sign in to comment.