Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into dot-prefix-validation
Browse files Browse the repository at this point in the history
  • Loading branch information
dakrone committed Sep 10, 2024
2 parents 07084ee + f6ace50 commit aab6a56
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 17 deletions.
7 changes: 7 additions & 0 deletions .ci/scripts/resolve-dra-manifest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ LATEST_VERSION=$(strip_version $LATEST_BUILD)
if [ "$LATEST_VERSION" != "$ES_VERSION" ]; then
echo "Latest build for '$ARTIFACT' is version $LATEST_VERSION but expected version $ES_VERSION." 1>&2
NEW_BRANCH=$(echo $ES_VERSION | sed -E "s/([0-9]+\.[0-9]+)\.[0-9]/\1/g")

# Temporary
if [[ "$ES_VERSION" == "8.16.0" ]]; then
NEW_BRANCH="8.x"
fi

echo "Using branch $NEW_BRANCH instead of $BRANCH." 1>&2
echo "https://artifacts-$WORKFLOW.elastic.co/$ARTIFACT/latest/$NEW_BRANCH.json"
LATEST_BUILD=$(fetch_build $WORKFLOW $ARTIFACT $NEW_BRANCH)
fi

Expand Down
2 changes: 0 additions & 2 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.MlJobIT
method: testPutJob_GivenFarequoteConfig
issue: https://github.com/elastic/elasticsearch/issues/112382
- class: org.elasticsearch.xpack.eql.EqlClientYamlIT
issue: https://github.com/elastic/elasticsearch/issues/112617
- class: org.elasticsearch.xpack.security.authc.kerberos.KerberosTicketValidatorTests
method: testWhenKeyTabWithInvalidContentFailsValidation
issue: https://github.com/elastic/elasticsearch/issues/112631
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final SoftDeletesPolicy softDeletesPolicy;
private final LongSupplier globalCheckpointSupplier;
private final Map<IndexCommit, Integer> acquiredIndexCommits; // Number of references held against each commit point.
// Index commits internally acquired by the commits listener. We want to track them separately to be able to disregard them
// when checking for externally acquired index commits that haven't been released
private final Set<IndexCommit> internallyAcquiredIndexCommits;

interface CommitsListener {

Expand Down Expand Up @@ -72,6 +75,7 @@ interface CommitsListener {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.commitsListener = commitsListener;
this.acquiredIndexCommits = new HashMap<>();
this.internallyAcquiredIndexCommits = new HashSet<>();
}

@Override
Expand Down Expand Up @@ -114,7 +118,7 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
}
if (commitsListener != null && previousLastCommit != this.lastCommit) {
newCommit = acquireIndexCommit(false);
newCommit = acquireIndexCommit(false, true);
} else {
newCommit = null;
}
Expand Down Expand Up @@ -210,15 +214,25 @@ SafeCommitInfo getSafeCommitInfo() {
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
return acquireIndexCommit(acquiringSafeCommit, false);
}

private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredInternally) {
assert safeCommit != null : "Safe commit is not initialized yet";
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount
return wrapCommit(snapshotting);
assert acquiredInternally == false || internallyAcquiredIndexCommits.add(snapshotting)
: "commit [" + snapshotting + "] already added";
return wrapCommit(snapshotting, acquiredInternally);
}

protected IndexCommit wrapCommit(IndexCommit indexCommit) {
return new SnapshotIndexCommit(indexCommit);
return wrapCommit(indexCommit, false);
}

protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredInternally) {
return new SnapshotIndexCommit(indexCommit, acquiredInternally);
}

/**
Expand All @@ -227,7 +241,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit) {
* @return true if the acquired commit can be clean up.
*/
synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) acquiredCommit).getIndexCommit();
final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit;
final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit();
assert acquiredIndexCommits.containsKey(releasingCommit)
: "Release non-acquired commit;"
+ "acquired commits ["
Expand All @@ -242,6 +257,8 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
}
return count - 1;
});
assert snapshotIndexCommit.acquiredInternally == false || internallyAcquiredIndexCommits.remove(releasingCommit)
: "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally";

assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]";
// The commit can be clean up only if no refCount and it is neither the safe commit nor last commit.
Expand Down Expand Up @@ -296,10 +313,16 @@ private static Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit
}

/**
* Checks whether the deletion policy is holding on to acquired index commits
* Checks whether the deletion policy is holding on to externally acquired index commits
*/
synchronized boolean hasAcquiredIndexCommits() {
return acquiredIndexCommits.isEmpty() == false;
synchronized boolean hasAcquiredIndexCommitsForTesting() {
// We explicitly check only external commits and disregard internal commits acquired by the commits listener
for (var e : acquiredIndexCommits.entrySet()) {
if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) {
return true;
}
}
return false;
}

/**
Expand All @@ -320,8 +343,12 @@ public static String commitDescription(IndexCommit commit) throws IOException {
* A wrapper of an index commit that prevents it from being deleted.
*/
private static class SnapshotIndexCommit extends FilterIndexCommit {
SnapshotIndexCommit(IndexCommit delegate) {

private final boolean acquiredInternally;

SnapshotIndexCommit(IndexCommit delegate, boolean acquiredInternally) {
super(delegate);
this.acquiredInternally = acquiredInternally;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ Translog getTranslog() {
}

// Package private for testing purposes only
boolean hasAcquiredIndexCommits() {
return combinedDeletionPolicy.hasAcquiredIndexCommits();
boolean hasAcquiredIndexCommitsForTesting() {
return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,10 +1440,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw
assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo)));
}

public static boolean hasAcquiredIndexCommits(Engine engine) {
public static boolean hasAcquiredIndexCommitsForTesting(Engine engine) {
assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.hasAcquiredIndexCommits();
return internalEngine.hasAcquiredIndexCommitsForTesting();
}

public static final class PrimaryTermSupplier implements LongSupplier {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ private void assertNoAcquiredIndexCommit() throws Exception {
if (engine instanceof InternalEngine) {
assertFalse(
indexShard.routingEntry().toString() + " has unreleased snapshotted index commits",
EngineTestCase.hasAcquiredIndexCommits(engine)
EngineTestCase.hasAcquiredIndexCommitsForTesting(engine)
);
}
} catch (AlreadyClosedException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException {
sessionReader.readFileBytes(files.get(1).name(), MockBigArrays.NON_RECYCLING_INSTANCE.newByteArray(10, false));
}

assertTrue(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard)));
assertTrue(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard)));
restoreSourceService.closeSession(sessionUUID);
assertFalse(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard)));
assertFalse(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard)));

closeShards(indexShard);
// Exception will be thrown if file is not closed.
Expand Down
8 changes: 8 additions & 0 deletions x-pack/plugin/eql/qa/rest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ tasks.named('yamlRestTestV7CompatTest') {
usesDefaultDistribution()
}

tasks.named("yamlRestTestV7CompatTransform").configure {task ->
task.skipTest("eql/10_basic/Execute EQL events query with wildcard (*) fields filtering.", "Change of locale with Java 23 makes these tests non deterministic")
task.skipTest("eql/10_basic/Execute EQL sequence with fields filtering.", "Change of locale with Java 23 makes these tests non deterministic")
task.skipTest("eql/10_basic/Execute EQL sequence with custom format for timestamp field.", "Change of locale with Java 23 makes these tests non deterministic")
task.skipTest("eql/10_basic/Execute EQL events query with fields filtering", "Change of locale with Java 23 makes these tests non deterministic")
task.skipTest("eql/10_basic/Execute EQL sequence with wildcard (*) fields filtering.", "Change of locale with Java 23 makes these tests non deterministic")
}

if (BuildParams.inFipsJvm){
// This test cluster is using a BASIC license and FIPS 140 mode is not supported in BASIC
tasks.named("javaRestTest").configure{enabled = false }
Expand Down

0 comments on commit aab6a56

Please sign in to comment.