diff --git a/src/main/java/kiwi/storage/bitcask/log/LogSegment.java b/src/main/java/kiwi/storage/bitcask/log/LogSegment.java index f58f08c..0b548e7 100644 --- a/src/main/java/kiwi/storage/bitcask/log/LogSegment.java +++ b/src/main/java/kiwi/storage/bitcask/log/LogSegment.java @@ -114,10 +114,18 @@ public Path file() { return file; } + public void sync() { + try { + channel.force(false); + } catch (IOException ex) { + logger.error("Failed to sync log segment {}", file, ex); + } + } + public void close() { try { if (channel.isOpen()) { - channel.force(true); + sync(); channel.close(); } } catch (IOException ex) { diff --git a/src/main/java/kiwi/storage/bitcask/log/config/LogConfig.java b/src/main/java/kiwi/storage/bitcask/log/config/LogConfig.java index 38ea645..4398209 100644 --- a/src/main/java/kiwi/storage/bitcask/log/config/LogConfig.java +++ b/src/main/java/kiwi/storage/bitcask/log/config/LogConfig.java @@ -4,20 +4,40 @@ import java.nio.file.Path; import java.time.Duration; +import java.util.Set; public class LogConfig { public final Path dir; public final long segmentBytes; public final int keyDirBuilderThreads; + public final Sync sync; public final Compaction compaction; public LogConfig(Config config) { this.dir = Path.of(config.getString("dir")); this.segmentBytes = config.getLong("segment.bytes"); this.keyDirBuilderThreads = config.getInt("keydir.builder.threads"); + this.sync = new Sync(config.getConfig("sync")); this.compaction = new Compaction(config.getConfig("compaction")); } + public static class Sync { + public final String mode; + public final Duration interval; + public final Duration window; + + private static final Set MODES = Set.of("periodic", "batch", "lazy"); + + public Sync(Config config) { + this.mode = config.getString("mode").toLowerCase(); + if (!MODES.contains(this.mode)) { + throw new IllegalArgumentException("Invalid sync mode: " + this.mode); + } + this.interval = config.getDuration("periodic.interval"); + this.window = config.getDuration("batch.window"); + } + } + public static class Compaction { public final Duration interval; public final double minDirtyRatio; diff --git a/src/main/java/kiwi/storage/bitcask/sync/LazySegmentWriter.java b/src/main/java/kiwi/storage/bitcask/sync/LazySegmentWriter.java new file mode 100644 index 0000000..45e06e9 --- /dev/null +++ b/src/main/java/kiwi/storage/bitcask/sync/LazySegmentWriter.java @@ -0,0 +1,23 @@ +package kiwi.storage.bitcask.sync; + +import kiwi.error.KiwiWriteException; +import kiwi.storage.bitcask.log.LogSegment; +import kiwi.storage.bitcask.log.Record; + +import java.util.function.Supplier; + +/** + * A {@link SegmentWriter} that appends records to the active segment and defers + * fsync to operating system. This is useful for performance sensitive applications but + * may result in data loss in case of a crash. + */ +public class LazySegmentWriter extends SegmentWriter { + public LazySegmentWriter(Supplier activeSegmentSupplier) { + super(activeSegmentSupplier); + } + + @Override + protected void append(Record record) throws KiwiWriteException { + activeSegment().append(record); + } +} diff --git a/src/main/java/kiwi/storage/bitcask/sync/SegmentWriter.java b/src/main/java/kiwi/storage/bitcask/sync/SegmentWriter.java new file mode 100644 index 0000000..ed388e2 --- /dev/null +++ b/src/main/java/kiwi/storage/bitcask/sync/SegmentWriter.java @@ -0,0 +1,33 @@ +package kiwi.storage.bitcask.sync; + +import kiwi.error.KiwiWriteException; +import kiwi.storage.bitcask.log.LogSegment; +import kiwi.storage.bitcask.log.Record; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +public abstract class SegmentWriter implements AutoCloseable { + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Supplier activeSegmentSupplier; + + public SegmentWriter(Supplier activeSegmentSupplier) { + this.activeSegmentSupplier = activeSegmentSupplier; + } + + abstract protected void append(Record record) throws KiwiWriteException; + + protected void sync() { + activeSegment().sync(); + } + + protected LogSegment activeSegment() { + return activeSegmentSupplier.get(); + } + + public void close() { + if (closed.compareAndSet(false, true)) { + activeSegment().close(); + } + } +} diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 4b36d7f..50c848e 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -10,6 +10,24 @@ kiwi { // The number of threads used to build the keydir. keydir.builder.threads = 8 + sync { + // The log sync mode. Can be "periodic", "batch", or "lazy". + // "periodic" syncs the log at a fixed interval, controlled by "interval" setting. + // "batch" syncs the log when the time between writes exceeds the "window" setting. + // "lazy" rely on the OS flushing mechanisms. + mode = "periodic" + + periodic { + // The interval between syncs in milliseconds for the "periodic" mode. + interval = 10000ms // 10s + } + + batch { + // How long engine waits for other writes before performing a sync in the "batch" mode. + window = 5ms + } + } + compaction { // How often the compaction process is triggered. interval = 10m