Skip to content

Commit

Permalink
[IOTDB-6240] Pipe: Fix bug with PipeMemoryManager allocating memory f…
Browse files Browse the repository at this point in the history
…or the DisruptorQueue (#11485)

Allocating memory for the DisruptorQueue requires ensuring that the memory size is a power of 2, so the Memory Framework provides `Function` method for users to customize their own reallocate strategy.
  • Loading branch information
MiniSho authored Nov 7, 2023
1 parent 4e686b9 commit 256194a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();

allocatedMemoryBlock =
PipeResourceManager.memory().tryAllocate(ringBufferSize * ringBufferEntrySizeInBytes);
PipeResourceManager.memory()
.tryAllocate(
ringBufferSize * ringBufferEntrySizeInBytes, (currentSize) -> currentSize / 2);

disruptor =
new Disruptor<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.function.Function;

public class PipeMemoryManager {

Expand Down Expand Up @@ -145,6 +146,11 @@ private long calculateTabletSizeInBytes(Tablet tablet) {
}

public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) {
return tryAllocate(sizeInBytes, (currentSize) -> currentSize * 2 / 3);
}

public synchronized PipeMemoryBlock tryAllocate(
long sizeInBytes, Function<Long, Long> customAllocateStrategy) {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
return new PipeMemoryBlock(sizeInBytes);
}
Expand All @@ -171,7 +177,9 @@ public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) {
}

sizeToAllocateInBytes =
Math.max(sizeToAllocateInBytes * 2 / 3, MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES);
Math.max(
customAllocateStrategy.apply(sizeToAllocateInBytes),
MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES);
}

LOGGER.warn(
Expand Down

0 comments on commit 256194a

Please sign in to comment.