Skip to content

Commit

Permalink
Remove flaky threadpool terminate test
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Oct 4, 2023
1 parent e22e6ed commit dfc8110
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.IOUtils;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.common.SuppressForbidden;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -30,7 +31,6 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

import static org.opensearch.common.blobstore.stream.read.listener.ListenerTestUtils.CountingCompletionListener;
Expand All @@ -44,19 +44,19 @@
public class ReadContextListenerTests extends OpenSearchTestCase {

private Path path;
private ThreadPool threadPool;
private static ThreadPool threadPool;
private static final int NUMBER_OF_PARTS = 5;
private static final int PART_SIZE = 10;
private static final String TEST_SEGMENT_FILE = "test_segment_file";
private static final int MAX_CONCURRENT_STREAMS = 10;

@Before
public void setup() {
@BeforeClass
public static void setup() {
threadPool = new TestThreadPool(ReadContextListenerTests.class.getName());
}

@After
public void cleanup() {
@AfterClass
public static void cleanup() {
threadPool.shutdown();
}

Expand Down Expand Up @@ -212,46 +212,6 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception {
assertFalse(Files.exists(readContextListener.getTmpFileLocation()));
}

/**
* Simulate a node drop by invoking shutDownNow on the thread pool while writing a part.
*/
public void testTerminateThreadsWhileWritingParts() throws Exception {
final String fileName = UUID.randomUUID().toString();
Path fileLocation = path.resolve(fileName);
List<ReadContext.StreamPartCreator> blobPartStreams = initializeBlobPartStreams();
CountDownLatch countDownLatch = new CountDownLatch(1);
ActionListener<String> completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch);
ReadContextListener readContextListener = new ReadContextListener(
TEST_SEGMENT_FILE,
fileLocation,
completionListener,
threadPool,
UnaryOperator.identity(),
MAX_CONCURRENT_STREAMS
);
ByteArrayInputStream assertingStream = new ByteArrayInputStream(randomByteArrayOfLength(PART_SIZE)) {
@Override
public int read(byte[] b) throws IOException {
assertTrue("parts written to temp file location", Files.exists(readContextListener.getTmpFileLocation()));
threadPool.shutdownNow();
return super.read(b);
}
};
blobPartStreams.add(
NUMBER_OF_PARTS,
() -> CompletableFuture.supplyAsync(
() -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS + 1),
threadPool.generic()
)
);
ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null);
readContextListener.onResponse(readContext);
countDownLatch.await(5, TimeUnit.SECONDS);
assertTrue(terminate(threadPool));
assertFalse(Files.exists(fileLocation));
assertFalse(Files.exists(readContextListener.getTmpFileLocation()));
}

private List<ReadContext.StreamPartCreator> initializeBlobPartStreams() {
List<ReadContext.StreamPartCreator> blobPartStreams = new ArrayList<>();
for (int partNumber = 0; partNumber < NUMBER_OF_PARTS; partNumber++) {
Expand Down

0 comments on commit dfc8110

Please sign in to comment.