Skip to content

Commit

Permalink
Use pluginSubject for system index interaction and remove usages of s…
Browse files Browse the repository at this point in the history
…tashContext

Signed-off-by: Craig Perkins <cwperx@amazon.com>
  • Loading branch information
cwperks committed Jan 15, 2025
1 parent f1ad865 commit d69e965
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/
package org.opensearch.jobscheduler.spi.utils;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
Expand Down Expand Up @@ -79,28 +78,23 @@ public boolean lockIndexExist() {

@VisibleForTesting
void createLockIndex(ActionListener<Boolean> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(
lockMapping(),
(MediaType) XContentType.JSON
);
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
}
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(
lockMapping(),
(MediaType) XContentType.JSON
);
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
}
}

Expand Down Expand Up @@ -190,7 +184,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) {
}

private void updateLock(final LockModel updateLock, ActionListener<LockModel> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
try {
UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME)
.id(updateLock.getLockId())
.setIfSeqNo(updateLock.getSeqNo())
Expand Down Expand Up @@ -220,16 +214,13 @@ private void updateLock(final LockModel updateLock, ActionListener<LockModel> li
)
);
} catch (IOException e) {
logger.error("IOException occurred updating lock.", e);
listener.onResponse(null);
} catch (Exception e) {
logger.error(e);
logger.error("IOException occurred creating lock", e);
listener.onFailure(e);
}
}

private void createLock(final LockModel tempLock, ActionListener<LockModel> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
try {
final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId())
.source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
Expand All @@ -256,7 +247,7 @@ private void createLock(final LockModel tempLock, ActionListener<LockModel> list
}

public void findLock(final String lockId, ActionListener<LockModel> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
try {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
Expand Down Expand Up @@ -311,7 +302,7 @@ public void release(final LockModel lock, ActionListener<Boolean> listener) {
* or not the delete was successful
*/
public void deleteLock(final String lockId, ActionListener<Boolean> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
try {
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId);
client.delete(deleteRequest, ActionListener.wrap(response -> {
listener.onResponse(
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.identity.PluginSubject;
import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.action.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction;
Expand All @@ -38,9 +39,11 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.jobscheduler.transport.RunAsSubjectClient;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.IdentityAwarePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -63,7 +66,7 @@

import com.google.common.collect.ImmutableList;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin {
public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, IdentityAwarePlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";
Expand All @@ -74,6 +77,7 @@ public class JobSchedulerPlugin extends Plugin implements ActionPlugin, Extensib
private LockService lockService;
private Map<String, ScheduledJobProvider> indexToJobProviders;
private Set<String> indicesToListen;
private RunAsSubjectClient pluginClient;

private JobDetailsService jobDetailsService;

Expand Down Expand Up @@ -111,7 +115,8 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.lockService = new LockService(client, clusterService);
this.pluginClient = new RunAsSubjectClient(client);
this.lockService = new LockService(pluginClient, clusterService);
this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders);
this.scheduler = new JobScheduler(threadPool, this.lockService);
this.sweeper = initSweeper(
Expand Down Expand Up @@ -250,4 +255,11 @@ public List getRestHandlers(
return ImmutableList.of(restGetJobDetailsAction, restGetLockAction, restReleaseLockAction);
}

@Override
public void assignSubject(PluginSubject pluginSubject) {
if (this.pluginClient != null) {
this.pluginClient.setSubject(pluginSubject);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.client.Client;
import org.opensearch.client.FilterClient;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.identity.Subject;

/**
* Implementation of client that will run transport actions in a stashed context and inject the name of the provided
* subject into the context.
*/
public class RunAsSubjectClient extends FilterClient {

private static final Logger logger = LogManager.getLogger(RunAsSubjectClient.class);

private Subject subject;

public RunAsSubjectClient(Client delegate) {
super(delegate);
}

public RunAsSubjectClient(Client delegate, Subject subject) {
super(delegate);
this.subject = subject;
}

public void setSubject(Subject subject) {
this.subject = subject;
}

@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
if (subject == null) {
throw new IllegalStateException("RunAsSubjectClient is not initialized.");
}
try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().newStoredContext(false)) {
subject.runAs(() -> {
logger.info("Running transport action with subject: {}", subject.getPrincipal().getName());
super.doExecute(action, request, ActionListener.runBefore(listener, ctx::restore));
return null;
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit d69e965

Please sign in to comment.