diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java index 67acb16540f0..39a23e0a9d54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java @@ -54,7 +54,9 @@ public DisruptorQueue(EventHandler eventHandler) { config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes(); allocatedMemoryBlock = - PipeResourceManager.memory().tryAllocate(ringBufferSize * ringBufferEntrySizeInBytes); + PipeResourceManager.memory() + .tryAllocate( + ringBufferSize * ringBufferEntrySizeInBytes, (currentSize) -> currentSize / 2); disruptor = new Disruptor<>( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index 97688e7b87dc..f0c8b89c735e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.function.Function; public class PipeMemoryManager { @@ -145,6 +146,11 @@ private long calculateTabletSizeInBytes(Tablet tablet) { } public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) { + return tryAllocate(sizeInBytes, (currentSize) -> currentSize * 2 / 3); + } + + public synchronized PipeMemoryBlock tryAllocate( + long sizeInBytes, Function customAllocateStrategy) { if (!PIPE_MEMORY_MANAGEMENT_ENABLED) { return new PipeMemoryBlock(sizeInBytes); } @@ -171,7 +177,9 @@ public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) { } sizeToAllocateInBytes = - Math.max(sizeToAllocateInBytes * 2 / 3, MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES); + Math.max( + customAllocateStrategy.apply(sizeToAllocateInBytes), + MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES); } LOGGER.warn(