diff --git a/CHANGELOG.md b/CHANGELOG.md index 6138e6dd969b1..34ada66076ad1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/)) - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865)) - [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782)) +- Add allowlist setting for ingest processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 233a8d732d178..f5bf43aa7aa7f 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -130,6 +130,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.store.IndicesStore; +import org.opensearch.ingest.IngestService; import org.opensearch.monitor.fs.FsHealthService; import org.opensearch.monitor.fs.FsService; import org.opensearch.monitor.jvm.JvmGcMonitorService; @@ -754,7 +755,8 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS, RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA, - SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING + SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, + IngestService.PROCESSORS_ALLOWLIST_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 2281ccd4c0382..fb8b74bd29d05 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -59,9 +59,11 @@ import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; import org.opensearch.common.collect.Tuple; import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.common.regex.Regex; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -93,6 +95,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.IntConsumer; import java.util.stream.Collectors; @@ -109,6 +112,13 @@ public class IngestService implements ClusterStateApplier, ReportingService> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting( + "ingest.processors.allowlist", + List.of(), + Function.identity(), + Setting.Property.NodeScope + ); + private final ClusterService clusterService; private final ScriptService scriptService; private final Map processorFactories; @@ -136,6 +146,12 @@ public IngestService( ) { this.clusterService = clusterService; this.scriptService = scriptService; + final Set allowlist; + if (PROCESSORS_ALLOWLIST_SETTING.exists(clusterService.getSettings())) { + allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(clusterService.getSettings())); + } else { + allowlist = null; + } this.processorFactories = processorFactories( ingestPlugins, new Processor.Parameters( @@ -149,7 +165,8 @@ public IngestService( client, threadPool.generic()::execute, indicesService - ) + ), + allowlist ); this.threadPool = threadPool; @@ -158,17 +175,40 @@ public IngestService( deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_PIPELINE_KEY, true); } - private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) { + private static Map processorFactories( + List ingestPlugins, + Processor.Parameters parameters, + @Nullable Set allowlist + ) { + Set unknownAllowlistProcessors = (allowlist == null) ? new HashSet<>() : new HashSet<>(allowlist); Map processorFactories = new HashMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { Map newProcessors = ingestPlugin.getProcessors(parameters); + unknownAllowlistProcessors.removeAll(newProcessors.keySet()); for (Map.Entry entry : newProcessors.entrySet()) { - if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { - throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered"); + if (allowlist == null || allowlist.contains(entry.getKey())) { + if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { + throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered"); + } + } else { + logger.info( + "Not registering ingest processor [{}] because it is not allowed per the [{}] setting", + entry.getKey(), + PROCESSORS_ALLOWLIST_SETTING.getKey() + ); } } } - return Collections.unmodifiableMap(processorFactories); + if (unknownAllowlistProcessors.isEmpty() == false) { + throw new IllegalArgumentException( + "Processor(s) [" + + unknownAllowlistProcessors + + "] were defined in [" + + PROCESSORS_ALLOWLIST_SETTING.getKey() + + "] but do not exist" + ); + } + return Map.copyOf(processorFactories); } public static boolean resolvePipelines( diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 684297c11c140..2ff22abaa8345 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -58,6 +58,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.metrics.OperationStats; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.xcontent.XContentType; @@ -118,7 +119,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -135,7 +135,7 @@ public Map getProcessors(Processor.Parameters paramet }; private ThreadPool threadPool; - private BulkRequest mockBulkRequest; + private Settings settings = Settings.EMPTY; @Before public void setup() { @@ -143,57 +143,25 @@ public void setup() { ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); - mockBulkRequest = mock(BulkRequest.class); - lenient().when(mockBulkRequest.batchSize()).thenReturn(1); } public void testIngestPlugin() { - Client client = mock(Client.class); - IngestService ingestService = new IngestService( - mock(ClusterService.class), - threadPool, - null, - null, - null, - Collections.singletonList(DUMMY_PLUGIN), - client, - mock(IndicesService.class) - ); + IngestService ingestService = createWithPlugins(List.of(DUMMY_PLUGIN)); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); } public void testIngestPluginDuplicate() { - Client client = mock(Client.class); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> new IngestService( - mock(ClusterService.class), - threadPool, - null, - null, - null, - Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), - client, - mock(IndicesService.class) - ) + () -> createWithPlugins(List.of(DUMMY_PLUGIN, DUMMY_PLUGIN)) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } public void testExecuteIndexPipelineDoesNotExist() { - Client client = mock(Client.class); - IngestService ingestService = new IngestService( - mock(ClusterService.class), - threadPool, - null, - null, - null, - Collections.singletonList(DUMMY_PLUGIN), - client, - mock(IndicesService.class) - ); + IngestService ingestService = createWithPlugins(List.of(DUMMY_PLUGIN)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(emptyMap()) .setPipeline("_id") @@ -253,7 +221,7 @@ public void testInnerUpdatePipelines() { assertThat(ingestService.pipelines().size(), is(0)); PipelineConfiguration pipeline1 = new PipelineConfiguration("_id1", new BytesArray("{\"processors\": []}"), MediaTypeRegistry.JSON); - IngestMetadata ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1)); + IngestMetadata ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1)); ingestService.innerUpdatePipelines(ingestMetadata); assertThat(ingestService.pipelines().size(), is(1)); @@ -261,7 +229,7 @@ public void testInnerUpdatePipelines() { assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0)); PipelineConfiguration pipeline2 = new PipelineConfiguration("_id2", new BytesArray("{\"processors\": []}"), MediaTypeRegistry.JSON); - ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id2", pipeline2)); + ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id2", pipeline2)); ingestService.innerUpdatePipelines(ingestMetadata); assertThat(ingestService.pipelines().size(), is(2)); @@ -271,7 +239,7 @@ public void testInnerUpdatePipelines() { assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0)); PipelineConfiguration pipeline3 = new PipelineConfiguration("_id3", new BytesArray("{\"processors\": []}"), MediaTypeRegistry.JSON); - ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id2", pipeline2, "_id3", pipeline3)); + ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id2", pipeline2, "_id3", pipeline3)); ingestService.innerUpdatePipelines(ingestMetadata); assertThat(ingestService.pipelines().size(), is(3)); @@ -282,7 +250,7 @@ public void testInnerUpdatePipelines() { assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3")); assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0)); - ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id3", pipeline3)); + ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id3", pipeline3)); ingestService.innerUpdatePipelines(ingestMetadata); assertThat(ingestService.pipelines().size(), is(2)); @@ -296,7 +264,7 @@ public void testInnerUpdatePipelines() { new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), MediaTypeRegistry.JSON ); - ingestMetadata = new IngestMetadata(mapOf("_id1", pipeline1, "_id3", pipeline3)); + ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id3", pipeline3)); ingestService.innerUpdatePipelines(ingestMetadata); assertThat(ingestService.pipelines().size(), is(2)); @@ -312,25 +280,6 @@ public void testInnerUpdatePipelines() { assertThat(ingestService.pipelines(), sameInstance(pipelines)); } - private static Map mapOf(K key, V value) { - return Collections.singletonMap(key, value); - } - - private static Map mapOf(K key1, V value1, K key2, V value2) { - Map map = new HashMap<>(); - map.put(key1, value1); - map.put(key2, value2); - return map; - } - - private static Map mapOf(K key1, V value1, K key2, V value2, K key3, V value3) { - Map map = new HashMap<>(); - map.put(key1, value1); - map.put(key2, value2); - map.put(key3, value3); - return map; - } - public void testDelete() { IngestService ingestService = createWithProcessors(); PipelineConfiguration config = new PipelineConfiguration( @@ -853,7 +802,7 @@ public void testExecuteSuccess() { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -885,7 +834,7 @@ public void testExecuteEmptyPipeline() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -945,7 +894,7 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); @@ -990,7 +939,7 @@ public void testExecuteFailure() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); @@ -1049,7 +998,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -1099,7 +1048,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); @@ -1290,7 +1239,7 @@ public void testStats() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); @@ -1315,7 +1264,7 @@ public void testStats() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); @@ -1345,7 +1294,7 @@ public void testStats() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); final IngestStats afterThirdRequestStats = ingestService.stats(); assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); @@ -1379,7 +1328,7 @@ public void testStats() throws Exception { completionHandler, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); final IngestStats afterForthRequestStats = ingestService.stats(); assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); @@ -1503,17 +1452,7 @@ public Map getProcessors(Processor.Parameters paramet }; // Create ingest service: - Client client = mock(Client.class); - IngestService ingestService = new IngestService( - mock(ClusterService.class), - threadPool, - null, - null, - null, - Arrays.asList(testPlugin), - client, - mock(IndicesService.class) - ); + IngestService ingestService = createWithPlugins(List.of(testPlugin)); ingestService.addIngestClusterStateListener(ingestClusterStateListener); // Create pipeline and apply the resulting cluster state, which should update the counter in the right order: @@ -1566,7 +1505,7 @@ public void testCBORParsing() throws Exception { (thread, e) -> {}, indexReq -> {}, Names.WRITE, - mockBulkRequest + new BulkRequest() ); } @@ -1970,6 +1909,37 @@ public void testPrepareBatches_different_index_pipeline() { assertEquals(4, batches.size()); } + public void testAllowlist() { + runAllowlistTest(List.of()); + runAllowlistTest(List.of("one")); + runAllowlistTest(List.of("two")); + runAllowlistTest(List.of("three", "one")); + runAllowlistTest(List.of("one", "two", "three")); + } + + private void runAllowlistTest(List allowlist) { + this.settings = Settings.builder().putList(IngestService.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowlist).build(); + final IngestService ingestService = createWithProcessors(threeFakeProcessorFactories()); + assertEquals(Set.copyOf(allowlist), ingestService.getProcessorFactories().keySet()); + } + + public void testAllowlistNotSpecified() { + final Settings.Builder builder = Settings.builder(); + builder.remove(IngestService.PROCESSORS_ALLOWLIST_SETTING.getKey()); + this.settings = builder.build(); + final IngestService ingestService = createWithProcessors(threeFakeProcessorFactories()); + assertEquals(Set.of("one", "two", "three"), ingestService.getProcessorFactories().keySet()); + } + + public void testAllowlistHasNonexistentProcessors() { + this.settings = Settings.builder().putList(IngestService.PROCESSORS_ALLOWLIST_SETTING.getKey(), List.of("threeve")).build(); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> createWithProcessors(threeFakeProcessorFactories()) + ); + assertTrue(e.getMessage(), e.getMessage().contains("threeve")); + } + private IngestService.IndexRequestWrapper createIndexRequestWrapper(String index, List pipelines) { IndexRequest indexRequest = new IndexRequest(index); return new IngestService.IndexRequestWrapper(0, indexRequest, pipelines, true); @@ -1983,7 +1953,7 @@ private IngestDocument eqIndexTypeId(final Long version, final VersionType versi return argThat(new IngestDocumentMatcher("_index", "_type", "_id", version, versionType, source)); } - private static IngestService createWithProcessors() { + private IngestService createWithProcessors() { Map processors = new HashMap<>(); processors.put("set", (factories, tag, description, config) -> { String field = (String) config.remove("field"); @@ -1998,19 +1968,38 @@ private static IngestService createWithProcessors() { return createWithProcessors(processors); } - private static IngestService createWithProcessors(Map processors) { - - Client client = mock(Client.class); - ThreadPool threadPool = mock(ThreadPool.class); - ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); - when(threadPool.generic()).thenReturn(executorService); - when(threadPool.executor(anyString())).thenReturn(executorService); - return new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(new IngestPlugin() { + private IngestService createWithProcessors(Map processors) { + return createWithPlugins(List.of(new IngestPlugin() { @Override public Map getProcessors(final Processor.Parameters parameters) { return processors; } - }), client, mock(IndicesService.class)); + })); + } + + private IngestService createWithPlugins(List plugins) { + return new IngestService( + createClusterService(), + threadPool, + null, + null, + null, + plugins, + mock(Client.class), + mock(IndicesService.class) + ); + } + + private ClusterService createClusterService() { + return new ClusterService(settings, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool); + } + + private static Map threeFakeProcessorFactories() { + return Map.ofEntries( + Map.entry("one", (f, t, d, c) -> new FakeProcessor("one", t, d, (i) -> {})), + Map.entry("two", (f, t, d, c) -> new FakeProcessor("two", t, d, (i) -> {})), + Map.entry("three", (f, t, d, c) -> new FakeProcessor("three", t, d, (i) -> {})) + ); } private CompoundProcessor mockCompoundProcessor() {