-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix security index auto-create and state recovery race #39582
Changes from 4 commits
9c2e1de
4db7e68
99a1a05
ac82dd4
f734038
1cd74ec
6363365
b358996
ea90cc1
0851725
9146fdb
974fc47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,11 +45,13 @@ | |
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Consumer; | ||
import java.util.function.Predicate; | ||
|
@@ -105,6 +107,11 @@ public boolean indexExists() { | |
return this.indexState.indexExists; | ||
} | ||
|
||
public boolean stateRecovered() { | ||
// concrete security index name is a proxy for the cluster state recovery | ||
return this.indexState.concreteIndexName != null; | ||
} | ||
|
||
/** | ||
* Returns whether the index is on the current format if it exists. If the index does not exist | ||
* we treat the index as up to date as we expect it to be created with the current format. | ||
|
@@ -167,8 +174,10 @@ public void clusterChanged(ClusterChangedEvent event) { | |
this.indexState = newState; | ||
|
||
if (newState.equals(previousState) == false) { | ||
for (BiConsumer<State, State> listener : stateChangeListeners) { | ||
listener.accept(previousState, newState); | ||
// point in time iterator | ||
final Iterator<BiConsumer<State, State>> stateListenerIterator = stateChangeListeners.iterator(); | ||
while (stateListenerIterator.hasNext()) { | ||
stateListenerIterator.next().accept(previousState, newState); | ||
} | ||
} | ||
} | ||
|
@@ -281,7 +290,10 @@ private static Version readMappingVersion(String indexName, MappingMetaData mapp | |
*/ | ||
public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, final Runnable andThen) { | ||
final State indexState = this.indexState; // use a local copy so all checks execute against the same state! | ||
if (indexState.indexExists && indexState.isIndexUpToDate == false) { | ||
if (indexState.concreteIndexName == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I went with your suggestion, and then reverted. I have kept on checking |
||
// index not recovered from gateway | ||
delayUntilStateRecovered(consumer, () -> checkIndexVersionThenExecute(consumer, andThen)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This worries me. We don't do that now. We don't queue if the security index is red, we currently fail if the index is not recovered. From a usability point of view it's a nice idea, but it feels like we're jumping in without thinking it through. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, understood! I just wonder if there is some alternative for "delaying requests" that would be acceptable beyond feature freeze? |
||
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) { | ||
consumer.accept(new IllegalStateException( | ||
"Security index is not on the current version. Security features relying on the index will not be available until " + | ||
"the upgrade API is run on the security index")); | ||
|
@@ -297,14 +309,17 @@ public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, fin | |
public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer, final Runnable andThen) { | ||
final State indexState = this.indexState; // use a local copy so all checks execute against the same state! | ||
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings) | ||
if (indexState.indexExists && indexState.isIndexUpToDate == false) { | ||
if (indexState.concreteIndexName == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto.
|
||
// index not recovered from gateway | ||
delayUntilStateRecovered(consumer, () -> prepareIndexIfNeededThenExecute(consumer, andThen)); | ||
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) { | ||
consumer.accept(new IllegalStateException( | ||
"Security index is not on the current version. Security features relying on the index will not be available until " + | ||
"the upgrade API is run on the security index")); | ||
} else if (indexState.indexExists == false) { | ||
LOGGER.info("security index does not exist. Creating [{}] with alias [{}]", INTERNAL_SECURITY_INDEX, SECURITY_INDEX_NAME); | ||
LOGGER.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, SECURITY_INDEX_NAME); | ||
Tuple<String, Settings> mappingAndSettings = loadMappingAndSettingsSourceFromTemplate(); | ||
CreateIndexRequest request = new CreateIndexRequest(INTERNAL_SECURITY_INDEX) | ||
CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName) | ||
.alias(new Alias(SECURITY_INDEX_NAME)) | ||
.mapping("doc", mappingAndSettings.v1(), XContentType.JSON) | ||
.waitForActiveShards(ActiveShardCount.ALL) | ||
|
@@ -373,6 +388,41 @@ public static boolean isIndexDeleted(State previousState, State currentState) { | |
return previousState.indexStatus != null && currentState.indexStatus == null; | ||
} | ||
|
||
/** | ||
* Delay the {@code runnable} invocation until cluster state recovered. | ||
*/ | ||
private void delayUntilStateRecovered(final Consumer<Exception> consumer, final Runnable runnable) { | ||
// context preserving one shoot runnable | ||
final AtomicBoolean done = new AtomicBoolean(false); | ||
final Runnable delayedRunnable = client.threadPool().getThreadContext().preserveContext(() -> { | ||
if (false == done.get()) { | ||
done.set(true); | ||
runnable.run(); | ||
} | ||
}); | ||
final BiConsumer<State, State> gatewayRecoveryListener = new BiConsumer<State, State>() { | ||
@Override | ||
public void accept(State prevState, State newState) { | ||
// any cluster state update is a sign that the state recovered | ||
if (newState.concreteIndexName != null) { | ||
stateChangeListeners.remove(this); | ||
client.threadPool().generic().execute(delayedRunnable); | ||
} else { | ||
consumer.accept(new IllegalStateException("State has been recovered, but the security index name is unknown.")); | ||
} | ||
} | ||
}; | ||
// enqueue and wait for the first cluster state update | ||
stateChangeListeners.add(gatewayRecoveryListener); | ||
// maybe state recovered in the meantime since we last checked | ||
final State indexState = this.indexState; | ||
if (indexState.concreteIndexName != null) { | ||
// state indeed recovered and we _might_ have lost the notification | ||
stateChangeListeners.remove(gatewayRecoveryListener); | ||
delayedRunnable.run(); | ||
} | ||
} | ||
|
||
/** | ||
* State of the security index. | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell this change doesn't actually do anything.
A
for
loop is just syntactic sugar overiterator()
andnext()
, you haven't change the semantics. ThestateListenerIterator
is still backed by the sameList
and is at risk ofConcurrentModificationException
issues if a listener is added/removed.Assuming that's the problem you're trying to solve, then the options come down to:
synchronize
whenever we use the listCopyOnWriteArrayList
(2) is probably the best option. I don't think we add listeners very often, so making a list copy each time is probably OK, but you'd need to audit the places we modify the list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a rewrite that unwraps the syntactic sugar because I find it confusing to use that when the list is concurrently modified.
The list is already of the
CopyOnWriteArrayList
type.