Revert "LIHADOOP-52961 LevelDB state store support for Magnet"
This reverts commit edd2afa.
zhouyejoe committed Aug 2, 2021
1 parent 68f1271 commit be3f5f8
Showing 7 changed files with 27 additions and 517 deletions.

@@ -283,7 +283,6 @@ public void executorRemoved(String executorId, String appId) {

   public void close() {
     blockManager.close();
-    mergeManager.close();
   }

   private void checkAuth(TransportClient client, String appId) {
@@ -107,10 +107,4 @@ public interface MergedShuffleFileManager {
    * @param appId application ID
    */
   String[] getMergedBlockDirs(String appId);
-
-  /**
-   * Optionally close any resources associated with the MergedShuffleFileManager, such as the
-   * leveldb for state persistence.
-   */
-  default void close() {}
 }
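
The close() hook removed above existed so that an implementation persisting merge state in LevelDB could release its database handle on shutdown. A minimal sketch of such an implementation (the class and its wiring are hypothetical, not the reverted RemoteBlockPushResolver code; it assumes the leveldbjni and leveldb APIs already imported elsewhere in this diff):

    import java.io.File;
    import java.io.IOException;

    import org.fusesource.leveldbjni.JniDBFactory;
    import org.iq80.leveldb.DB;
    import org.iq80.leveldb.Options;

    // Hypothetical merge manager holding a LevelDB handle for state persistence.
    public class LevelDbBackedMergeManager {
      private final DB db;

      public LevelDbBackedMergeManager(File recoveryFile) throws IOException {
        Options options = new Options();
        options.createIfMissing(true);
        // Open (or create) the database that carries merge state across restarts.
        this.db = JniDBFactory.factory.open(recoveryFile, options);
      }

      // Counterpart of the reverted default close(): release the handle on shutdown.
      public void close() {
        try {
          db.close();
        } catch (IOException e) {
          // Best effort; the on-disk state remains usable for the next start.
        }
      }
    }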

Large diffs are not rendered by default.

@@ -30,11 +30,6 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;

-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Metric;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;

@@ -51,7 +46,7 @@

 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.client.StreamCallbackWithID;
-import org.apache.spark.network.shuffle.RemoteBlockPushResolver.*;
+import org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
@@ -78,7 +73,7 @@ public void before() throws IOException {
     MapConfigProvider provider = new MapConfigProvider(
       ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "4"));
     conf = new TransportConf("shuffle", provider);
-    pushResolver = new RemoteBlockPushResolver(conf, null);
+    pushResolver = new RemoteBlockPushResolver(conf);
     registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
   }

@@ -381,7 +376,7 @@ public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throw
     }
   }

-  @Test(expected = IllegalArgumentException.class)
+  @Test(expected = NullPointerException.class)
   public void testUpdateLocalDirsOnlyOnce() throws IOException {
     String testApp = "updateLocalDirsOnlyOnceTest";
     Path[] activeLocalDirs = createLocalDirs(1);
@@ -401,7 +396,7 @@ public void testUpdateLocalDirsOnlyOnce() throws IOException {
       pushResolver.getMergedBlockDirs(testApp);
     } catch (Throwable e) {
       assertTrue(e.getMessage()
-        .startsWith("application " + testApp + " is not registered."));
+        .startsWith("application " + testApp + " is not registered or NM was restarted."));
       Throwables.propagate(e);
     }
   }
@@ -826,30 +821,6 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
     }
   }

-  @Test
-  public void testJsonSerializationOfPushShufflePartitionInfo() throws IOException {
-    ObjectMapper mapper = new ObjectMapper();
-    AppShufflePartitionId partitionId = new AppShufflePartitionId("foo", 1, 5);
-    String partitionIdJson = mapper.writeValueAsString(partitionId);
-    AppShufflePartitionId parsedPartitionId = mapper.readValue(partitionIdJson,
-      AppShufflePartitionId.class);
-    assertEquals(partitionId, parsedPartitionId);
-
-    AppPathsInfo pathInfo = new AppPathsInfo("user", new String[]{"/foo", "/bar"});
-    String pathInfoJson = mapper.writeValueAsString(pathInfo);
-    AppPathsInfo parsedPathInfo = mapper.readValue(pathInfoJson, AppPathsInfo.class);
-    assertEquals(pathInfo, parsedPathInfo);
-
-    // Intentionally keep these hard-coded strings in here, to check backwards-compatibility.
-    // It's not legacy yet, but keeping this here in case anybody changes it.
-    String legacyPartitionIdJson = "{\"appId\":\"foo\", \"shuffleId\":\"1\", \"reduceId\":\"5\"}";
-    assertEquals(partitionId, mapper.readValue(legacyPartitionIdJson,
-      AppShufflePartitionId.class));
-    String legacyAppPathInfoJson = "{\"user\":\"user\", \"activeLocalDirs\": "
-      + "[\"/foo\", \"/bar\"]}";
-    assertEquals(pathInfo, mapper.readValue(legacyAppPathInfoJson, AppPathsInfo.class));
-  }
-
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
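
The deleted test above pinned the JSON wire shape of AppShufflePartitionId and AppPathsInfo because the reverted state store wrote those objects to LevelDB as JSON, and old on-disk entries must stay parseable after upgrades. The general pattern looks roughly like this sketch (the key prefix, class, and method names are illustrative assumptions, not the reverted implementation):

    import java.nio.charset.StandardCharsets;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.iq80.leveldb.DB;

    public final class JsonStateStoreSketch {
      private static final ObjectMapper MAPPER = new ObjectMapper();
      // Hypothetical key prefix; a production store would likely version its keys.
      private static final String PREFIX = "AppPathsInfo;";

      // Persist a value object under a prefixed key, encoding the value as JSON.
      static void persist(DB db, String appId, Object pathsInfo) throws Exception {
        byte[] key = (PREFIX + appId).getBytes(StandardCharsets.UTF_8);
        db.put(key, MAPPER.writeValueAsBytes(pathsInfo));
      }

      // Read an entry back after a restart, tolerating missing keys.
      static <T> T load(DB db, String appId, Class<T> type) throws Exception {
        byte[] value = db.get((PREFIX + appId).getBytes(StandardCharsets.UTF_8));
        return value == null ? null : MAPPER.readValue(value, type);
      }
    }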
@@ -112,7 +112,6 @@ public class YarnShuffleService extends AuxiliaryService {

   private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
   private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";
-  private static final String MERGE_MANAGER_FILE_NAME = "mergeManager.ldb";

   // Whether failure during service initialization should stop the NM.
   @VisibleForTesting
@@ -167,10 +166,6 @@ public class YarnShuffleService extends AuxiliaryService {
   @VisibleForTesting
   File secretsFile;

-  // Where to store & reload merge manager info for recovering state after an NM restart
-  @VisibleForTesting
-  File mergeManagerFile;
-
   private DB db;

   public YarnShuffleService() {
@@ -220,12 +215,11 @@ protected void serviceInit(Configuration externalConf) throws Exception {
     // when it comes back
     if (_recoveryPath != null) {
       registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
-      mergeManagerFile = initRecoveryDb(MERGE_MANAGER_FILE_NAME);
     }

     TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf));
     MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance(
-      transportConf, mergeManagerFile);
+      transportConf);
     blockHandler = new ExternalBlockHandler(
       transportConf, registeredExecutorFile, shuffleMergeManager);

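
The deleted serviceInit lines registered a third recovery database, mergeManager.ldb, next to the executor and secrets recovery files, so merge state could be reloaded after an NM restart. A sketch of that reload pattern (names here are assumptions for illustration; the actual reverted logic presumably lives in the large diff that is not rendered above):

    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.fusesource.leveldbjni.JniDBFactory;
    import org.iq80.leveldb.DB;
    import org.iq80.leveldb.DBIterator;
    import org.iq80.leveldb.Options;

    public final class RecoveryReloadSketch {
      // Reload every entry stored under a known key prefix after an NM restart.
      static Map<String, byte[]> reload(File recoveryFile, String prefix) throws Exception {
        Options options = new Options();
        options.createIfMissing(true);
        DB db = JniDBFactory.factory.open(recoveryFile, options);
        Map<String, byte[]> state = new HashMap<>();
        try (DBIterator it = db.iterator()) {
          // Seek to the first key with the prefix, then scan until it stops matching.
          it.seek(prefix.getBytes(StandardCharsets.UTF_8));
          while (it.hasNext()) {
            Map.Entry<byte[], byte[]> entry = it.next();
            String key = new String(entry.getKey(), StandardCharsets.UTF_8);
            if (!key.startsWith(prefix)) {
              break;
            }
            state.put(key.substring(prefix.length()), entry.getValue());
          }
        } finally {
          db.close();
        }
        return state;
      }
    }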
@@ -16,16 +16,14 @@
  */
 package org.apache.spark.network.shuffle

-import java.io.{DataOutputStream, File}
-import java.nio.channels.FileChannel
+import java.io.File
 import java.util.concurrent.ConcurrentMap

 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.fusesource.leveldbjni.JniDBFactory
 import org.iq80.leveldb.{DB, Options}

 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId
-import org.apache.spark.network.shuffle.RemoteBlockPushResolver._
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo

 /**
@@ -46,79 +44,14 @@ object ShuffleTestAccessor {
     Option(resolver.executors.get(id))
   }

-  def getAppPathsInfo(
-      appId: String,
-      mergeManager: RemoteBlockPushResolver): Option[AppPathsInfo] = {
-    Option(mergeManager.appsPathInfo.get(appId))
-  }
-
   def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
     resolver.registeredExecutorFile
   }

-  def recoveryFile(mergeManager: RemoteBlockPushResolver): File = {
-    mergeManager.recoveryFile
-  }
-
   def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = {
     resolver.db
   }

-  def mergeManagerLevelDB(mergeManager: RemoteBlockPushResolver): DB = {
-    mergeManager.db
-  }
-
-  def updateAppPathInfo(
-      info: AppPathsInfo,
-      appId: String,
-      localDirs: Array[String],
-      db: DB): Unit = {
-    info.updateActiveLocalDirs(appId, "", localDirs, db)
-  }
-
-  def getOrCreateAppShufflePartitionInfo(
-      mergeManager: RemoteBlockPushResolver,
-      partitionId: AppShufflePartitionId): AppShufflePartitionInfo = {
-    mergeManager.getOrCreateAppShufflePartitionInfo(partitionId)
-  }
-
-  def generateDataFileName(partitionId: AppShufflePartitionId): String = {
-    partitionId.generateFileName()
-  }
-
-  def generateIndexFileName(partitionId: AppShufflePartitionId): String = {
-    partitionId.generateIndexFileName()
-  }
-
-  def generateMetaFileName(partitionId: AppShufflePartitionId): String = {
-    partitionId.generateMetaFileName()
-  }
-
-  def getFile(mergeManager: RemoteBlockPushResolver, appId: String, fileName: String): File = {
-    mergeManager.getFile(appId, fileName)
-  }
-
-  def getPartitionFileHandlers(
-      partitionInfo: AppShufflePartitionInfo): (FileChannel, FileChannel, DataOutputStream) = {
-    (partitionInfo.channel, partitionInfo.metaChannel, partitionInfo.indexWriteStream)
-  }
-
-  def closePartitionFiles(partitionInfo: AppShufflePartitionInfo): Unit = {
-    partitionInfo.closeAllFiles()
-  }
-
-  def reloadActiveAppPathInfo(
-      mergeMgr: RemoteBlockPushResolver,
-      db: DB): ConcurrentMap[String, AppPathsInfo] = {
-    mergeMgr.reloadActiveAppPathInfo(db)
-  }
-
-  def reloadActiveAppShufflePartitions(
-      mergeMgr: RemoteBlockPushResolver,
-      db: DB): ConcurrentMap[AppShufflePartitionId, AppShufflePartitionInfo] = {
-    mergeMgr.reloadActiveAppShufflePartitions(db)
-  }
-
   def reloadRegisteredExecutors(
       file: File): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
     val options: Options = new Options