Skip to content

Commit

Permalink
Merge branch '6.x' into ccr-6.x
Browse files Browse the repository at this point in the history
* 6.x:
  Fix lock accounting in releasable lock
  text fixes (#28136)
  Update getting-started.asciidoc (#28145)
  [Docs] Delete incorrect migration notes (#27915)
  [Docs] Spelling fix in painless-getting-started.asciidoc (#28187)
  Fixed the cat.health REST test to accept 4ms, not just 4.0ms (#28186)
  Do not keep 5.x commits once having 6.x commits (#28188)
  • Loading branch information
jasontedor committed Jan 12, 2018
2 parents 9b1a28b + 0c0dc3c commit 16812cb
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/painless/painless-getting-started.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ POST hockey/player/_update_by_query
----------------------------------------------------------------
// CONSOLE

Using the match operator (`==~`) you can update all the hockey players who's
Using the match operator (`==~`) you can update all the hockey players whose
names start with a consonant and end with a vowel:

[source,js]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ in the parent multi-bucket aggregation. The specified metric must be numeric and
If the script language is `expression` then a numeric return value is permitted. In this case 0.0 will be evaluated as `false`
and all other values will evaluate to true.

NOTE: The bucket_selector aggregation, like all pipeline aggregations, executions after all other sibling aggregations. This means that
NOTE: The bucket_selector aggregation, like all pipeline aggregations, executes after all other sibling aggregations. This means that
using the bucket_selector aggregation to filter the returned buckets in the response does not save on execution time running the aggregations.

==== Syntax
Expand Down
4 changes: 2 additions & 2 deletions docs/reference/getting-started.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ You can download the sample dataset (accounts.json) from https://github.com/elas

[source,sh]
--------------------------------------------------
curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json"
curl 'localhost:9200/_cat/indices?v'
curl -H "Content-Type: application/json" -XPOST "localhost:9200/bank/account/_bulk?pretty&refresh" --data-binary "@accounts.json"
curl "localhost:9200/_cat/indices?v"
--------------------------------------------------
// NOTCONSOLE

Expand Down
6 changes: 0 additions & 6 deletions docs/reference/migration/migrate_6_0/rest.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ In previous versions of Elasticsearch, Analyze API is requiring a `tokenizer` or
In Elasticsearch 6.0.0, Analyze API can analyze a text as a keyword field with custom normalizer
or if `char_filter`/`filter` is set and `tokenizer`/`analyzer` is not set.

==== Indices exists API

The `ignore_unavailable` and `allow_no_indices` options are no longer accepted
as they could cause undesired results when their values differed from their
defaults.

==== `timestamp` and `ttl` in index requests

`timestamp` and `ttl` are not accepted anymore as parameters of index/update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
\d+ \s+ # init
\d+ \s+ # unassign
\d+ \s+ # pending_tasks
(-|\d+[.]\d+ms|s) \s+ # max task waiting time
(-|\d+(?:[.]\d+)?m?s) \s+ # max task waiting time
\d+\.\d+% # active shards percent
\n
)+
Expand All @@ -72,7 +72,7 @@
\d+ \s+ # init
\d+ \s+ # unassign
\d+ \s+ # pending_tasks
(-|\d+[.]\d+ms|s) \s+ # max task waiting time
(-|\d+(?:[.]\d+)?m?s) \s+ # max task waiting time
\d+\.\d+% # active shards percent
\n
)+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
public class ReleasableLock implements Releasable {
private final Lock lock;

/* a per thread boolean indicating the lock is held by it. only works when assertions are enabled */
private final ThreadLocal<Boolean> holdingThreads;

// a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
private final ThreadLocal<Integer> holdingThreads;

public ReleasableLock(Lock lock) {
this.lock = lock;
Expand All @@ -57,20 +58,27 @@ public ReleasableLock acquire() throws EngineException {
}

private boolean addCurrentThread() {
holdingThreads.set(true);
final Integer current = holdingThreads.get();
holdingThreads.set(current == null ? 1 : current + 1);
return true;
}

private boolean removeCurrentThread() {
holdingThreads.remove();
final Integer count = holdingThreads.get();
assert count != null && count > 0;
if (count == 1) {
holdingThreads.remove();
} else {
holdingThreads.set(count - 1);
}
return true;
}

public Boolean isHeldByCurrentThread() {
if (holdingThreads == null) {
throw new UnsupportedOperationException("asserts must be enabled");
}
Boolean b = holdingThreads.get();
return b != null && b.booleanValue();
final Integer count = holdingThreads.get();
return count != null && count > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
return i + 1;
}
// 5.x commits do not contain MAX_SEQ_NO.
// 5.x commits do not contain MAX_SEQ_NO, we should not keep it and the older commits.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
return i;
return Math.min(commits.size() - 1, i + 1);
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= globalCheckpoint) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReleasableLockTests extends ESTestCase {

/**
* Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant
* lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but
* not its second entrance.
*
* @throws BrokenBarrierException if awaiting on the synchronization barrier breaks
* @throws InterruptedException if awaiting on the synchronization barrier is interrupted
*/
public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException {
final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());

final int numberOfThreads = scaledRandomIntBetween(1, 32);
final int iterations = scaledRandomIntBetween(1, 32);
final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
for (int j = 0; j < iterations; j++) {
if (randomBoolean()) {
acquire(readLock, writeLock);
} else {
acquire(writeLock, readLock);
}
}
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
});
threads.add(thread);
thread.start();
}

barrier.await();
barrier.await();
for (final Thread thread : threads) {
thread.join();
}
}

private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {
try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) {
assertTrue(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread());
try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) {
assertTrue(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread());
}
// previously there was a bug here and this would return false
assertTrue(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread());
}
assertFalse(lockToAcquire.isHeldByCurrentThread());
assertFalse(otherLock.isHeldByCurrentThread());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ public void testLegacyIndex() throws Exception {

globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(0)).delete();
verify(legacyCommit, times(1)).delete(); // Do not keep the legacy commit once we have a new commit.
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));

// Make the fresh commit safe.
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
verify(legacyCommit, times(1)).delete();
verify(legacyCommit, times(2)).delete();
verify(freshCommit, times(0)).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
Expand Down

0 comments on commit 16812cb

Please sign in to comment.