Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Extensions] Removing support for onIndexModule extension point #7674

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand All @@ -37,7 +36,6 @@
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -58,12 +56,6 @@
import org.opensearch.extensions.rest.RestActionsRequestHandler;
import org.opensearch.extensions.settings.CustomSettingsRequestHandler;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportException;
Expand All @@ -80,8 +72,6 @@
*/
public class ExtensionsManager {
public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions";
public static final String INDICES_EXTENSION_POINT_ACTION_NAME = "indices:internal/extensions";
public static final String INDICES_EXTENSION_NAME_ACTION_NAME = "indices:internal/name";
public static final String REQUEST_EXTENSION_CLUSTER_STATE = "internal:discovery/clusterstate";
public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings";
public static final String REQUEST_EXTENSION_ENVIRONMENT_SETTINGS = "internal:discovery/enviornmentsettings";
Expand Down Expand Up @@ -466,125 +456,6 @@ TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) thro
}
}

public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
for (DiscoveryNode extensionNode : extensionIdMap.values()) {
onIndexModule(indexModule, extensionNode);
}
}

private void onIndexModule(IndexModule indexModule, DiscoveryNode extensionNode) throws UnknownHostException {
logger.info("onIndexModule index:" + indexModule.getIndex());
final CompletableFuture<IndicesModuleResponse> inProgressFuture = new CompletableFuture<>();
final CompletableFuture<AcknowledgedResponse> inProgressIndexNameFuture = new CompletableFuture<>();
final TransportResponseHandler<AcknowledgedResponse> acknowledgedResponseHandler = new TransportResponseHandler<
AcknowledgedResponse>() {
@Override
public void handleResponse(AcknowledgedResponse response) {
logger.info("ACK Response" + response);
inProgressIndexNameFuture.complete(response);
}

@Override
public void handleException(TransportException exp) {
inProgressIndexNameFuture.completeExceptionally(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}

};

final TransportResponseHandler<IndicesModuleResponse> indicesModuleResponseHandler = new TransportResponseHandler<
IndicesModuleResponse>() {

@Override
public IndicesModuleResponse read(StreamInput in) throws IOException {
return new IndicesModuleResponse(in);
}

@Override
public void handleResponse(IndicesModuleResponse response) {
logger.info("received {}", response);
if (response.getIndexEventListener() == true) {
indexModule.addIndexEventListener(new IndexEventListener() {
@Override
public void beforeIndexRemoved(
IndexService indexService,
IndicesClusterStateService.AllocatedIndices.IndexRemovalReason reason
) {
logger.info("Index Event Listener is called");
String indexName = indexService.index().getName();
logger.info("Index Name" + indexName.toString());
try {
logger.info("Sending extension request type: " + INDICES_EXTENSION_NAME_ACTION_NAME);
transportService.sendRequest(
extensionNode,
INDICES_EXTENSION_NAME_ACTION_NAME,
new IndicesModuleRequest(indexModule),
acknowledgedResponseHandler
);
inProgressIndexNameFuture.orTimeout(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS).join();
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.info("No response from extension to request.");
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}
}
});
}
inProgressFuture.complete(response);
}

@Override
public void handleException(TransportException exp) {
logger.error(new ParameterizedMessage("IndicesModuleRequest failed"), exp);
inProgressFuture.completeExceptionally(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};

try {
logger.info("Sending extension request type: " + INDICES_EXTENSION_POINT_ACTION_NAME);
transportService.sendRequest(
extensionNode,
INDICES_EXTENSION_POINT_ACTION_NAME,
new IndicesModuleRequest(indexModule),
indicesModuleResponseHandler
);
inProgressFuture.orTimeout(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS).join();
logger.info("Received response from Extension");
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.info("No response from extension to request.");
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}
}

private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOException {
Yaml yaml = new Yaml();
try (InputStream inputStream = Files.newInputStream(filePath)) {
Expand Down Expand Up @@ -655,14 +526,6 @@ static String getRequestExtensionActionName() {
return REQUEST_EXTENSION_ACTION_NAME;
}

static String getIndicesExtensionPointActionName() {
return INDICES_EXTENSION_POINT_ACTION_NAME;
}

static String getIndicesExtensionNameActionName() {
return INDICES_EXTENSION_NAME_ACTION_NAME;
}

static String getRequestExtensionClusterState() {
return REQUEST_EXTENSION_CLUSTER_STATE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.extensions;

import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Optional;

Expand All @@ -22,7 +21,6 @@
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.index.IndexModule;
import org.opensearch.transport.TransportService;

/**
Expand Down Expand Up @@ -70,11 +68,6 @@ public void initialize() {
// no-op
}

@Override
public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
// no-op
}

@Override
public Optional<DiscoveryExtensionNode> lookupInitializedExtensionById(final String extensionId) {
// no-op not found
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -289,7 +288,6 @@ public class IndicesService extends AbstractLifecycleComponent
*/
private final Settings settings;
private final PluginsService pluginsService;
private final ExtensionsManager extensionsManager;
private final NodeEnvironment nodeEnv;
private final NamedXContentRegistry xContentRegistry;
private final TimeValue shardsClosedTimeout;
Expand Down Expand Up @@ -342,7 +340,6 @@ protected void doStart() {
public IndicesService(
Settings settings,
PluginsService pluginsService,
ExtensionsManager extensionsManager,
NodeEnvironment nodeEnv,
NamedXContentRegistry xContentRegistry,
AnalysisRegistry analysisRegistry,
Expand All @@ -368,7 +365,6 @@ public IndicesService(
this.settings = settings;
this.threadPool = threadPool;
this.pluginsService = pluginsService;
this.extensionsManager = extensionsManager;
this.nodeEnv = nodeEnv;
this.xContentRegistry = xContentRegistry;
this.valuesSourceRegistry = valuesSourceRegistry;
Expand Down Expand Up @@ -810,7 +806,6 @@ private synchronized IndexService createIndexService(
indexModule.addIndexOperationListener(operationListener);
}
pluginsService.onIndexModule(indexModule);
extensionsManager.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
Expand Down
1 change: 0 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,6 @@ protected Node(
final IndicesService indicesService = new IndicesService(
settings,
pluginsService,
extensionsManager,
nodeEnvironment,
xContentRegistry,
analysisModule.getAnalysisRegistry(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.EnvironmentSettingsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
Expand All @@ -65,29 +64,20 @@
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.env.Environment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.extensions.proto.ExtensionRequestProto;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
import org.opensearch.identity.IdentityService;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.rest.RestController;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.client.NoOpNodeClient;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NodeNotConnectedException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -838,40 +828,6 @@ public void testRegisterHandler() throws Exception {

}

public void testOnIndexModule() throws Exception {
Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8);
ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir);
initialize(extensionsManager);

Environment environment = TestEnvironment.newEnvironment(settings);
AnalysisRegistry emptyAnalysisRegistry = new AnalysisRegistry(
environment,
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap(),
emptyMap()
);

IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings);
IndexModule indexModule = new IndexModule(
indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
new EngineConfigFactory(indexSettings),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Collections.emptyMap()
);
expectThrows(NodeNotConnectedException.class, () -> extensionsManager.onIndexModule(indexModule));

}

public void testIncompatibleExtensionRegistration() throws IOException, IllegalAccessException {

try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -1807,7 +1806,6 @@ public void onFailure(final Exception e) {
indicesService = new IndicesService(
settings,
mock(PluginsService.class),
mock(ExtensionsManager.class),
nodeEnv,
namedXContentRegistry,
new AnalysisRegistry(
Expand Down