Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load: Correctly release memory when system is in READ_ONLY mode #11961

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ private synchronized void forceAllocatedFromQuery(long sizeInBytes)
throw new LoadRuntimeOutOfMemoryException(
String.format(
"forceAllocate: failed to allocate memory from query engine after %d retries, "
+ "total query memory %s, used memory size %d bytes, "
+ "requested memory size %d bytes",
+ "total query memory %s, available memory for load %s bytes, "
+ "used memory size %d bytes, requested memory size %d bytes",
MEMORY_ALLOCATE_MAX_RETRIES,
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(),
LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(),
usedMemorySizeInBytes.get(),
sizeInBytes));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2682,8 +2682,13 @@ private InsertBaseStatement removeLogicalView(

@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
return new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher)
.analyzeFileByFile();
LoadTsfileAnalyzer loadTsfileAnalyzer =
new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher);
try {
return loadTsfileAnalyzer.analyzeFileByFile();
} finally {
loadTsfileAnalyzer.close();
}
}

/** get analysis according to statement and params */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,13 @@ public Analysis analyzeFileByFile() {
i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / tsfileNum));
}
} catch (IllegalArgumentException e) {
schemaAutoCreatorAndVerifier.close();
LOGGER.warn(
"Parse file {} to resource error, this TsFile maybe empty.", tsFile.getPath(), e);
throw new SemanticException(
String.format("TsFile %s is empty or incomplete.", tsFile.getPath()));
} catch (AuthException e) {
schemaAutoCreatorAndVerifier.close();
return createFailAnalysisForAuthException(e);
} catch (Exception e) {
schemaAutoCreatorAndVerifier.close();
LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e);
throw new SemanticException(
String.format(
Expand All @@ -198,8 +195,6 @@ public Analysis analyzeFileByFile() {
schemaAutoCreatorAndVerifier.flush();
} catch (AuthException e) {
return createFailAnalysisForAuthException(e);
} finally {
schemaAutoCreatorAndVerifier.close();
}

LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
Expand All @@ -210,6 +205,10 @@ public Analysis analyzeFileByFile() {
return analysis;
}

public void close() {
schemaAutoCreatorAndVerifier.close();
}

private void analyzeSingleTsFile(File tsFile) throws IOException, AuthException {
try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
// can be reused when constructing tsfile resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,69 +143,72 @@ public LoadTsFileScheduler(

@Override
public void start() {
stateMachine.transitionToRunning();
int tsFileNodeListSize = tsFileNodeList.size();
boolean isLoadSuccess = true;

for (int i = 0; i < tsFileNodeListSize; ++i) {
LoadSingleTsFileNode node = tsFileNodeList.get(i);
boolean isLoadSingleTsFileSuccess = true;
try {
if (node.isTsFileEmpty()) {
LOGGER.info(
"Load skip TsFile {}, because it has no data.",
node.getTsFileResource().getTsFilePath());

} else if (!node.needDecodeTsFile(
slotList ->
partitionFetcher.queryDataPartition(
slotList,
queryContext.getSession().getUserName()))) { // do not decode, load locally
isLoadSingleTsFileSuccess = loadLocally(node);
node.clean();

} else { // need decode, load locally or remotely, use two phases method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
allReplicaSets.clear();

boolean isFirstPhaseSuccess = firstPhase(node);
boolean isSecondPhaseSuccess =
secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource());

node.clean();
if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
isLoadSingleTsFileSuccess = false;
try {
stateMachine.transitionToRunning();
int tsFileNodeListSize = tsFileNodeList.size();
boolean isLoadSuccess = true;

for (int i = 0; i < tsFileNodeListSize; ++i) {
LoadSingleTsFileNode node = tsFileNodeList.get(i);
boolean isLoadSingleTsFileSuccess = true;
try {
if (node.isTsFileEmpty()) {
LOGGER.info(
"Load skip TsFile {}, because it has no data.",
node.getTsFileResource().getTsFilePath());

} else if (!node.needDecodeTsFile(
slotList ->
partitionFetcher.queryDataPartition(
slotList,
queryContext.getSession().getUserName()))) { // do not decode, load locally
isLoadSingleTsFileSuccess = loadLocally(node);
node.clean();

} else { // need decode, load locally or remotely, use two phases method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
allReplicaSets.clear();

boolean isFirstPhaseSuccess = firstPhase(node);
boolean isSecondPhaseSuccess =
secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource());

node.clean();
if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
isLoadSingleTsFileSuccess = false;
}
}
}
if (isLoadSingleTsFileSuccess) {
LOGGER.info(
"Load TsFile {} Successfully, load process [{}/{}]",
node.getTsFileResource().getTsFilePath(),
i + 1,
tsFileNodeListSize);
} else {
if (isLoadSingleTsFileSuccess) {
LOGGER.info(
"Load TsFile {} Successfully, load process [{}/{}]",
node.getTsFileResource().getTsFilePath(),
i + 1,
tsFileNodeListSize);
} else {
isLoadSuccess = false;
LOGGER.warn(
"Can not Load TsFile {}, load process [{}/{}]",
node.getTsFileResource().getTsFilePath(),
i + 1,
tsFileNodeListSize);
}
} catch (Exception e) {
isLoadSuccess = false;
stateMachine.transitionToFailed(e);
LOGGER.warn(
"Can not Load TsFile {}, load process [{}/{}]",
"LoadTsFileScheduler loads TsFile {} error",
node.getTsFileResource().getTsFilePath(),
i + 1,
tsFileNodeListSize);
e);
}
} catch (Exception e) {
isLoadSuccess = false;
stateMachine.transitionToFailed(e);
LOGGER.warn(
String.format(
"LoadTsFileScheduler loads TsFile %s error",
node.getTsFileResource().getTsFilePath()),
e);
}
if (isLoadSuccess) {
stateMachine.transitionToFinished();
}

} finally {
LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
}
if (isLoadSuccess) {
stateMachine.transitionToFinished();
}
LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock();
}

private boolean firstPhase(LoadSingleTsFileNode node) {
Expand Down
Loading