Skip to content

Commit

Permalink
Load: Fix Memory Allocation and Release Mismatch in LoadTsFileDataCac…
Browse files Browse the repository at this point in the history
…heMemoryBlock (#14375) (#14466)
  • Loading branch information
MiniSho authored Dec 18, 2024
1 parent d0dbd73 commit e12b735
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ public synchronized void reserveFromFreeMemoryForOperators(

public synchronized void releaseToFreeMemoryForOperators(final long memoryInBytes) {
freeMemoryForOperators += memoryInBytes;

if (freeMemoryForOperators > ALLOCATE_MEMORY_FOR_OPERATORS) {
LOGGER.error(
"The free memory {} is more than allocated memory {}, last released memory: {}",
freeMemoryForOperators,
ALLOCATE_MEMORY_FOR_OPERATORS,
memoryInBytes);
freeMemoryForOperators = ALLOCATE_MEMORY_FOR_OPERATORS;
}
}

public long getAllocateMemoryForOperators() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory
}

@Override
public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
public synchronized boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
return memoryUsageInBytes.get() + memoryTobeAddedInBytes <= totalMemorySizeInBytes;
}

@Override
public void addMemoryUsage(long memoryInBytes) {
memoryUsageInBytes.addAndGet(memoryInBytes);
public synchronized void addMemoryUsage(long memoryInBytes) {
if (memoryUsageInBytes.addAndGet(memoryInBytes) > totalMemorySizeInBytes) {
LOGGER.warn("{} has exceed total memory size", this);
}

MetricService.getInstance()
.getOrCreateGauge(
Expand All @@ -63,7 +65,7 @@ public void addMemoryUsage(long memoryInBytes) {
}

@Override
public void reduceMemoryUsage(long memoryInBytes) {
public synchronized void reduceMemoryUsage(long memoryInBytes) {
if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
LOGGER.warn("{} has reduce memory usage to negative", this);
}
Expand All @@ -78,7 +80,7 @@ public void reduceMemoryUsage(long memoryInBytes) {
}

@Override
protected void releaseAllMemory() {
protected synchronized void releaseAllMemory() {
if (memoryUsageInBytes.get() != 0) {
LOGGER.warn(
"Try to release memory from a memory block {} which has not released all memory", this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,9 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileDataCacheMemoryBlock.class);
private static final long MINIMUM_MEMORY_SIZE_IN_BYTES = 1024 * 1024L; // 1 MB
private static final int MAX_ASK_FOR_MEMORY_COUNT = 256; // must be a power of 2
private static final long EACH_ASK_MEMORY_SIZE_IN_BYTES =
Math.max(
MINIMUM_MEMORY_SIZE_IN_BYTES,
LoadTsFileMemoryManager.MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 4);

private final AtomicLong limitedMemorySizeInBytes;
private final AtomicLong memoryUsageInBytes;
private final AtomicInteger askForMemoryCount;
private final AtomicInteger referenceCount;

LoadTsFileDataCacheMemoryBlock(long initialLimitedMemorySizeInBytes) {
Expand All @@ -54,7 +48,6 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc

this.limitedMemorySizeInBytes = new AtomicLong(initialLimitedMemorySizeInBytes);
this.memoryUsageInBytes = new AtomicLong(0L);
this.askForMemoryCount = new AtomicInteger(1);
this.referenceCount = new AtomicInteger(0);
}

Expand All @@ -64,29 +57,17 @@ public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
}

@Override
public void addMemoryUsage(long memoryInBytes) {
memoryUsageInBytes.addAndGet(memoryInBytes);

askForMemoryCount.getAndUpdate(
count -> {
if ((count & (count - 1)) == 0) {
// count is a power of 2
long actuallyAllocateMemorySizeInBytes =
MEMORY_MANAGER.tryAllocateFromQuery(EACH_ASK_MEMORY_SIZE_IN_BYTES);
limitedMemorySizeInBytes.addAndGet(actuallyAllocateMemorySizeInBytes);
if (actuallyAllocateMemorySizeInBytes < EACH_ASK_MEMORY_SIZE_IN_BYTES) {
return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1;
} else {
return 1;
}
}
return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1;
});
public synchronized void addMemoryUsage(long memoryInBytes) {
if (memoryUsageInBytes.addAndGet(memoryInBytes) > limitedMemorySizeInBytes.get()) {
LOGGER.warn("{} has exceed total memory size", this);
}
}

@Override
public void reduceMemoryUsage(long memoryInBytes) {
memoryUsageInBytes.addAndGet(-memoryInBytes);
public synchronized void reduceMemoryUsage(long memoryInBytes) {
if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
LOGGER.warn("{} has reduce memory usage to negative", this);
}
}

@Override
Expand All @@ -113,6 +94,7 @@ public boolean doShrink(long shrinkMemoryInBytes) {
return false;
}

MEMORY_MANAGER.releaseToQuery(shrinkMemoryInBytes);
limitedMemorySizeInBytes.addAndGet(-shrinkMemoryInBytes);
return true;
}
Expand Down Expand Up @@ -140,8 +122,6 @@ public String toString() {
+ limitedMemorySizeInBytes.get()
+ ", memoryUsageInBytes="
+ memoryUsageInBytes.get()
+ ", askForMemoryCount="
+ askForMemoryCount.get()
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class LoadTsFileMemoryManager {
private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0);
private LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock;

private synchronized void forceAllocatedFromQuery(long sizeInBytes)
private synchronized void forceAllocateFromQuery(long sizeInBytes)
throws LoadRuntimeOutOfMemoryException {
for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
// allocate memory from queryEngine
Expand Down Expand Up @@ -82,6 +82,12 @@ public synchronized long tryAllocateFromQuery(long sizeInBytes) {
}

public synchronized void releaseToQuery(long sizeInBytes) {
if (usedMemorySizeInBytes.get() < sizeInBytes) {
LOGGER.error(
"Load: Attempting to release more memory ({}) than allocated ({})",
sizeInBytes,
usedMemorySizeInBytes.get());
}
usedMemorySizeInBytes.addAndGet(-sizeInBytes);
QUERY_ENGINE_MEMORY_MANAGER.releaseToFreeMemoryForOperators(sizeInBytes);
this.notifyAll();
Expand All @@ -90,7 +96,7 @@ public synchronized void releaseToQuery(long sizeInBytes) {
public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemoryBlock(
long sizeInBytes) throws LoadRuntimeOutOfMemoryException {
try {
forceAllocatedFromQuery(sizeInBytes);
forceAllocateFromQuery(sizeInBytes);
} catch (LoadRuntimeOutOfMemoryException e) {
if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(sizeInBytes)) {
return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
Expand Down

0 comments on commit e12b735

Please sign in to comment.