Fix split package with OneMergeHelper (elastic#78601)
With LUCENE-10118 integrated, we can now remove the package-private
dependency on org.apache.lucene.index.OneMergeHelper, and instead
intercept the info/log messages coming from merge threads.

This change alters the logging a little, but the fundamental information
captured remains more or less the same. It is worth noting that since
the merges occur asynchronously, the actual post-merge statistics are best
captured when the merge thread completes its operation, which is the
case with the change in this PR.

relates (elastic#78166)
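
The interception relies on two ConcurrentMergeScheduler hooks: verbose(), which is overridden so merge threads report verbose output whenever trace logging is enabled, and message(String), which is overridden to forward the merge-thread messages of interest to the scheduler's logger. Below is a minimal standalone sketch of that pattern; the class name, log4j logger wiring, and prefix constant are illustrative stand-ins, and the authoritative version is the merge scheduler changed in the diff further down.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.ConcurrentMergeScheduler;

// Illustrative subclass; the committed change applies this pattern to the
// Elasticsearch merge scheduler shown in the diff below.
class TraceRoutingMergeScheduler extends ConcurrentMergeScheduler {

    private static final Logger logger = LogManager.getLogger(TraceRoutingMergeScheduler.class);

    /** Only merge-thread messages with this prefix are routed to the logger. */
    private static final String MERGE_THREAD_MESSAGE_PREFIX = "merge thread";

    @Override
    protected boolean verbose() {
        // Treat merge threads as verbose whenever trace logging is enabled, so
        // Lucene invokes message() with the merge thread's progress output.
        if (logger.isTraceEnabled() && Thread.currentThread() instanceof MergeThread) {
            return true;
        }
        return super.verbose();
    }

    @Override
    protected void message(String message) {
        // Forward the merge-thread messages of interest to the trace logger,
        // then fall through to the default infoStream handling.
        if (logger.isTraceEnabled()
            && Thread.currentThread() instanceof MergeThread
            && message.startsWith(MERGE_THREAD_MESSAGE_PREFIX)) {
            logger.trace("{}", message);
        }
        super.message(message);
    }
}

The new testMergeThreadLogging test further down exercises this path by capturing the trace output with a log4j appender.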
ChrisHegarty authored Oct 5, 2021
1 parent 714aa54 commit 0ca3d12
Showing 4 changed files with 88 additions and 62 deletions.
1 change: 0 additions & 1 deletion server/build.gradle
@@ -280,7 +280,6 @@ tasks.named('splitPackagesAudit').configure {
// These are tricky because Lucene itself splits the index package,
// but this should be fixed in Lucene 9
'org.apache.lucene.index.LazySoftDeletesDirectoryReaderWrapper',
- 'org.apache.lucene.index.OneMergeHelper',
'org.apache.lucene.index.ShuffleForcedMergePolicy',

// Joda should own its own packages! This should be a simple move.
50 changes: 0 additions & 50 deletions server/src/main/java/org/apache/lucene/index/OneMergeHelper.java

This file was deleted.

@@ -12,7 +12,6 @@
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
- import org.apache.lucene.index.OneMergeHelper;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
@@ -67,6 +66,31 @@ public Set<OnGoingMerge> onGoingMerges() {
return readOnlyOnGoingMerges;
}

/** We're currently only interested in messages with this prefix. */
private static final String MERGE_THREAD_MESSAGE_PREFIX = "merge thread";

@Override
/** Overridden to route specific MergeThread messages to our logger. */
protected boolean verbose() {
if (logger.isTraceEnabled() && Thread.currentThread() instanceof MergeThread) {
return true;
}
return super.verbose();
}

@Override
/** Overridden to route specific MergeThread messages to our logger. */
protected void message(String message) {
if (logger.isTraceEnabled() && Thread.currentThread() instanceof MergeThread && message.startsWith(MERGE_THREAD_MESSAGE_PREFIX)) {
logger.trace("{}", message);
}
super.message(message);
}

private static String getSegmentName(MergePolicy.OneMerge merge) {
return merge.getMergeInfo() != null ? merge.getMergeInfo().info.name : "_na_";
}

@Override
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
int totalNumDocs = merge.totalNumDocs();
@@ -81,7 +105,7 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {

if (logger.isTraceEnabled()) {
logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size",
- OneMergeHelper.getSegmentName(merge), merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes),
+ getSegmentName(merge), merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes),
new ByteSizeValue(merge.estimatedMergeBytes));
}
try {
@@ -106,23 +130,18 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
long throttledMS = TimeValue.nsecToMSec(
merge.getMergeProgress().getPauseTimes().get(MergePolicy.OneMergeProgress.PauseReason.PAUSED)
);
- final Thread thread = Thread.currentThread();
- long totalBytesWritten = OneMergeHelper.getTotalBytesWritten(thread, merge);
- double mbPerSec = OneMergeHelper.getMbPerSec(thread, merge);
totalMergeStoppedTime.inc(stoppedMS);
totalMergeThrottledTime.inc(throttledMS);

String message = String.format(Locale.ROOT,
"merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], " +
"[%s throttled], [%,.1f MB written], [%,.1f MB/sec throttle]",
OneMergeHelper.getSegmentName(merge),
"[%s throttled]",
getSegmentName(merge),
TimeValue.timeValueMillis(tookMS),
totalSizeInBytes/1024f/1024f,
totalNumDocs,
TimeValue.timeValueMillis(stoppedMS),
- TimeValue.timeValueMillis(throttledMS),
- totalBytesWritten/1024f/1024f,
- mbPerSec);
+ TimeValue.timeValueMillis(throttledMS));

if (tookMS > 20000) { // if more than 20 seconds, DEBUG log it
logger.debug("{}", message);
@@ -184,5 +203,4 @@ void refreshConfig() {
disableAutoIOThrottle();
}
}

}
@@ -34,6 +34,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
@@ -183,6 +184,7 @@
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyArray;
@@ -195,6 +197,7 @@
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.matchesRegex;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@@ -2174,6 +2177,62 @@ public void testIndexWriterInfoStream() throws IllegalAccessException, IOException {
}
}

private static class MockMTAppender extends AbstractAppender {
private final List<String> messages = Collections.synchronizedList(new ArrayList<>());

List<String> messages () { return messages; }

MockMTAppender(final String name) throws IllegalAccessException {
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0],
false, null, null), null);
}

@Override
public void append(LogEvent event) {
final String formattedMessage = event.getMessage().getFormattedMessage();
if (event.getLevel() == Level.TRACE && formattedMessage.startsWith("merge thread")) {
messages.add(formattedMessage);
}
}
}

public void testMergeThreadLogging() throws IllegalAccessException, IOException {
MockMTAppender mockAppender = new MockMTAppender("testMergeThreadLogging");
mockAppender.start();

Logger rootLogger = LogManager.getRootLogger();
Level savedLevel = rootLogger.getLevel();
Loggers.addAppender(rootLogger, mockAppender);
Loggers.setLevel(rootLogger, Level.TRACE);

LogMergePolicy lmp = newLogMergePolicy();
lmp.setMergeFactor(2);
try (Store store = createStore()) {
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), lmp); // fmp
engine.index(indexForDoc(testParsedDocument("1", null, testDocument(), B_1, null)));
engine.index(indexForDoc(testParsedDocument("2", null, testDocument(), B_1, null)));
engine.index(indexForDoc(testParsedDocument("3", null, testDocument(), B_1, null)));
engine.index(indexForDoc(testParsedDocument("4", null, testDocument(), B_1, null)));
engine.forceMerge(true, 1, false, UUIDs.randomBase64UUID());
engine.flushAndClose();

long merges = engine.getMergeStats().getTotal();
if (merges > 0) {
List<String> threadMsgs =
mockAppender.messages().stream()
.filter(line -> line.startsWith("merge thread"))
.collect(Collectors.toList());
assertThat("messages:" + threadMsgs + ", merges=" + merges, threadMsgs.size(), greaterThanOrEqualTo(2));
assertThat(threadMsgs,
containsInRelativeOrder(matchesRegex("^merge thread .* start$"), matchesRegex("^merge thread .* merge segment.*$")));
}
} finally {
Loggers.removeAppender(rootLogger, mockAppender);
mockAppender.stop();
Loggers.setLevel(rootLogger, savedLevel);
}
}

public void testSeqNoAndCheckpoints() throws IOException, InterruptedException {
final int opCount = randomIntBetween(1, 256);
long primarySeqNo = SequenceNumbers.NO_OPS_PERFORMED;
