Skip to content

Commit

Permalink
Optimized wal file deletion algorithm (apache#11948)
Browse files Browse the repository at this point in the history
Co-authored-by: Zhijia Cao <caozhijia@126.com>
  • Loading branch information
HeimingZ and caozj1011 authored Jan 23, 2024
1 parent 1416217 commit b07fafe
Show file tree
Hide file tree
Showing 17 changed files with 944 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,25 +390,25 @@ public class IoTDBConfig {
private boolean enableTimedFlushSeqMemtable = true;

/**
* If a memTable's created time is older than current time minus this, the memtable will be
* If a memTable's last update time is older than current time minus this, the memtable will be
* flushed to disk.(only check sequence tsfiles' memtables) Unit: ms
*/
private long seqMemtableFlushInterval = 3 * 60 * 60 * 1000L;
private long seqMemtableFlushInterval = 10 * 60 * 1000L;

/** The interval to check whether sequence memtables need flushing. Unit: ms */
private long seqMemtableFlushCheckInterval = 10 * 60 * 1000L;
private long seqMemtableFlushCheckInterval = 30 * 1000L;

/** Whether to timed flush unsequence tsfiles' memtables. */
private boolean enableTimedFlushUnseqMemtable = true;

/**
* If a memTable's created time is older than current time minus this, the memtable will be
* If a memTable's last update time is older than current time minus this, the memtable will be
* flushed to disk.(only check unsequence tsfiles' memtables) Unit: ms
*/
private long unseqMemtableFlushInterval = 3 * 60 * 60 * 1000L;
private long unseqMemtableFlushInterval = 10 * 60 * 1000L;

/** The interval to check whether unsequence memtables need flushing. Unit: ms */
private long unseqMemtableFlushCheckInterval = 10 * 60 * 1000L;
private long unseqMemtableFlushCheckInterval = 30 * 1000L;

/** The sort algorithm used in TVList */
private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public void incrementRecoveredFilesNum() {
if (lastLogTime + config.getRecoveryLogIntervalInMs() < System.currentTimeMillis()) {
logger.info(
"The data region {}[{}] has recovered {}%, please wait a moment.",
databaseName, dataRegionId, recoveredFilesNum * 1.0 / numOfFilesToRecover);
databaseName, dataRegionId, recoveredFilesNum * 100.0 / numOfFilesToRecover);
lastLogTime = System.currentTimeMillis();
}
}
Expand Down Expand Up @@ -1680,7 +1680,7 @@ public void timedFlushSeqMemTable() {
new ArrayList<>(workSequenceTsFileProcessors.values());
long timeLowerBound = System.currentTimeMillis() - config.getSeqMemtableFlushInterval();
for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
logger.info(
"Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
tsFileProcessor.getTimeRangeId(),
Expand All @@ -1706,7 +1706,7 @@ public void timedFlushUnseqMemTable() {
long timeLowerBound = System.currentTimeMillis() - config.getUnseqMemtableFlushInterval();

for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
logger.info(
"Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
tsFileProcessor.getTimeRangeId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ public abstract class AbstractMemTable implements IMemTable {

private final long createdTime = System.currentTimeMillis();

/** this time is updated by the timed flush, same as createdTime when the feature is disabled. */
private long updateTime = createdTime;
/**
* check whether this memTable has been updated since last timed flush check, update updateTime
* when changed
*/
private long lastTotalPointsNum = totalPointsNum;

private String database;
private String dataRegionId;

Expand Down Expand Up @@ -589,6 +597,16 @@ public long getCreatedTime() {
return createdTime;
}

/**
 * Returns the update time of this memTable, refreshing it first if needed.
 *
 * <p>Each call compares {@code totalPointsNum} with the value remembered from the previous call.
 * When they differ, the remembered value is replaced and {@code updateTime} is set to the current
 * system time; otherwise the previously stored {@code updateTime} is returned unchanged.
 *
 * @return the stored update time in milliseconds
 */
@Override
public long getUpdateTime() {
  // nothing changed since the previous call, keep the old timestamp
  if (lastTotalPointsNum == totalPointsNum) {
    return updateTime;
  }
  lastTotalPointsNum = totalPointsNum;
  updateTime = System.currentTimeMillis();
  return updateTime;
}

@Override
public FlushStatus getFlushStatus() {
return flushStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ ReadOnlyMemChunk query(

long getCreatedTime();

long getUpdateTime();

FlushStatus getFlushStatus();

void setFlushStatus(FlushStatus flushStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,11 @@ public long getWorkMemTableCreatedTime() {
return workMemTable != null ? workMemTable.getCreatedTime() : Long.MAX_VALUE;
}

/**
 * Get the update time of the working memTable.
 *
 * @return {@code workMemTable.getUpdateTime()}, or {@link Long#MAX_VALUE} if workMemTable is null
 */
public long getWorkMemTableUpdateTime() {
  if (workMemTable == null) {
    return Long.MAX_VALUE;
  }
  return workMemTable.getUpdateTime();
}

public long getLastWorkMemtableFlushTime() {
return lastWorkMemtableFlushTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,31 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALNodeClosedException;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.db.utils.MmapUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -102,6 +112,9 @@ public class WALBuffer extends AbstractWALBuffer {
// single thread to sync syncingBuffer to disk
private final ExecutorService syncBufferThread;

// manage wal files which have MemTableIds
private final Map<Long, Set<Long>> memTableIdsOfWal = new ConcurrentHashMap<>();

public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
this(identifier, logDirectory, new CheckpointManager(identifier, logDirectory), 0, 0L);
}
Expand Down Expand Up @@ -183,6 +196,7 @@ public void write(WALEntry walEntry) {
/** This info class traverses some extra info from serializeThread to syncBufferThread. */
private static class SerializeInfo {
final WALMetaData metaData = new WALMetaData();
final Map<Long, Long> memTableId2WalDiskUsage = new HashMap<>();
final List<Checkpoint> checkpoints = new ArrayList<>();
final List<WALFlushListener> fsyncListeners = new ArrayList<>();
WALFlushListener rollWALFileWriterListener = null;
Expand Down Expand Up @@ -279,10 +293,11 @@ private void handleInfoEntry(WALEntry walEntry) {
return;
}

int size = byteBufferView.position();
int startPosition = byteBufferView.position();
int size;
try {
walEntry.serialize(byteBufferView);
size = byteBufferView.position() - size;
size = byteBufferView.position() - startPosition;
} catch (Exception e) {
logger.error(
"Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
Expand All @@ -304,7 +319,9 @@ private void handleInfoEntry(WALEntry walEntry) {
}
// update related info
totalSize += size;
info.metaData.add(size, searchIndex);
info.metaData.add(size, searchIndex, walEntry.getMemTableId());
info.memTableId2WalDiskUsage.compute(
walEntry.getMemTableId(), (k, v) -> v == null ? size : v + size);
walEntry.getWalFlushListener().getWalEntryHandler().setSize(size);
info.fsyncListeners.add(walEntry.getWalFlushListener());
}
Expand Down Expand Up @@ -509,6 +526,12 @@ public void run() {
switchSyncingBufferToIdle();
}

// update info
memTableIdsOfWal
.computeIfAbsent(currentWALFileVersion, memTableIds -> new HashSet<>())
.addAll(info.metaData.getMemTablesId());
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);

boolean forceSuccess = false;
// try to roll log writer
if (info.rollWALFileWriterListener != null
Expand Down Expand Up @@ -682,4 +705,32 @@ public boolean isAllWALEntriesConsumed() {
public CheckpointManager getCheckpointManager() {
return checkpointManager;
}

/**
 * Remove the memTable ids kept for the given wal file from {@code memTableIdsOfWal}.
 *
 * @param walVersionId version id of the wal file whose entry should be dropped
 */
public void removeMemTableIdsOfWal(Long walVersionId) {
  memTableIdsOfWal.remove(walVersionId);
}

/**
 * Get the ids of the memTables recorded for the given wal file.
 *
 * <p>Any version id that is not smaller than {@code currentWALFileVersion} yields an empty set.
 * Otherwise the ids are taken from {@code memTableIdsOfWal}; when no entry is present they are
 * read from the wal file's metadata and stored in the map for later calls.
 *
 * @param fileVersionId version id of the wal file
 * @return the memTable ids of the file, or an empty set if the file is not older than the current
 *     wal file or its metadata cannot be read
 */
public Set<Long> getMemTableIds(long fileVersionId) {
  if (fileVersionId >= currentWALFileVersion) {
    return Collections.emptySet();
  }
  return memTableIdsOfWal.computeIfAbsent(
      fileVersionId,
      id -> {
        try {
          File file = WALFileUtils.getWALFile(new File(logDirectory), id);
          // the channel is only needed while reading the metadata; close it afterwards so the
          // file handle is not leaked
          try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
            return WALMetaData.readFromWALFile(file, channel).getMemTablesId();
          }
        } catch (IOException e) {
          // keep the cause in the log so the failure can be diagnosed
          logger.error("Fail to read memTable ids from the wal file {}.", id, e);
          // NOTE(review): this empty set is stored in the map, so a failed read is not retried
          // for this version id — confirm this is intended
          return new HashSet<>();
        }
      });
}

/**
 * Returns the map from wal file version id to the memTable ids recorded for that file. The map
 * itself is returned, not a copy. Intended for tests only.
 */
@TestOnly
public Map<Long, Set<Long>> getMemTableIdsOfWal() {
return memTableIdsOfWal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -61,7 +62,7 @@ public class CheckpointManager implements AutoCloseable {
private final Lock infoLock = new ReentrantLock();
// region these variables should be protected by infoLock
// memTable id -> memTable info
private final Map<Long, MemTableInfo> memTableId2Info = new HashMap<>();
private final Map<Long, MemTableInfo> memTableId2Info = new ConcurrentHashMap<>();
// cache the biggest byte buffer to serialize checkpoint
// it's safe to use volatile here to make this reference thread-safe.
@SuppressWarnings("squid:S3077")
Expand All @@ -88,7 +89,7 @@ public CheckpointManager(String identifier, String logDirectory) throws FileNotF
logHeader();
}

public List<MemTableInfo> snapshotMemTableInfos() {
public List<MemTableInfo> activeOrPinnedMemTables() {
infoLock.lock();
try {
return new ArrayList<>(memTableId2Info.values());
Expand Down Expand Up @@ -121,7 +122,7 @@ private void logHeader() {
*/
private void makeGlobalInfoCP() {
long start = System.nanoTime();
List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
memTableInfos.removeIf(MemTableInfo::isFlushed);
Checkpoint checkpoint = new Checkpoint(CheckpointType.GLOBAL_MEMORY_TABLE_INFO, memTableInfos);
logByCachedByteBuffer(checkpoint);
Expand Down Expand Up @@ -315,20 +316,13 @@ public void unpinMemTable(long memTableId) throws MemTablePinException {
}
// endregion

/** Get MemTableInfo of oldest MemTable, whose first version id is smallest. */
public MemTableInfo getOldestMemTableInfo() {
/** Get MemTableInfo of oldest unpinned MemTable, whose memTable id is smallest. */
public MemTableInfo getOldestUnpinnedMemTableInfo() {
// find oldest memTable
List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
if (memTableInfos.isEmpty()) {
return null;
}
MemTableInfo oldestMemTableInfo = memTableInfos.get(0);
for (MemTableInfo memTableInfo : memTableInfos) {
if (oldestMemTableInfo.getFirstFileVersionId() > memTableInfo.getFirstFileVersionId()) {
oldestMemTableInfo = memTableInfo;
}
}
return oldestMemTableInfo;
return activeOrPinnedMemTables().stream()
.filter(memTableInfo -> !memTableInfo.isPinned())
.min(Comparator.comparingLong(MemTableInfo::getMemTableId))
.orElse(null);
}

/**
Expand All @@ -337,28 +331,36 @@ public MemTableInfo getOldestMemTableInfo() {
* @return Return {@link Long#MIN_VALUE} if no file is valid
*/
public long getFirstValidWALVersionId() {
List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
long firstValidVersionId = memTableInfos.isEmpty() ? Long.MIN_VALUE : Long.MAX_VALUE;
for (MemTableInfo memTableInfo : memTableInfos) {
firstValidVersionId = Math.min(firstValidVersionId, memTableInfo.getFirstFileVersionId());
}
return firstValidVersionId;
}

/**
 * Add the given wal disk usage to the memTables that are still tracked by this manager.
 *
 * <p>Ids that have no entry in {@code memTableId2Info} are ignored.
 *
 * @param memTableId2WalDiskUsage memTable id -> wal size to add to that memTable's usage
 */
public void updateCostOfActiveMemTables(Map<Long, Long> memTableId2WalDiskUsage) {
  memTableId2WalDiskUsage.forEach(
      (memTableId, walDiskUsage) ->
          memTableId2Info.computeIfPresent(
              memTableId,
              (id, info) -> {
                info.addWalDiskUsage(walDiskUsage);
                return info;
              }));
}

/** Get total cost of active memTables. */
public long getTotalCostOfActiveMemTables() {
List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
long totalCost = 0;
for (MemTableInfo memTableInfo : memTableInfos) {
// flushed memTables are not active
if (memTableInfo.isFlushed()) {
continue;
}
if (config.isEnableMemControl()) {
totalCost += memTableInfo.getMemTable().getTVListsRamCost();
} else {
totalCost++;
}
totalCost += memTableInfo.getWalDiskUsage();
}

return totalCost;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,23 @@
public class MemTableInfo implements WALEntryValue {
// memTable id 8 bytes, first version id 8 bytes
private static final int FIXED_SERIALIZED_SIZE = Long.BYTES * 2;
// memTable id
private long memTableId;
// path of the tsFile which this memTable will be flushed to
private String tsFilePath;
// version id of the file where this memTable's first WALEntry is located
private volatile long firstFileVersionId;

// memTable
private IMemTable memTable;
// memTable pin count
private int pinCount;
// memTable is flushed or not
private boolean flushed;
// memTable id
private long memTableId;
// data region id
private int dataRegionId;
// path of the tsFile which this memTable will be flushed to
private String tsFilePath;
// version id of the file where this memTable's first WALEntry is located
private volatile long firstFileVersionId;
// total wal usage of this memTable
private long walDiskUsage;

private MemTableInfo() {}

Expand Down Expand Up @@ -158,4 +160,12 @@ public long getFirstFileVersionId() {
public void setFirstFileVersionId(long firstFileVersionId) {
this.firstFileVersionId = firstFileVersionId;
}

/** Returns the wal usage accumulated for this memTable via {@code addWalDiskUsage}. */
public long getWalDiskUsage() {
return walDiskUsage;
}

/**
 * Increase the wal usage of this memTable.
 *
 * @param size amount to add to the current usage
 */
public void addWalDiskUsage(long size) {
// NOTE(review): plain read-modify-write on a non-volatile field — confirm callers serialize access
walDiskUsage += size;
}
}
Loading

0 comments on commit b07fafe

Please sign in to comment.