Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pluginSubject for system index interaction and remove usages of stashContext #714

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/
package org.opensearch.jobscheduler.spi.utils;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
Expand Down Expand Up @@ -79,28 +79,23 @@ public boolean lockIndexExist() {

@VisibleForTesting
void createLockIndex(ActionListener<Boolean> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(
lockMapping(),
(MediaType) XContentType.JSON
);
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
}
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
if (lockIndexExist()) {
listener.onResponse(true);
} else {
final CreateIndexRequest request = new CreateIndexRequest(LOCK_INDEX_NAME).mapping(
lockMapping(),
(MediaType) XContentType.JSON
);
client.admin()
.indices()
.create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> {
if (exception instanceof ResourceAlreadyExistsException
|| exception.getCause() instanceof ResourceAlreadyExistsException) {
listener.onResponse(true);
} else {
listener.onFailure(exception);
}
}));
}
}

Expand Down Expand Up @@ -190,7 +185,7 @@ private boolean isLockReleasedOrExpired(final LockModel lock) {
}

private void updateLock(final LockModel updateLock, ActionListener<LockModel> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
try {
UpdateRequest updateRequest = new UpdateRequest().index(LOCK_INDEX_NAME)
.id(updateLock.getLockId())
.setIfSeqNo(updateLock.getSeqNo())
Expand Down Expand Up @@ -220,18 +215,16 @@ private void updateLock(final LockModel updateLock, ActionListener<LockModel> li
)
);
} catch (IOException e) {
logger.error("IOException occurred updating lock.", e);
listener.onResponse(null);
} catch (Exception e) {
logger.error(e);
logger.error("IOException occurred creating lock", e);
listener.onFailure(e);
}
}

private void createLock(final LockModel tempLock, ActionListener<LockModel> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
try {
final IndexRequest request = new IndexRequest(LOCK_INDEX_NAME).id(tempLock.getLockId())
.source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this is necessary here just yet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed to resolve a race condition discovered in GetLockMultiNodeRestIT.testGetLockRestAPI. This test will create and release a lock 10 times in a loop. Without adding this RefreshPolicy, when the security plugin is installed it can fail to find the lock the was just created. By setting this RefreshPolicy to IMMEDIATE, we can guarantee that this newly created document is available for search (including GET actions)

.setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
.setIfPrimaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
.create(true);
Expand All @@ -256,7 +249,7 @@ private void createLock(final LockModel tempLock, ActionListener<LockModel> list
}

public void findLock(final String lockId, ActionListener<LockModel> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
try {
GetRequest getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId);
client.get(getRequest, ActionListener.wrap(response -> {
if (!response.isExists()) {
Expand Down Expand Up @@ -311,7 +304,7 @@ public void release(final LockModel lock, ActionListener<Boolean> listener) {
* or not the delete was successful
*/
public void deleteLock(final String lockId, ActionListener<Boolean> listener) {
try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) {
try {
DeleteRequest deleteRequest = new DeleteRequest(LOCK_INDEX_NAME).id(lockId);
client.delete(deleteRequest, ActionListener.wrap(response -> {
listener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.identity.PluginSubject;
import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.action.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction;
Expand All @@ -38,9 +39,11 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.jobscheduler.transport.RunAsSubjectClient;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.IdentityAwarePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -63,7 +66,7 @@

import com.google.common.collect.ImmutableList;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin {
public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, IdentityAwarePlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";
Expand All @@ -74,6 +77,7 @@
private LockService lockService;
private Map<String, ScheduledJobProvider> indexToJobProviders;
private Set<String> indicesToListen;
private RunAsSubjectClient pluginClient;

private JobDetailsService jobDetailsService;

Expand Down Expand Up @@ -111,7 +115,8 @@
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.lockService = new LockService(client, clusterService);
this.pluginClient = new RunAsSubjectClient(client);
this.lockService = new LockService(pluginClient, clusterService);

Check warning on line 119 in src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java#L118-L119

Added lines #L118 - L119 were not covered by tests
this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders);
this.scheduler = new JobScheduler(threadPool, this.lockService);
this.sweeper = initSweeper(
Expand Down Expand Up @@ -250,4 +255,11 @@
return ImmutableList.of(restGetJobDetailsAction, restGetLockAction, restReleaseLockAction);
}

@Override
public void assignSubject(PluginSubject pluginSubject) {
if (this.pluginClient != null) {
this.pluginClient.setSubject(pluginSubject);

Check warning on line 261 in src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java#L261

Added line #L261 was not covered by tests
}
}

Check warning on line 263 in src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java#L263

Added line #L263 was not covered by tests

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.client.Client;
import org.opensearch.client.FilterClient;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.identity.Subject;

/**
* Implementation of client that will run transport actions in a stashed context and inject the name of the provided
* subject into the context.
*/
public class RunAsSubjectClient extends FilterClient {

private static final Logger logger = LogManager.getLogger(RunAsSubjectClient.class);

Check warning on line 29 in src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java#L29

Added line #L29 was not covered by tests

private Subject subject;

public RunAsSubjectClient(Client delegate) {
super(delegate);
}

Check warning on line 35 in src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java#L34-L35

Added lines #L34 - L35 were not covered by tests

public RunAsSubjectClient(Client delegate, Subject subject) {
super(delegate);
this.subject = subject;
}

Check warning on line 40 in src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java#L38-L40

Added lines #L38 - L40 were not covered by tests

public void setSubject(Subject subject) {
this.subject = subject;
}

Check warning on line 44 in src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java#L43-L44

Added lines #L43 - L44 were not covered by tests

@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
if (subject == null) {
throw new IllegalStateException("RunAsSubjectClient is not initialized.");

Check warning on line 53 in src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java#L53

Added line #L53 was not covered by tests
}
try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().newStoredContext(false)) {
subject.runAs(() -> {
logger.info("Running transport action with subject: {}", subject.getPrincipal().getName());
super.doExecute(action, request, ActionListener.runBefore(listener, ctx::restore));
return null;

Check warning on line 59 in src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java#L55-L59

Added lines #L55 - L59 were not covered by tests
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Check warning on line 64 in src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/jobscheduler/transport/RunAsSubjectClient.java#L61-L64

Added lines #L61 - L64 were not covered by tests
}
Loading