diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java index aa4d28bbd029..eff5f790bdb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java @@ -65,10 +65,11 @@ private synchronized void forceAllocatedFromQuery(long sizeInBytes) throw new LoadRuntimeOutOfMemoryException( String.format( "forceAllocate: failed to allocate memory from query engine after %d retries, " - + "total query memory %s, used memory size %d bytes, " - + "requested memory size %d bytes", + + "total query memory %s, available memory for load %s bytes, " + + "used memory size %d bytes, requested memory size %d bytes", MEMORY_ALLOCATE_MAX_RETRIES, IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(), + LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(), usedMemorySizeInBytes.get(), sizeInBytes)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 7f36a0d099c2..d82b348bd984 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2682,8 +2682,13 @@ private InsertBaseStatement removeLogicalView( @Override public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { - return new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher) - .analyzeFileByFile(); + LoadTsfileAnalyzer loadTsfileAnalyzer = + new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher); + try { + return loadTsfileAnalyzer.analyzeFileByFile(); + } finally { + loadTsfileAnalyzer.close(); + } } /** get analysis according to statement and params */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java index 84860f5d847e..1c4b45d559e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java @@ -177,16 +177,13 @@ public Analysis analyzeFileByFile() { i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / tsfileNum)); } } catch (IllegalArgumentException e) { - schemaAutoCreatorAndVerifier.close(); LOGGER.warn( "Parse file {} to resource error, this TsFile maybe empty.", tsFile.getPath(), e); throw new SemanticException( String.format("TsFile %s is empty or incomplete.", tsFile.getPath())); } catch (AuthException e) { - schemaAutoCreatorAndVerifier.close(); return createFailAnalysisForAuthException(e); } catch (Exception e) { - schemaAutoCreatorAndVerifier.close(); LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e); throw new SemanticException( String.format( @@ -198,8 +195,6 @@ public Analysis analyzeFileByFile() { schemaAutoCreatorAndVerifier.flush(); } catch (AuthException e) { return createFailAnalysisForAuthException(e); - } finally { - schemaAutoCreatorAndVerifier.close(); } LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed."); @@ -210,6 +205,10 @@ public Analysis analyzeFileByFile() { return analysis; } + public void close() { + schemaAutoCreatorAndVerifier.close(); + } + private void analyzeSingleTsFile(File tsFile) throws IOException, AuthException { try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { // can be reused when constructing tsfile resource diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 9a11b05e2dfd..579fc2ad2638 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -143,69 +143,72 @@ public LoadTsFileScheduler( @Override public void start() { - stateMachine.transitionToRunning(); - int tsFileNodeListSize = tsFileNodeList.size(); - boolean isLoadSuccess = true; - - for (int i = 0; i < tsFileNodeListSize; ++i) { - LoadSingleTsFileNode node = tsFileNodeList.get(i); - boolean isLoadSingleTsFileSuccess = true; - try { - if (node.isTsFileEmpty()) { - LOGGER.info( - "Load skip TsFile {}, because it has no data.", - node.getTsFileResource().getTsFilePath()); - - } else if (!node.needDecodeTsFile( - slotList -> - partitionFetcher.queryDataPartition( - slotList, - queryContext.getSession().getUserName()))) { // do not decode, load locally - isLoadSingleTsFileSuccess = loadLocally(node); - node.clean(); - - } else { // need decode, load locally or remotely, use two phases method - String uuid = UUID.randomUUID().toString(); - dispatcher.setUuid(uuid); - allReplicaSets.clear(); - - boolean isFirstPhaseSuccess = firstPhase(node); - boolean isSecondPhaseSuccess = - secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource()); - - node.clean(); - if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) { - isLoadSingleTsFileSuccess = false; + try { + stateMachine.transitionToRunning(); + int tsFileNodeListSize = tsFileNodeList.size(); + boolean isLoadSuccess = true; + + for (int i = 0; i < tsFileNodeListSize; ++i) { + LoadSingleTsFileNode node = tsFileNodeList.get(i); + boolean isLoadSingleTsFileSuccess = true; + try { + if (node.isTsFileEmpty()) { + LOGGER.info( + "Load skip TsFile {}, because it has no data.", + node.getTsFileResource().getTsFilePath()); + + } else if (!node.needDecodeTsFile( + slotList -> + partitionFetcher.queryDataPartition( + slotList, + queryContext.getSession().getUserName()))) { // do not decode, load locally + isLoadSingleTsFileSuccess = loadLocally(node); + node.clean(); + + } else { // need decode, load locally or remotely, use two phases method + String uuid = UUID.randomUUID().toString(); + dispatcher.setUuid(uuid); + allReplicaSets.clear(); + + boolean isFirstPhaseSuccess = firstPhase(node); + boolean isSecondPhaseSuccess = + secondPhase(isFirstPhaseSuccess, uuid, node.getTsFileResource()); + + node.clean(); + if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) { + isLoadSingleTsFileSuccess = false; + } } - } - if (isLoadSingleTsFileSuccess) { - LOGGER.info( - "Load TsFile {} Successfully, load process [{}/{}]", - node.getTsFileResource().getTsFilePath(), - i + 1, - tsFileNodeListSize); - } else { + if (isLoadSingleTsFileSuccess) { + LOGGER.info( + "Load TsFile {} Successfully, load process [{}/{}]", + node.getTsFileResource().getTsFilePath(), + i + 1, + tsFileNodeListSize); + } else { + isLoadSuccess = false; + LOGGER.warn( + "Can not Load TsFile {}, load process [{}/{}]", + node.getTsFileResource().getTsFilePath(), + i + 1, + tsFileNodeListSize); + } + } catch (Exception e) { isLoadSuccess = false; + stateMachine.transitionToFailed(e); LOGGER.warn( - "Can not Load TsFile {}, load process [{}/{}]", + "LoadTsFileScheduler loads TsFile {} error", node.getTsFileResource().getTsFilePath(), - i + 1, - tsFileNodeListSize); + e); } - } catch (Exception e) { - isLoadSuccess = false; - stateMachine.transitionToFailed(e); - LOGGER.warn( - String.format( - "LoadTsFileScheduler loads TsFile %s error", - node.getTsFileResource().getTsFilePath()), - e); } + if (isLoadSuccess) { + stateMachine.transitionToFinished(); + } + + } finally { + LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock(); } - if (isLoadSuccess) { - stateMachine.transitionToFinished(); - } - LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock(); } private boolean firstPhase(LoadSingleTsFileNode node) {