From dfc8110cf39f03b7f7b5f6e4cf026cf80b5694d8 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 4 Oct 2023 16:54:00 -0700 Subject: [PATCH] Remove flaky threadpool terminate test Signed-off-by: Marc Handalian --- .../read/listener/ReadContextListener.java | 1 - .../listener/ReadContextListenerTests.java | 54 +++---------------- 2 files changed, 7 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java index 6ab9cff3fa458..c77f2384ace0d 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.IOUtils; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.common.SuppressForbidden; diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 3d4fab992d93e..0163c2275e7f4 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -17,8 +17,9 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -30,7 +31,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.function.UnaryOperator; import static org.opensearch.common.blobstore.stream.read.listener.ListenerTestUtils.CountingCompletionListener; @@ -44,19 +44,19 @@ public class ReadContextListenerTests extends OpenSearchTestCase { private Path path; - private ThreadPool threadPool; + private static ThreadPool threadPool; private static final int NUMBER_OF_PARTS = 5; private static final int PART_SIZE = 10; private static final String TEST_SEGMENT_FILE = "test_segment_file"; private static final int MAX_CONCURRENT_STREAMS = 10; - @Before - public void setup() { + @BeforeClass + public static void setup() { threadPool = new TestThreadPool(ReadContextListenerTests.class.getName()); } - @After - public void cleanup() { + @AfterClass + public static void cleanup() { threadPool.shutdown(); } @@ -212,46 +212,6 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { assertFalse(Files.exists(readContextListener.getTmpFileLocation())); } - /** - * Simulate a node drop by invoking shutDownNow on the thread pool while writing a part. - */ - public void testTerminateThreadsWhileWritingParts() throws Exception { - final String fileName = UUID.randomUUID().toString(); - Path fileLocation = path.resolve(fileName); - List blobPartStreams = initializeBlobPartStreams(); - CountDownLatch countDownLatch = new CountDownLatch(1); - ActionListener completionListener = new LatchedActionListener<>(new PlainActionFuture<>(), countDownLatch); - ReadContextListener readContextListener = new ReadContextListener( - TEST_SEGMENT_FILE, - fileLocation, - completionListener, - threadPool, - UnaryOperator.identity(), - MAX_CONCURRENT_STREAMS - ); - ByteArrayInputStream assertingStream = new ByteArrayInputStream(randomByteArrayOfLength(PART_SIZE)) { - @Override - public int read(byte[] b) throws IOException { - assertTrue("parts written to temp file location", Files.exists(readContextListener.getTmpFileLocation())); - threadPool.shutdownNow(); - return super.read(b); - } - }; - blobPartStreams.add( - NUMBER_OF_PARTS, - () -> CompletableFuture.supplyAsync( - () -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS + 1), - threadPool.generic() - ) - ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null); - readContextListener.onResponse(readContext); - countDownLatch.await(5, TimeUnit.SECONDS); - assertTrue(terminate(threadPool)); - assertFalse(Files.exists(fileLocation)); - assertFalse(Files.exists(readContextListener.getTmpFileLocation())); - } - private List initializeBlobPartStreams() { List blobPartStreams = new ArrayList<>(); for (int partNumber = 0; partNumber < NUMBER_OF_PARTS; partNumber++) {