Skip to content

Commit

Permalink
[Backport 2.x] Fix error in RemoteSegmentStoreDirectory when debug lo…
Browse files Browse the repository at this point in the history
…gging is enabled (#12328) (#12347)
  • Loading branch information
peternied authored Feb 16, 2024
1 parent 5d5a998 commit b7f8278
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add community_id ingest processor ([#12121](https://github.com/opensearch-project/OpenSearch/pull/12121))
- Introduce query level setting `index.query.max_nested_depth` limiting nested queries ([#3268](https://github.com/opensearch-project/OpenSearch/issues/3268)
- Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163))
- Fix error in RemoteSegmentStoreDirectory when debug logging is enabled ([#12328](https://github.com/opensearch-project/OpenSearch/pull/12328))

### Dependencies
- Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -745,9 +746,8 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
return;
}

List<String> metadataFilesEligibleToDelete = sortedMetadataFileList.subList(
lastNMetadataFilesToKeep,
sortedMetadataFileList.size()
List<String> metadataFilesEligibleToDelete = new ArrayList<>(
sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
);
Set<String> allLockFiles;
try {
Expand All @@ -764,7 +764,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
logger.debug(
"metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}",
metadataFilesEligibleToDelete,
metadataFilesEligibleToDelete
metadataFilesToBeDeleted
);

Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.store;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
Expand Down Expand Up @@ -41,6 +42,8 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
Expand All @@ -58,6 +61,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.mockito.Mockito;

Expand Down Expand Up @@ -971,21 +975,38 @@ public void testDeleteStaleCommitsWithinThreshold() throws Exception {
verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT));
}

@TestLogging(value = "_root:debug", reason = "Validate logging output")
public void testDeleteStaleCommitsActualDelete() throws Exception {
Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata();
remoteSegmentStoreDirectory.init();

// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);

for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];
verify(remoteDataDirectory).deleteFile(uploadedFilename);
try (final MockLogAppender appender = MockLogAppender.createForLoggers(LogManager.getRootLogger())) {
appender.addExpectation(
new MockLogAppender.PatternSeenWithLoggerPrefixExpectation(
"Metadata files to delete message",
"org.opensearch.index.store.RemoteSegmentStoreDirectory",
Level.DEBUG,
"metadataFilesEligibleToDelete=\\[" + metadataFilename3 + "\\] metadataFilesToBeDeleted=\\[" + metadataFilename3 + "\\]"
)
);

final Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata();
final List<String> filesToBeDeleted = metadataFilenameContentMapping.get(metadataFilename3)
.values()
.stream()
.map(metadata -> metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1])
.collect(Collectors.toList());

remoteSegmentStoreDirectory.init();

// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2);

for (final String file : filesToBeDeleted) {
verify(remoteDataDirectory).deleteFile(file);
}
assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
appender.assertAllExpectationsMatched();
}
;
assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
}

public void testDeleteStaleCommitsActualDeleteWithLocks() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,19 @@
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.regex.Regex;
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;

/**
* Test appender that can be used to verify that certain events were logged correctly
Expand Down Expand Up @@ -259,6 +263,59 @@ public void assertMatched() {

}

/**
* Used for cases when the logger is dynamically named such as to include an index name or shard id
*
* Best used in conjunction with the root logger:
* {@code @TestLogging(value = "_root:debug", reason = "Validate logging output");}
* @see TestLogging
* */
public static class PatternSeenWithLoggerPrefixExpectation implements LoggingExpectation {
private final String expectationName;
private final String loggerPrefix;
private final Level level;
private final String messageMatchingRegex;

private final List<String> loggerMatches = new ArrayList<>();
private final AtomicBoolean eventSeen = new AtomicBoolean(false);

public PatternSeenWithLoggerPrefixExpectation(
final String expectationName,
final String loggerPrefix,
final Level level,
final String messageMatchingRegex
) {
this.expectationName = expectationName;
this.loggerPrefix = loggerPrefix;
this.level = level;
this.messageMatchingRegex = messageMatchingRegex;
}

@Override
public void match(final LogEvent event) {
if (event.getLevel() == level && event.getLoggerName().startsWith(loggerPrefix)) {
final String formattedMessage = event.getMessage().getFormattedMessage();
loggerMatches.add(formattedMessage);
if (formattedMessage.matches(messageMatchingRegex)) {
eventSeen.set(true);
}
}
}

@Override
public void assertMatched() {
if (!eventSeen.get()) {
final StringBuilder failureMessage = new StringBuilder();
failureMessage.append(expectationName + " was not seen, found " + loggerMatches.size() + " messages matching the logger.");
failureMessage.append("\r\nMessage matching regex: " + messageMatchingRegex);
if (!loggerMatches.isEmpty()) {
failureMessage.append("\r\nMessage details:\r\n" + String.join("\r\n", loggerMatches));
}
fail(failureMessage.toString());
}
}
}

private static String getLoggerName(String name) {
if (name.startsWith("org.opensearch.")) {
name = name.substring("org.opensearch.".length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
Expand All @@ -170,6 +172,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -639,7 +642,32 @@ protected static void checkStaticState(boolean afterClass) throws Exception {
try {
// ensure that there are no status logger messages which would indicate a problem with our Log4j usage; we map the
// StatusData instances to Strings as otherwise their toString output is useless

final Function<StatusData, String> statusToString = (statusData) -> {
try (final StringWriter sw = new StringWriter(); final PrintWriter pw = new PrintWriter(sw)) {

pw.print(statusData.getLevel());
pw.print(":");
pw.print(statusData.getMessage().getFormattedMessage());

if (statusData.getStackTraceElement() != null) {
final var messageSource = statusData.getStackTraceElement();
pw.println("Source:");
pw.println(messageSource.getFileName() + "@" + messageSource.getLineNumber());
}

if (statusData.getThrowable() != null) {
pw.println("Throwable:");
statusData.getThrowable().printStackTrace(pw);
}
return sw.toString();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
};

assertThat(
statusData.stream().map(statusToString::apply).collect(Collectors.joining("\r\n")),
statusData.stream().map(status -> status.getMessage().getFormattedMessage()).collect(Collectors.toList()),
empty()
);
Expand Down

0 comments on commit b7f8278

Please sign in to comment.