diff --git a/docs/changelog/86757.yaml b/docs/changelog/86757.yaml new file mode 100644 index 0000000000000..1f7bebd3dc2ff --- /dev/null +++ b/docs/changelog/86757.yaml @@ -0,0 +1,5 @@ +pr: 86757 +summary: "Prefer secondary auth headers for transforms" +area: Transform +type: enhancement +issues: [] diff --git a/docs/reference/transform/apis/preview-transform.asciidoc b/docs/reference/transform/apis/preview-transform.asciidoc index 241143877c674..9758270319dea 100644 --- a/docs/reference/transform/apis/preview-transform.asciidoc +++ b/docs/reference/transform/apis/preview-transform.asciidoc @@ -27,10 +27,15 @@ Previews a {transform}. Requires the following privileges: -* cluster: `manage_transform` (the `transform_admin` built-in role grants this +* cluster: `manage_transform` (the `transform_admin` built-in role grants this privilege) * source indices: `read`, `view_index_metadata`. - ++ +-- +NOTE: If you provide +<>, those +credentials are used. +-- [[preview-transform-desc]] == {api-description-title} diff --git a/docs/reference/transform/apis/put-transform.asciidoc b/docs/reference/transform/apis/put-transform.asciidoc index 5c6ad6ecb0e26..a2d5ea5317486 100644 --- a/docs/reference/transform/apis/put-transform.asciidoc +++ b/docs/reference/transform/apis/put-transform.asciidoc @@ -24,6 +24,12 @@ Requires the following privileges: * source indices: `read`, `view_index_metadata` * destination index: `read`, `create_index`, `index`. If a `retention_policy` is configured, the `delete` privilege is also required. ++ +-- +NOTE: If you provide +<>, those +credentials are used. +-- [[put-transform-desc]] == {api-description-title} diff --git a/docs/reference/transform/apis/update-transform.asciidoc b/docs/reference/transform/apis/update-transform.asciidoc index 7b386fe0ed48c..4c8143c8a9f97 100644 --- a/docs/reference/transform/apis/update-transform.asciidoc +++ b/docs/reference/transform/apis/update-transform.asciidoc @@ -44,7 +44,9 @@ each checkpoint. * When {es} {security-features} are enabled, your {transform} remembers which roles the user who updated it had at the time of update and runs with those -privileges. +privileges. If you provide +<>, those +credentials are used instead. * You must use {kib} or this API to update a {transform}. Do not update a {transform} directly via `.transform-internal*` indices using the {es} index API. If {es} {security-features} are enabled, do not give users any privileges on diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index 14d9fa964138c..f10b069493315 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -42,12 +42,14 @@ public class TransformPivotRestIT extends TransformRestTestCase { + private static final String TEST_USER_NAME_NO_ACCESS = "no_authorization"; private static final String TEST_USER_NAME = "transform_admin_plus_data"; private static final String DATA_ACCESS_ROLE = "test_data_access"; private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS = basicAuthHeaderValue( TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING ); + private static final String BASIC_AUTH_VALUE_NO_ACCESS = basicAuthHeaderValue(TEST_USER_NAME_NO_ACCESS, TEST_PASSWORD_SECURE_STRING); private static boolean indicesCreated = false; @@ -96,6 +98,34 @@ public void testSimplePivot() throws Exception { assertOneCount(transformIndex + "/_search?q=reviewer:user_26", "hits.hits._source.affiliate_missing", 0); } + public void testSimplePivotWithSecondaryHeaders() throws Exception { + setupUser(TEST_USER_NAME_NO_ACCESS, List.of("transform_admin")); + String transformId = "simple-pivot"; + String transformIndex = "pivot_reviews"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex); + createPivotReviewsTransform( + transformId, + transformIndex, + null, + null, + BASIC_AUTH_VALUE_NO_ACCESS, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS, + REVIEWS_INDEX_NAME + ); + startAndWaitForTransform( + transformId, + transformIndex, + BASIC_AUTH_VALUE_NO_ACCESS, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS, + new String[0] + ); + + // we expect 27 documents as there shall be 27 user_id's + // Just need to validate that things ran with secondary headers + Map indexStats = getAsMap(transformIndex + "/_stats"); + assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + } + public void testSimpleDataStreamPivot() throws Exception { String indexName = "reviews_data_stream"; createReviewsIndex(indexName, 1000, 27, "date", true, -1, null); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 1190ea420abd9..191120bf208c6 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -43,6 +43,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase { protected static final String TEST_PASSWORD = "x-pack-test-password"; + private static final String SECONDARY_AUTH_KEY = "es-secondary-authorization"; protected static final SecureString TEST_PASSWORD_SECURE_STRING = new SecureString(TEST_PASSWORD.toCharArray()); private static final String BASIC_AUTH_VALUE_SUPER_USER = basicAuthHeaderValue("x_pack_rest_user", TEST_PASSWORD_SECURE_STRING); @@ -267,7 +268,7 @@ protected void createContinuousPivotReviewsTransform(String transformId, String } }""".formatted(transformIndex, REVIEWS_INDEX_NAME); - createReviewsTransform(transformId, authHeader, config); + createReviewsTransform(transformId, authHeader, null, config); } protected void createPivotReviewsTransform( @@ -277,6 +278,18 @@ protected void createPivotReviewsTransform( String pipeline, String authHeader, String sourceIndex + ) throws IOException { + createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, null, sourceIndex); + } + + protected void createPivotReviewsTransform( + String transformId, + String transformIndex, + String query, + String pipeline, + String authHeader, + String secondaryAuthHeader, + String sourceIndex ) throws IOException { String config = "{"; @@ -326,7 +339,7 @@ protected void createPivotReviewsTransform( "frequency": "1s" }"""; - createReviewsTransform(transformId, authHeader, config); + createReviewsTransform(transformId, authHeader, secondaryAuthHeader, config); } protected void createLatestReviewsTransform(String transformId, String transformIndex) throws IOException { @@ -347,11 +360,17 @@ protected void createLatestReviewsTransform(String transformId, String transform "frequency": "1s" }""".formatted(transformIndex, REVIEWS_INDEX_NAME); - createReviewsTransform(transformId, null, config); + createReviewsTransform(transformId, null, null, config); } - private void createReviewsTransform(String transformId, String authHeader, String config) throws IOException { - final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader); + private void createReviewsTransform(String transformId, String authHeader, String secondaryAuthHeader, String config) + throws IOException { + final Request createTransformRequest = createRequestWithSecondaryAuth( + "PUT", + getTransformEndpoint() + transformId, + authHeader, + secondaryAuthHeader + ); createTransformRequest.setJsonEntity(config); Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); @@ -360,7 +379,7 @@ private void createReviewsTransform(String transformId, String authHeader, Strin protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline, String authHeader) throws IOException { - createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, REVIEWS_INDEX_NAME); + createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, null, REVIEWS_INDEX_NAME); } protected void startTransform(String transformId) throws IOException { @@ -369,7 +388,18 @@ protected void startTransform(String transformId) throws IOException { protected void startTransform(String transformId, String authHeader, String... warnings) throws IOException { // start the transform - final Request startTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_start", authHeader); + startTransform(transformId, authHeader, null, warnings); + } + + protected void startTransform(String transformId, String authHeader, String secondaryAuthHeader, String... warnings) + throws IOException { + // start the transform + final Request startTransformRequest = createRequestWithSecondaryAuth( + "POST", + getTransformEndpoint() + transformId + "/_start", + authHeader, + secondaryAuthHeader + ); if (warnings.length > 0) { startTransformRequest.setOptions(expectWarnings(warnings)); } @@ -404,8 +434,18 @@ protected void startAndWaitForTransform(String transformId, String transformInde protected void startAndWaitForTransform(String transformId, String transformIndex, String authHeader, String... warnings) throws Exception { + startAndWaitForTransform(transformId, transformIndex, authHeader, null, warnings); + } + + protected void startAndWaitForTransform( + String transformId, + String transformIndex, + String authHeader, + String secondaryAuthHeader, + String... warnings + ) throws Exception { // start the transform - startTransform(transformId, authHeader, warnings); + startTransform(transformId, authHeader, secondaryAuthHeader, warnings); assertTrue(indexExists(transformIndex)); // wait until the transform has been created and all data is available waitForTransformCheckpoint(transformId); @@ -435,18 +475,29 @@ protected void resetTransform(String transformId, boolean force) throws IOExcept assertThat(resetTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } - protected Request createRequestWithAuth(final String method, final String endpoint, final String authHeader) { + protected Request createRequestWithSecondaryAuth( + final String method, + final String endpoint, + final String authHeader, + final String secondaryAuthHeader + ) { final Request request = new Request(method, endpoint); + RequestOptions.Builder options = request.getOptions().toBuilder(); if (authHeader != null) { - RequestOptions.Builder options = request.getOptions().toBuilder(); options.addHeader("Authorization", authHeader); - request.setOptions(options); } - + if (secondaryAuthHeader != null) { + options.addHeader(SECONDARY_AUTH_KEY, secondaryAuthHeader); + } + request.setOptions(options); return request; } + protected Request createRequestWithAuth(final String method, final String endpoint, final String authHeader) { + return createRequestWithSecondaryAuth(method, endpoint, authHeader, null); + } + void waitForTransformStopped(String transformId) throws Exception { assertBusy(() -> { assertEquals("stopped", getTransformState(transformId)); }, 15, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java index 88d81c52dc382..a89bb94489b35 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java @@ -38,15 +38,18 @@ public class TransformUpdateIT extends TransformRestTestCase { TEST_ADMIN_USER_NAME_2, TEST_PASSWORD_SECURE_STRING ); + private static final String TEST_ADMIN_USER_NAME_NO_DATA = "transform_admin_no_data"; + private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA = basicAuthHeaderValue( + TEST_ADMIN_USER_NAME_NO_DATA, + TEST_PASSWORD_SECURE_STRING + ); private static final String DATA_ACCESS_ROLE = "test_data_access"; private static final String DATA_ACCESS_ROLE_2 = "test_data_access_2"; - private static boolean indicesCreated = false; - // preserve indices in order to reuse source indices in several test cases @Override protected boolean preserveIndicesUponCompletion() { - return true; + return false; } @Override @@ -70,14 +73,8 @@ public void createIndexes() throws IOException { setupUser(TEST_USER_NAME, Arrays.asList("transform_user", DATA_ACCESS_ROLE)); setupUser(TEST_ADMIN_USER_NAME_1, Arrays.asList("transform_admin", DATA_ACCESS_ROLE)); setupUser(TEST_ADMIN_USER_NAME_2, Arrays.asList("transform_admin", DATA_ACCESS_ROLE_2)); - - // it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack - if (indicesCreated) { - return; - } - + setupUser(TEST_ADMIN_USER_NAME_NO_DATA, List.of("transform_admin")); createReviewsIndex(); - indicesCreated = true; } @SuppressWarnings("unchecked") @@ -149,8 +146,15 @@ public void testUpdateDeprecatedSettings() throws Exception { assertThat(XContentMapValues.extractValue("settings.max_page_search_size", transform), equalTo(555)); } - @SuppressWarnings("unchecked") public void testUpdateTransferRights() throws Exception { + updateTransferRightsTester(false); + } + + public void testUpdateTransferRightsSecondaryAuthHeaders() throws Exception { + updateTransferRightsTester(true); + } + + private void updateTransferRightsTester(boolean useSecondaryAuthHeaders) throws Exception { String transformId = "transform1"; // Note: Due to a bug the transform does not fail to start after deleting the user and role, therefore invalidating // the credentials stored with the config. As a workaround we use a 2nd transform that uses the same config @@ -160,17 +164,23 @@ public void testUpdateTransferRights() throws Exception { setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest); setupDataAccessRole(DATA_ACCESS_ROLE_2, REVIEWS_INDEX_NAME, transformDest); - final Request createTransformRequest = createRequestWithAuth( - "PUT", - getTransformEndpoint() + transformId, - BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2 - ); - - final Request createTransformRequest_2 = createRequestWithAuth( - "PUT", - getTransformEndpoint() + transformIdCloned, - BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2 - ); + final Request createTransformRequest = useSecondaryAuthHeaders + ? createRequestWithSecondaryAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2 + ) + : createRequestWithAuth("PUT", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2); + + final Request createTransformRequest_2 = useSecondaryAuthHeaders + ? createRequestWithSecondaryAuth( + "PUT", + getTransformEndpoint() + transformIdCloned, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2 + ) + : createRequestWithAuth("PUT", getTransformEndpoint() + transformIdCloned, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2); String config = """ { @@ -229,24 +239,37 @@ public void testUpdateTransferRights() throws Exception { assertEquals(1, XContentMapValues.extractValue("count", transforms)); // start using admin 1, but as the header is still admin 2 + // This fails as the stored header is still admin 2 try { - startAndWaitForTransform(transformId, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); + if (useSecondaryAuthHeaders) { + startAndWaitForTransform( + transformId, + transformDest, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1, + new String[0] + ); + } else { + startAndWaitForTransform(transformId, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); + } fail("request should have failed"); } catch (ResponseException e) { assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(500)); } - assertBusy(() -> { Map transformStatsAsMap = getTransformStateAndStats(transformId); assertThat(XContentMapValues.extractValue("stats.documents_indexed", transformStatsAsMap), equalTo(0)); }, 3, TimeUnit.SECONDS); // update the transform with an empty body, the credentials (headers) should change - final Request updateRequest = createRequestWithAuth( - "POST", - getTransformEndpoint() + transformIdCloned + "/_update", - BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 - ); + final Request updateRequest = useSecondaryAuthHeaders + ? createRequestWithSecondaryAuth( + "POST", + getTransformEndpoint() + transformIdCloned + "/_update", + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 + ) + : createRequestWithAuth("POST", getTransformEndpoint() + transformIdCloned + "/_update", BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); updateRequest.setJsonEntity("{}"); assertOK(client().performRequest(updateRequest)); @@ -256,8 +279,17 @@ public void testUpdateTransferRights() throws Exception { assertEquals(1, XContentMapValues.extractValue("count", transforms)); // start with updated configuration should succeed - startAndWaitForTransform(transformIdCloned, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); - + if (useSecondaryAuthHeaders) { + startAndWaitForTransform( + transformIdCloned, + transformDest, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_NO_DATA, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1, + new String[0] + ); + } else { + startAndWaitForTransform(transformIdCloned, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1); + } assertBusy(() -> { Map transformStatsAsMap = getTransformStateAndStats(transformIdCloned); assertThat(XContentMapValues.extractValue("stats.documents_indexed", transformStatsAsMap), equalTo(27)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java index fbc5681d0b96c..07ef743966722 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeChecker.java @@ -24,9 +24,11 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; +import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; /** * {@link TransformPrivilegeChecker} is responsible for checking whether the user has the right privileges in order to work with transform. @@ -43,21 +45,23 @@ static void checkPrivileges( boolean checkDestIndexPrivileges, ActionListener listener ) { - final String username = securityContext.getUser().principal(); + useSecondaryAuthIfAvailable(securityContext, () -> { + final String username = securityContext.getUser().principal(); - ActionListener hasPrivilegesResponseListener = ActionListener.wrap( - response -> handlePrivilegesResponse(operationName, username, config.getId(), response, listener), - listener::onFailure - ); + ActionListener hasPrivilegesResponseListener = ActionListener.wrap( + response -> handlePrivilegesResponse(operationName, username, config.getId(), response, listener), + listener::onFailure + ); - HasPrivilegesRequest hasPrivilegesRequest = buildPrivilegesRequest( - config, - indexNameExpressionResolver, - clusterState, - username, - checkDestIndexPrivileges - ); - client.execute(HasPrivilegesAction.INSTANCE, hasPrivilegesRequest, hasPrivilegesResponseListener); + HasPrivilegesRequest hasPrivilegesRequest = buildPrivilegesRequest( + config, + indexNameExpressionResolver, + clusterState, + username, + checkDestIndexPrivileges + ); + client.execute(HasPrivilegesAction.INSTANCE, hasPrivilegesRequest, hasPrivilegesResponseListener); + }); } private static HasPrivilegesRequest buildPrivilegesRequest( @@ -128,7 +132,7 @@ private static void handlePrivilegesResponse( .entrySet() .stream() .filter(e -> Boolean.TRUE.equals(e.getValue()) == false) - .map(e -> e.getKey()) + .map(Map.Entry::getKey) .collect(joining(", ", indexPrivileges.getResource() + ":[", "]")) ) .collect(toList()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index ea294a6cd65ca..01b256235b9e8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.ingest.IngestService; import org.elasticsearch.license.License; import org.elasticsearch.license.RemoteClusterLicenseChecker; -import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -64,11 +63,11 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.DUMMY_DEST_INDEX_FOR_PREVIEW; +import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; public class TransportPreviewTransformAction extends HandledTransportAction { private static final int NUMBER_OF_PREVIEW_BUCKETS = 100; - private final XPackLicenseState licenseState; private final SecurityContext securityContext; private final IndexNameExpressionResolver indexNameExpressionResolver; private final Client client; @@ -80,7 +79,6 @@ public class TransportPreviewTransformAction extends HandledTransportAction li final Function function = FunctionFactory.create(config); // <4> Validate transform query - ActionListener validateConfigListener = ActionListener.wrap(validateConfigResponse -> { - getPreview( - config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null - function, - config.getSource(), - config.getDestination().getPipeline(), - config.getDestination().getIndex(), - config.getSyncConfig(), - listener - ); - }, listener::onFailure); + ActionListener validateConfigListener = ActionListener.wrap( + validateConfigResponse -> useSecondaryAuthIfAvailable( + securityContext, + () -> getPreview( + config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null + function, + config.getSource(), + config.getDestination().getPipeline(), + config.getDestination().getIndex(), + config.getSyncConfig(), + listener + ) + ), + listener::onFailure + ); // <3> Validate transform function config ActionListener validateSourceDestListener = ActionListener.wrap( - validateSourceDestResponse -> { function.validateConfig(validateConfigListener); }, + validateSourceDestResponse -> function.validateConfig(validateConfigListener), listener::onFailure ); // <2> Validate source and destination indices - ActionListener checkPrivilegesListener = ActionListener.wrap(aVoid -> { - sourceDestValidator.validate( + ActionListener checkPrivilegesListener = ActionListener.wrap( + aVoid -> sourceDestValidator.validate( clusterState, config.getSource().getIndex(), config.getDestination().getIndex(), config.getDestination().getPipeline(), SourceDestValidations.getValidationsForPreview(config.getAdditionalSourceDestValidations()), validateSourceDestListener - ); - }, listener::onFailure); + ), + listener::onFailure + ); // <1> Early check to verify that the user can create the destination index and can read from the source if (XPackSettings.SECURITY_ENABLED.get(nodeSettings)) { @@ -228,7 +230,7 @@ private void getPreview( ); List warnings = TransformConfigLinter.getWarnings(function, source, syncConfig); - warnings.forEach(warning -> HeaderWarning.addWarning(warning)); + warnings.forEach(HeaderWarning::addWarning); listener.onResponse(new Response(docs, generatedDestIndexSettings)); }, listener::onFailure); @@ -240,7 +242,7 @@ private void getPreview( Clock.systemUTC() ); List warnings = TransformConfigLinter.getWarnings(function, source, syncConfig); - warnings.forEach(warning -> HeaderWarning.addWarning(warning)); + warnings.forEach(HeaderWarning::addWarning); listener.onResponse(new Response(docs, generatedDestIndexSettings)); } else { List> results = docs.stream().map(doc -> { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 2f79d118a48a5..06374919733ea 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.IngestService; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -48,6 +47,8 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; + public class TransportPutTransformAction extends AcknowledgedTransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportPutTransformAction.class); @@ -67,8 +68,7 @@ public TransportPutTransformAction( IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, TransformServices transformServices, - Client client, - IngestService ingestService + Client client ) { super( PutTransformAction.NAME, @@ -92,54 +92,61 @@ public TransportPutTransformAction( @Override protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener listener) { XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); - - // set headers to run transform as calling user - Map filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders( - threadPool.getThreadContext(), - clusterService.state() - ); - - TransformConfig config = request.getConfig().setHeaders(filteredHeaders).setCreateTime(Instant.now()).setVersion(Version.CURRENT); - - String transformId = config.getId(); - // quick check whether a transform has already been created under that name - if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, transformId) != null) { - listener.onFailure( - new ResourceAlreadyExistsException(TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformId)) + useSecondaryAuthIfAvailable(securityContext, () -> { + // set headers to run transform as calling user + Map filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders( + threadPool.getThreadContext(), + clusterService.state() ); - return; - } - // <3> Create the transform - ActionListener validateTransformListener = ActionListener.wrap( - validationResponse -> { putTransform(request, listener); }, - listener::onFailure - ); + TransformConfig config = request.getConfig() + .setHeaders(filteredHeaders) + .setCreateTime(Instant.now()) + .setVersion(Version.CURRENT); + + String transformId = config.getId(); + // quick check whether a transform has already been created under that name + if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, transformId) != null) { + listener.onFailure( + new ResourceAlreadyExistsException( + TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformId) + ) + ); + return; + } - // <2> Validate source and destination indices - ActionListener checkPrivilegesListener = ActionListener.wrap(aVoid -> { - client.execute( - ValidateTransformAction.INSTANCE, - new ValidateTransformAction.Request(config, request.isDeferValidation(), request.timeout()), - validateTransformListener + // <3> Create the transform + ActionListener validateTransformListener = ActionListener.wrap( + validationResponse -> putTransform(request, listener), + listener::onFailure ); - }, listener::onFailure); - // <1> Early check to verify that the user can create the destination index and can read from the source - if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) { - TransformPrivilegeChecker.checkPrivileges( - "create", - securityContext, - indexNameExpressionResolver, - clusterState, - client, - config, - true, - checkPrivilegesListener + // <2> Validate source and destination indices + ActionListener checkPrivilegesListener = ActionListener.wrap( + aVoid -> client.execute( + ValidateTransformAction.INSTANCE, + new ValidateTransformAction.Request(config, request.isDeferValidation(), request.timeout()), + validateTransformListener + ), + listener::onFailure ); - } else { // No security enabled, just move on - checkPrivilegesListener.onResponse(null); - } + + // <1> Early check to verify that the user can create the destination index and can read from the source + if (XPackSettings.SECURITY_ENABLED.get(settings) && request.isDeferValidation() == false) { + TransformPrivilegeChecker.checkPrivileges( + "create", + securityContext, + indexNameExpressionResolver, + clusterState, + client, + config, + true, + checkPrivilegesListener + ); + } else { // No security enabled, just move on + checkPrivilegesListener.onResponse(null); + } + }); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index a18ede68d06a3..62db81bd68e01 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.MasterNotDiscoveredException; -import org.elasticsearch.ingest.IngestService; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -51,6 +50,8 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; + public class TransportUpdateTransformAction extends TransportTasksAction { private static final Logger logger = LogManager.getLogger(TransportUpdateTransformAction.class); @@ -71,8 +72,7 @@ public TransportUpdateTransformAction( IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, TransformServices transformServices, - Client client, - IngestService ingestService + Client client ) { super( UpdateTransformAction.NAME, @@ -117,66 +117,71 @@ protected void doExecute(Task task, Request request, ActionListener li } return; } + useSecondaryAuthIfAvailable(securityContext, () -> { + // set headers to run transform as calling user + Map filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders( + threadPool.getThreadContext(), + clusterService.state() + ); - // set headers to run transform as calling user - Map filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders( - threadPool.getThreadContext(), - clusterService.state() - ); - - TransformConfigUpdate update = request.getUpdate(); - update.setHeaders(filteredHeaders); - - // GET transform and attempt to update - // We don't want the update to complete if the config changed between GET and INDEX - transformConfigManager.getTransformConfigurationForUpdate(request.getId(), ActionListener.wrap(configAndVersion -> { - TransformUpdater.updateTransform( - securityContext, - indexNameExpressionResolver, - clusterState, - settings, - client, - transformConfigManager, - configAndVersion.v1(), - update, - configAndVersion.v2(), - request.isDeferValidation(), - false, // dryRun - true, // checkAccess - request.getTimeout(), - ActionListener.wrap(updateResponse -> { - TransformConfig updatedConfig = updateResponse.getConfig(); - auditor.info(updatedConfig.getId(), "Updated transform."); - logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus()); - - checkTransformConfigAndLogWarnings(updatedConfig); - - if (update.changesSettings(configAndVersion.v1())) { - PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( - request.getId(), - clusterState - ); - - // to send a request to apply new settings at runtime, several requirements must be met: - // - transform must be running, meaning a task exists - // - transform is not failed (stopped transforms do not have a task) - // - the node where transform is executed on is at least 7.8.0 in order to understand the request - if (transformTask != null - && transformTask.isAssigned() - && transformTask.getState() instanceof TransformState - && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED - && clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) { - - request.setNodes(transformTask.getExecutorNode()); - request.setConfig(updatedConfig); - super.doExecute(task, request, listener); - return; - } - } - listener.onResponse(new Response(updatedConfig)); - }, listener::onFailure) + TransformConfigUpdate update = request.getUpdate(); + update.setHeaders(filteredHeaders); + + // GET transform and attempt to update + // We don't want the update to complete if the config changed between GET and INDEX + transformConfigManager.getTransformConfigurationForUpdate( + request.getId(), + ActionListener.wrap( + configAndVersion -> TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + clusterState, + settings, + client, + transformConfigManager, + configAndVersion.v1(), + update, + configAndVersion.v2(), + request.isDeferValidation(), + false, // dryRun + true, // checkAccess + request.getTimeout(), + ActionListener.wrap(updateResponse -> { + TransformConfig updatedConfig = updateResponse.getConfig(); + auditor.info(updatedConfig.getId(), "Updated transform."); + logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus()); + + checkTransformConfigAndLogWarnings(updatedConfig); + + if (update.changesSettings(configAndVersion.v1())) { + PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( + request.getId(), + clusterState + ); + + // to send a request to apply new settings at runtime, several requirements must be met: + // - transform must be running, meaning a task exists + // - transform is not failed (stopped transforms do not have a task) + // - the node where transform is executed on is at least 7.8.0 in order to understand the request + if (transformTask != null + && transformTask.isAssigned() + && transformTask.getState() instanceof TransformState + && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED + && clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) { + + request.setNodes(transformTask.getExecutorNode()); + request.setConfig(updatedConfig); + super.doExecute(task, request, listener); + return; + } + } + listener.onResponse(new Response(updatedConfig)); + }, listener::onFailure) + ), + listener::onFailure + ) ); - }, listener::onFailure)); + }); } private void checkTransformConfigAndLogWarnings(TransformConfig config) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtils.java new file mode 100644 index 0000000000000..b61ed60381d04 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SecondaryAuthorizationUtils.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.utils; + +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.security.authc.support.SecondaryAuthentication; + +public final class SecondaryAuthorizationUtils { + + private SecondaryAuthorizationUtils() {} + + /** + * This executes the supplied runnable inside the secondary auth context if it exists; + */ + public static void useSecondaryAuthIfAvailable(SecurityContext securityContext, Runnable runnable) { + if (securityContext == null) { + runnable.run(); + return; + } + SecondaryAuthentication secondaryAuth = securityContext.getSecondaryAuthentication(); + if (secondaryAuth != null) { + runnable = secondaryAuth.wrap(runnable); + } + runnable.run(); + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeCheckerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeCheckerTests.java index e1edb09f4f263..a86c1d06ab553 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeCheckerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformPrivilegeCheckerTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse; @@ -75,12 +77,8 @@ public class TransformPrivilegeCheckerTests extends ESTestCase { .setSource(new SourceConfig(SOURCE_INDEX_NAME)) .setDest(new DestConfig(DEST_INDEX_NAME, null)) .build(); - - private final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, null) { - public User getUser() { - return new User(USER_NAME); - } - }; + private ThreadPool threadPool; + private SecurityContext securityContext; private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(); private MyMockClient client; @@ -90,11 +88,18 @@ public void setupClient() { client.close(); } client = new MyMockClient(getTestName()); + threadPool = new TestThreadPool("transform_privilege_checker_tests"); + securityContext = new SecurityContext(Settings.EMPTY, threadPool.getThreadContext()) { + public User getUser() { + return new User(USER_NAME); + } + }; } @After public void tearDownClient() { client.close(); + threadPool.shutdown(); } public void testCheckPrivileges_NoCheckDestIndexPrivileges() {