[Test] Cleanup tests with Files#createTempDir() #492

Merged: 11 commits (Jan 17, 2023)

RssInMemoryRemoteMergerTest.java
@@ -26,7 +26,6 @@
import java.util.TreeMap;

import com.google.common.collect.Lists;
- import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -47,19 +46,18 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class RssInMemoryRemoteMergerTest {

@Test
- public void mergerTest() throws IOException {
+ public void mergerTest(@TempDir File tmpDir) throws IOException {
JobConf jobConf = new JobConf();
FileSystem fs = FileSystem.getLocal(jobConf);
LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
- File tmpDir = Files.createTempDir();
- tmpDir.deleteOnExit();
JobID jobId = new JobID("a", 0);
final TaskAttemptID mapId1 = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1), 0);
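
The change above is the pattern the whole PR applies: instead of creating a directory by hand with Guava's Files.createTempDir() and relying on deleteOnExit() for cleanup, the test declares a @TempDir parameter and lets JUnit 5 create the directory before the test and delete it, contents included, once the test finishes. A minimal self-contained sketch of the new style (generic names, not code from this repository):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertTrue;

class TempDirParameterSketchTest {

  // Old style (for comparison): cleanup is best effort, because File#deleteOnExit()
  // only removes the directory itself at JVM exit, and only if it is empty by then.
  //   File tmpDir = com.google.common.io.Files.createTempDir();
  //   tmpDir.deleteOnExit();

  // New style: the TempDirectory extension creates a fresh directory for this test
  // and deletes it recursively afterwards, even if the test fails.
  @Test
  void writesIntoInjectedTempDir(@TempDir File tmpDir) throws IOException {
    File data = new File(tmpDir, "data");
    Files.write(data.toPath(), "hello".getBytes());
    assertTrue(data.exists());
  }
}
```

Besides the stronger cleanup guarantee, this drops the Guava import from the test and one line of setup per test method.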

RssRemoteMergeManagerTest.java
@@ -25,7 +25,6 @@
import java.util.TreeMap;

import com.google.common.collect.Lists;
- import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -47,6 +46,7 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertEquals;

@@ -62,13 +62,11 @@ public class RssRemoteMergeManagerTest {
new TaskID(jobId, TaskType.REDUCE, 0), 0);

@Test
- public void mergerTest() throws Throwable {
+ public void mergerTest(@TempDir File tmpDir) throws Throwable {
JobConf jobConf = new JobConf();
final FileSystem fs = FileSystem.getLocal(jobConf);
final LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);

- File tmpDir = Files.createTempDir();
- tmpDir.deleteOnExit();
jobConf.set("mapreduce.reduce.memory.totalbytes", "1024");
jobConf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.01");
jobConf.set("mapreduce.reduce.shuffle.merge.percent", "0.1");

(next file; test class name not shown in this excerpt)
@@ -28,10 +28,10 @@
import java.util.stream.Collectors;

import com.google.common.collect.Sets;
- import com.google.common.io.Files;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -77,13 +77,10 @@ private static List<Integer> findAvailablePorts(int num) throws IOException {
return ports;
}

- private static void createAndStartShuffleServerWithTags(Set<String> tags) throws Exception {
+ private static void createAndStartShuffleServerWithTags(Set<String> tags, File tmpDir) throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);

- File tmpDir = Files.createTempDir();
- tmpDir.deleteOnExit();

File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
@@ -112,24 +109,27 @@ private static void createAndStartShuffleServerWithTags(Set<String> tags) throws
}

@BeforeAll
- public static void setupServers() throws Exception {
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);

for (CoordinatorServer coordinator : coordinators) {
coordinator.start();
}

+ File dir1 = new File(tmpDir, "server1");
for (int i = 0; i < 2; i++) {
- createAndStartShuffleServerWithTags(Sets.newHashSet());
+ createAndStartShuffleServerWithTags(Sets.newHashSet(), dir1);
}

+ File dir2 = new File(tmpDir, "server2");
for (int i = 0; i < 2; i++) {
- createAndStartShuffleServerWithTags(Sets.newHashSet("fixed"));
+ createAndStartShuffleServerWithTags(Sets.newHashSet("fixed"), dir2);
}

+ File dir3 = new File(tmpDir, "server3");
for (int i = 0; i < 2; i++) {
- createAndStartShuffleServerWithTags(Sets.newHashSet("elastic"));
+ createAndStartShuffleServerWithTags(Sets.newHashSet("elastic"), dir3);
}

// Wait all shuffle servers registering to coordinator
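
Here the injected directory is requested by the static @BeforeAll method rather than by a test method, so a single directory serves the whole test class and each group of shuffle servers gets its own named subdirectory under it. A hedged sketch of that shape (class, method, and directory names are illustrative only):

```java
import java.io.File;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ClassLevelTempDirSketchTest {

  static File serverDir1;
  static File serverDir2;

  // @TempDir parameters are resolved for lifecycle methods too; a directory injected
  // into @BeforeAll lives until the last test in the class has run.
  @BeforeAll
  static void setupServers(@TempDir File tmpDir) {
    serverDir1 = new File(tmpDir, "server1");
    serverDir2 = new File(tmpDir, "server2");
    assertTrue(serverDir1.mkdirs());
    assertTrue(serverDir2.mkdirs());
  }

  @Test
  void serverGroupsGetSeparateDirectories() {
    assertTrue(serverDir1.isDirectory());
    assertTrue(serverDir2.isDirectory());
    assertNotEquals(serverDir1, serverDir2);
  }
}
```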

CoordinatorAssignmentTest.java
@@ -23,9 +23,9 @@
import java.util.HashSet;

import com.google.common.collect.Sets;
- import com.google.common.io.Files;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -50,7 +50,7 @@ public class CoordinatorAssignmentTest extends CoordinatorTestBase {
LOCALHOST + ":" + COORDINATOR_PORT_1 + "," + LOCALHOST + ":" + COORDINATOR_PORT_2;

@BeforeAll
- public static void setupServers() throws Exception {
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf1 = getCoordinatorConf();
coordinatorConf1.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
coordinatorConf1.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, SHUFFLE_NODES_MAX);
@@ -65,7 +65,6 @@ public static void setupServers() throws Exception {

for (int i = 0; i < SERVER_NUM; i++) {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- File tmpDir = Files.createTempDir();
File dataDir1 = new File(tmpDir, "data1");
String basePath = dataDir1.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());

DiskErrorToleranceTest.java
@@ -27,13 +27,13 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
- import com.google.common.io.Files;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
@@ -56,15 +56,14 @@
public class DiskErrorToleranceTest extends ShuffleReadWriteBase {
private ShuffleServerGrpcClient shuffleServerClient;

- private static File serverTmpDir = Files.createTempDir();
+ @TempDir private static File serverTmpDir;
private static File data1 = new File(serverTmpDir, "data1");
private static File data2 = new File(serverTmpDir, "data2");
private List<ShuffleServerInfo> shuffleServerInfo =
Lists.newArrayList(new ShuffleServerInfo("127.0.0.1-20001", LOCALHOST, SHUFFLE_SERVER_PORT));

@BeforeAll
public static void setupServers() throws Exception {
- serverTmpDir.deleteOnExit();
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
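
This file uses the third supported style: a static field annotated with @TempDir, which JUnit injects before @BeforeAll runs and keeps for the lifetime of the test class, so the explicit deleteOnExit() call becomes unnecessary. One caveat worth keeping in mind with field injection is that static field initializers run when the class is loaded, before the extension has had a chance to inject the directory, so paths derived from the injected field are safest resolved inside @BeforeAll. An illustrative sketch (names invented for the example):

```java
import java.io.File;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertTrue;

class StaticTempDirFieldSketchTest {

  // Injected by the TempDirectory extension before @BeforeAll executes;
  // the same directory is shared by every test in this class and removed afterwards.
  @TempDir
  static File serverTmpDir;

  static File dataDir;

  @BeforeAll
  static void setupServers() {
    // Resolved here rather than in a static initializer, because a static initializer
    // would observe the field before injection has happened.
    dataDir = new File(serverTmpDir, "data1");
    assertTrue(dataDir.mkdirs());
  }

  @Test
  void dataDirLivesUnderTheInjectedDirectory() {
    assertTrue(dataDir.getAbsolutePath().startsWith(serverTmpDir.getAbsolutePath()));
  }
}
```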

HealthCheckCoordinatorGrpcTest.java
@@ -25,10 +25,10 @@
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
- import com.google.common.io.Files;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.io.TempDir;

import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
@@ -46,13 +46,12 @@

public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {

- private static File serverTmpDir = Files.createTempDir();
+ @TempDir private static File serverTmpDir;
private static File tempDataFile = new File(serverTmpDir, "data");
private static int writeDataSize;

@BeforeAll
public static void setupServers() throws Exception {
- serverTmpDir.deleteOnExit();
File data1 = new File(serverTmpDir, "data1");
data1.mkdirs();
File data2 = new File(serverTmpDir, "data2");

MultiStorageHdfsFallbackTest.java
@@ -17,9 +17,11 @@

package org.apache.uniffle.test;

+ import java.io.File;
import java.util.Arrays;

import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.io.TempDir;

import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -30,10 +32,10 @@
public class MultiStorageHdfsFallbackTest extends MultiStorageFaultToleranceBase {

@BeforeAll
- public static void setupServers() throws Exception {
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
final CoordinatorConf coordinatorConf = getCoordinatorConf();
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- String basePath = generateBasePath();
+ String basePath = generateBasePath(tmpDir);
shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE, 100.0);
shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 100);

MultiStorageLocalFileFallbackTest.java
@@ -17,9 +17,11 @@

package org.apache.uniffle.test;

+ import java.io.File;
import java.util.Arrays;

import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.io.TempDir;

import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -30,13 +32,13 @@
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;

- public class MultiStorageLocalfileFallbackTest extends MultiStorageFaultToleranceBase {
+ public class MultiStorageLocalFileFallbackTest extends MultiStorageFaultToleranceBase {

@BeforeAll
- public static void setupServers() throws Exception {
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
final CoordinatorConf coordinatorConf = getCoordinatorConf();
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- String basePath = generateBasePath();
+ String basePath = generateBasePath(tmpDir);
shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE, 100.0);
shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 100);
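
In the two fallback tests the injected directory is only forwarded to the shared generateBasePath(File) helper of MultiStorageFaultToleranceBase, which is not part of this diff. Purely as an assumption about its shape, based on how base paths are built elsewhere in the PR, such a helper could look roughly like this (hypothetical code, not the project's implementation):

```java
import java.io.File;

// Hypothetical helper: joins two data directories under the injected temp dir into
// the comma-separated base path string used when configuring the shuffle servers.
final class BasePathSketch {

  private BasePathSketch() {
  }

  static String generateBasePath(File tmpDir) {
    File dataDir1 = new File(tmpDir, "data1");
    File dataDir2 = new File(tmpDir, "data2");
    return dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
  }
}
```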

QuorumTest.java
@@ -24,11 +24,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
- import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
- import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
@@ -72,12 +71,10 @@ public class QuorumTest extends ShuffleReadWriteBase {
private static ShuffleServerInfo fakedShuffleServerInfo4;
private MockedShuffleWriteClientImpl shuffleWriteClientImpl;

- public static MockedShuffleServer createServer(int id) throws Exception {
+ public static MockedShuffleServer createServer(int id, File tmpDir) throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
- File tmpDir = Files.createTempDir();
- tmpDir.deleteOnExit();
File dataDir1 = new File(tmpDir, id + "_1");
File dataDir2 = new File(tmpDir, id + "_2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
@@ -88,21 +85,19 @@ public static MockedShuffleServer createServer(int id) throws Exception {
return new MockedShuffleServer(shuffleServerConf);
}

- @BeforeAll
- public static void initCluster() throws Exception {
+ @BeforeEach
+ public void initCluster(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);

ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
- File tmpDir = Files.createTempDir();
- tmpDir.deleteOnExit();

- shuffleServers.add(createServer(0));
- shuffleServers.add(createServer(1));
- shuffleServers.add(createServer(2));
- shuffleServers.add(createServer(3));
- shuffleServers.add(createServer(4));
+ shuffleServers.add(createServer(0, tmpDir));
+ shuffleServers.add(createServer(1, tmpDir));
+ shuffleServers.add(createServer(2, tmpDir));
+ shuffleServers.add(createServer(3, tmpDir));
+ shuffleServers.add(createServer(4, tmpDir));

shuffleServerInfo0 =
new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT + 0);
@@ -132,6 +127,17 @@ public static void initCluster() throws Exception {
new ShuffleServerInfo("127.0.0.1-20004", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 400);
fakedShuffleServerInfo4 =
new ShuffleServerInfo("127.0.0.1-20005", shuffleServers.get(2).getIp(), SHUFFLE_SERVER_PORT + 500);

+ // spark.rss.data.replica=3
+ // spark.rss.data.replica.write=2
+ // spark.rss.data.replica.read=2
+ ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(200);
+ ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo1)).adjustTimeout(200);
+ ((ShuffleServerGrpcClient)ShuffleServerClientFactory
+ .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo2)).adjustTimeout(200);

Thread.sleep(2000);
}

@@ -146,26 +152,12 @@ public static void cleanCluster() throws Exception {
coordinators = Lists.newArrayList();
}

- @BeforeEach
- public void initEnv() throws Exception {
- // spark.rss.data.replica=3
- // spark.rss.data.replica.write=2
- // spark.rss.data.replica.read=2
- ((ShuffleServerGrpcClient)ShuffleServerClientFactory
- .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(200);
- ((ShuffleServerGrpcClient)ShuffleServerClientFactory
- .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo1)).adjustTimeout(200);
- ((ShuffleServerGrpcClient)ShuffleServerClientFactory
- .getInstance().getShuffleServerClient("GRPC", shuffleServerInfo2)).adjustTimeout(200);
- }

@AfterEach
public void cleanEnv() throws Exception {
if (shuffleWriteClientImpl != null) {
shuffleWriteClientImpl.close();
}
cleanCluster();
- initCluster();
// we need recovery `rpcTime`, or some unit tests may fail
((ShuffleServerGrpcClient)ShuffleServerClientFactory
.getInstance().getShuffleServerClient("GRPC", shuffleServerInfo0)).adjustTimeout(60000);
@@ -471,7 +463,7 @@ public void case4() throws Exception {
}

@Test
- public void case5() throws Exception {
+ public void case5(@TempDir File tmpDir) throws Exception {
// this case is to simulate server restarting.
String testAppId = "case5";
registerShuffleServer(testAppId, 3, 2, 2, true);
@@ -505,7 +497,7 @@

// when one server is restarted, getShuffleResult should success
shuffleServers.get(1).stopServer();
- shuffleServers.set(1, createServer(1));
+ shuffleServers.set(1, createServer(1, tmpDir));
shuffleServers.get(1).start();
report = shuffleWriteClientImpl.getShuffleResult("GRPC",
Sets.newHashSet(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2),
@@ -514,7 +506,7 @@

// when two servers are restarted, getShuffleResult should fail
shuffleServers.get(2).stopServer();
- shuffleServers.set(2, createServer(2));
+ shuffleServers.set(2, createServer(2, tmpDir));
shuffleServers.get(2).start();
try {
report = shuffleWriteClientImpl.getShuffleResult("GRPC",
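
With initCluster now annotated @BeforeEach and taking a @TempDir parameter, the whole mini-cluster is rebuilt from a fresh directory for every test, and case5 requests its own @TempDir for the servers it restarts mid-test. A small sketch of the per-test lifecycle this relies on (illustrative names only):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class PerTestClusterDirSketchTest {

  File clusterDir;

  // Resolved anew for every test: the directory injected here is created just before
  // the test and removed, with everything written into it, right after the test.
  @BeforeEach
  void initCluster(@TempDir File tmpDir) throws IOException {
    clusterDir = tmpDir;
    Files.write(new File(clusterDir, "server-0").toPath(), "state".getBytes());
  }

  @Test
  void everyTestStartsFromAFreshDirectory() {
    // Only the file written in @BeforeEach is present; leftovers from other tests
    // cannot leak in, because each test gets its own directory.
    assertTrue(clusterDir.isDirectory());
    assertEquals(1, clusterDir.list().length);
  }
}
```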