From d69e9659232f5b0811fd12ae1922ec63cbaaa342 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Wed, 15 Jan 2025 16:07:34 -0500 Subject: [PATCH] Use pluginSubject for system index interaction and remove usages of stashContext Signed-off-by: Craig Perkins --- .../jobscheduler/spi/utils/LockService.java | 53 +++++++-------- .../jobscheduler/JobSchedulerPlugin.java | 16 ++++- .../transport/RunAsSubjectClient.java | 65 +++++++++++++++++++ 3 files changed, 101 insertions(+), 33 deletions(-) create mode 100644 src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index 19c5fb58..97ab003b 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -8,7 +8,6 @@ */ package org.opensearch.jobscheduler.spi.utils; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; @@ -79,28 +78,23 @@ public boolean lockIndexExist() { @VisibleForTesting void createLockIndex(ActionListener listener) { - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { - if (lockIndexExist()) { - listener.onResponse(true); - } else { - final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping( - lockMapping(), - (MediaType) XContentType.JSON - ); - client.admin() - .indices() - .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { - if (exception instanceof ResourceAlreadyExistsException - || exception.getCause() instanceof ResourceAlreadyExistsException) { - listener.onResponse(true); - } else { - listener.onFailure(exception); - } - })); - } - } catch (Exception e) { - logger.error(e); - listener.onFailure(e); + if (lockIndexExist()) { + listener.onResponse(true); + } else { + final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping( + lockMapping(), + (MediaType) XContentType.JSON + ); + client.admin() + .indices() + .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { + if (exception instanceof ResourceAlreadyExistsException + || exception.getCause() instanceof ResourceAlreadyExistsException) { + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); } } @@ -190,7 +184,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) { } private void updateLock(final LockModel updateLock, ActionListener listener) { - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + try { UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME) .id(updateLock.getLockId()) .setIfSeqNo(updateLock.getSeqNo()) @@ -220,16 +214,13 @@ private void updateLock(final LockModel updateLock, ActionListener li ) ); } catch (IOException e) { - logger.error("IOException occurred updating lock.", e); - listener.onResponse(null); - } catch (Exception e) { - logger.error(e); + logger.error("IOException occurred creating lock", e); listener.onFailure(e); } } private void createLock(final LockModel tempLock, ActionListener listener) { - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + try { final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId()) .source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) @@ -256,7 +247,7 @@ private void createLock(final LockModel tempLock, ActionListener list } public void findLock(final String lockId, ActionListener listener) { - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + try { GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); client.get(getRequest, ActionListener.wrap(response -> { if (!response.isExists()) { @@ -311,7 +302,7 @@ public void release(final LockModel lock, ActionListener listener) { * or not the delete was successful */ public void deleteLock(final String lockId, ActionListener listener) { - try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + try { DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId); client.delete(deleteRequest, ActionListener.wrap(response -> { listener.onResponse( diff --git a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java index 3873a4dc..56ed0768 100644 --- a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java +++ b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java @@ -14,6 +14,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.identity.PluginSubject; import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction; import org.opensearch.jobscheduler.rest.action.RestGetLockAction; import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction; @@ -38,9 +39,11 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; import org.opensearch.indices.SystemIndexDescriptor; +import org.opensearch.jobscheduler.transport.RunAsSubjectClient; import org.opensearch.jobscheduler.utils.JobDetailsService; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ExtensiblePlugin; +import org.opensearch.plugins.IdentityAwarePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; @@ -63,7 +66,7 @@ import com.google.common.collect.ImmutableList; -public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin { +public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, IdentityAwarePlugin { public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler"; public static final String JS_BASE_URI = "/_plugins/_job_scheduler"; @@ -74,6 +77,7 @@ public class JobSchedulerPlugin extends Plugin implements ActionPlugin, Extensib private LockService lockService; private Map indexToJobProviders; private Set indicesToListen; + private RunAsSubjectClient pluginClient; private JobDetailsService jobDetailsService; @@ -111,7 +115,8 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - this.lockService = new LockService(client, clusterService); + this.pluginClient = new RunAsSubjectClient(client); + this.lockService = new LockService(pluginClient, clusterService); this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders); this.scheduler = new JobScheduler(threadPool, this.lockService); this.sweeper = initSweeper( @@ -250,4 +255,11 @@ public List getRestHandlers( return ImmutableList.of(restGetJobDetailsAction, restGetLockAction, restReleaseLockAction); } + @Override + public void assignSubject(PluginSubject pluginSubject) { + if (this.pluginClient != null) { + this.pluginClient.setSubject(pluginSubject); + } + } + } diff --git a/src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java b/src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java new file mode 100644 index 00000000..d02774bb --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionType; +import org.opensearch.client.Client; +import org.opensearch.client.FilterClient; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.identity.Subject; + +/** + * Implementation of client that will run transport actions in a stashed context and inject the name of the provided + * subject into the context. + */ +public class RunAsSubjectClient extends FilterClient { + + private static final Logger logger = LogManager.getLogger(RunAsSubjectClient.class); + + private Subject subject; + + public RunAsSubjectClient(Client delegate) { + super(delegate); + } + + public RunAsSubjectClient(Client delegate, Subject subject) { + super(delegate); + this.subject = subject; + } + + public void setSubject(Subject subject) { + this.subject = subject; + } + + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (subject == null) { + throw new IllegalStateException("RunAsSubjectClient is not initialized."); + } + try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().newStoredContext(false)) { + subject.runAs(() -> { + logger.info("Running transport action with subject: {}", subject.getPrincipal().getName()); + super.doExecute(action, request, ActionListener.runBefore(listener, ctx::restore)); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +}