Skip to content

Commit

Permalink
Load: Correctly release memory when system is in READ_ONLY mode (#11961)
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniSho authored Jan 24, 2024
1 parent 39b5893 commit 7f7285c
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ private synchronized void forceAllocatedFromQuery(long sizeInBytes)
throw new LoadRuntimeOutOfMemoryException(
String.format(
"forceAllocate: failed to allocate memory from query engine after %d retries, "
+ "total query memory %s, used memory size %d bytes, "
+ "requested memory size %d bytes",
+ "total query memory %s, available memory for load %s bytes, "
+ "used memory size %d bytes, requested memory size %d bytes",
MEMORY_ALLOCATE_MAX_RETRIES,
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(),
LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(),
usedMemorySizeInBytes.get(),
sizeInBytes));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2682,8 +2682,13 @@ private InsertBaseStatement removeLogicalView(

@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
return new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher)
.analyzeFileByFile();
LoadTsfileAnalyzer loadTsfileAnalyzer =
new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher);
try {
return loadTsfileAnalyzer.analyzeFileByFile();
} finally {
loadTsfileAnalyzer.close();
}
}

/** get analysis according to statement and params */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,13 @@ public Analysis analyzeFileByFile() {
i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / tsfileNum));
}
} catch (IllegalArgumentException e) {
schemaAutoCreatorAndVerifier.close();
LOGGER.warn(
"Parse file {} to resource error, this TsFile maybe empty.", tsFile.getPath(), e);
throw new SemanticException(
String.format("TsFile %s is empty or incomplete.", tsFile.getPath()));
} catch (AuthException e) {
schemaAutoCreatorAndVerifier.close();
return createFailAnalysisForAuthException(e);
} catch (Exception e) {
schemaAutoCreatorAndVerifier.close();
LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e);
throw new SemanticException(
String.format(
Expand All @@ -198,8 +195,6 @@ public Analysis analyzeFileByFile() {
schemaAutoCreatorAndVerifier.flush();
} catch (AuthException e) {
return createFailAnalysisForAuthException(e);
} finally {
schemaAutoCreatorAndVerifier.close();
}

LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
Expand All @@ -210,6 +205,10 @@ public Analysis analyzeFileByFile() {
return analysis;
}

public void close() {
schemaAutoCreatorAndVerifier.close();
}

private void analyzeSingleTsFile(File tsFile) throws IOException, AuthException {
try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
// can be reused when constructing tsfile resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,69 +143,72 @@ public LoadTsFileScheduler(

@Override
public void start() {
stateMachine.transitionToRunning();
int tsFileNodeListSize = tsFileNodeList.size();
boolean isLoadSuccess = true;

for (int i = 0; i < tsFileNodeListSize; ++i) {
LoadSingleTsFileNode node = tsFileNodeList.get(i);
boolean isLoadSingleTsFileSuccess = true;
try {
if (node.isTsFileEmpty()) {
LOGGER.info(
"Load skip TsFile {}, because it has no data.",
node.getTsFileResource().getTsFilePath());

} else if (!node.needDecodeTsFile(
slotList ->
partitionFetcher.queryDataPartition(
slotList,
queryContext.getSession().getUserName()))) { // do not decode, load locally
isLoadSingleTsFileSuccess = loadLocally(node);
node.clean();

} else { // need decode, load locally or remotely, use two phases method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
allReplicaSets.clear();

boolean isFirstPhaseSuccess = firstPhase(node);
boolean isSecondPhaseSuccess =
secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource());

node.clean();
if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
isLoadSingleTsFileSuccess = false;
try {
stateMachine.transitionToRunning();
int tsFileNodeListSize = tsFileNodeList.size();
boolean isLoadSuccess = true;

for (int i = 0; i < tsFileNodeListSize; ++i) {
LoadSingleTsFileNode node = tsFileNodeList.get(i);
boolean isLoadSingleTsFileSuccess = true;
try {
if (node.isTsFileEmpty()) {
LOGGER.info(
"Load skip TsFile {}, because it has no data.",
node.getTsFileResource().getTsFilePath());

} else if (!node.needDecodeTsFile(
slotList ->
partitionFetcher.queryDataPartition(
slotList,
queryContext.getSession().getUserName()))) { // do not decode, load locally
isLoadSingleTsFileSuccess = loadLocally(node);
node.clean();

} else { // need decode, load locally or remotely, use two phases method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
allReplicaSets.clear();

boolean isFirstPhaseSuccess = firstPhase(node);
boolean isSecondPhaseSuccess =
secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource());

node.clean();
if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
isLoadSingleTsFileSuccess = false;
}
}
}
if (isLoadSingleTsFileSuccess) {
LOGGER.info(
"Load TsFile {} Successfully, load process [{}/{}]",
node.getTsFileResource().getTsFilePath(),
i + 1,
tsFileNodeListSize);
} else {
if (isLoadSingleTsFileSuccess) {
LOGGER.info(
"Load TsFile {} Successfully, load process [{}/{}]",
node.getTsFileResource().getTsFilePath(),
i + 1,
tsFileNodeListSize);
} else {
isLoadSuccess = false;
LOGGER.warn(
"Can not Load TsFile {}, load process [{}/{}]",
node.getTsFileResource().getTsFilePath(),
i + 1,
tsFileNodeListSize);
}
} catch (Exception e) {
isLoadSuccess = false;
stateMachine.transitionToFailed(e);
LOGGER.warn(
"Can not Load TsFile {}, load process [{}/{}]",
"LoadTsFileScheduler loads TsFile {} error",
node.getTsFileResource().getTsFilePath(),
i + 1,
tsFileNodeListSize);
e);
}
} catch (Exception e) {
isLoadSuccess = false;
stateMachine.transitionToFailed(e);
LOGGER.warn(
String.format(
"LoadTsFileScheduler loads TsFile %s error",
node.getTsFileResource().getTsFilePath()),
e);
}
if (isLoadSuccess) {
stateMachine.transitionToFinished();
}

} finally {
LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
}
if (isLoadSuccess) {
stateMachine.transitionToFinished();
}
LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
}

private boolean firstPhase(LoadSingleTsFileNode node) {
Expand Down

0 comments on commit 7f7285c

Please sign in to comment.