Skip to content

Commit

Permalink
Configure segment sync modes
Browse files Browse the repository at this point in the history
  • Loading branch information
nemanja-m committed Nov 21, 2024
1 parent 6e54195 commit ccf0ce2
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 1 deletion.
10 changes: 9 additions & 1 deletion src/main/java/kiwi/storage/bitcask/log/LogSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,18 @@ public Path file() {
return file;
}

public void sync() {
try {
channel.force(false);
} catch (IOException ex) {
logger.error("Failed to sync log segment {}", file, ex);
}
}

public void close() {
try {
if (channel.isOpen()) {
channel.force(true);
sync();
channel.close();
}
} catch (IOException ex) {
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/kiwi/storage/bitcask/log/config/LogConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,40 @@

import java.nio.file.Path;
import java.time.Duration;
import java.util.Set;

public class LogConfig {
public final Path dir;
public final long segmentBytes;
public final int keyDirBuilderThreads;
public final Sync sync;
public final Compaction compaction;

public LogConfig(Config config) {
this.dir = Path.of(config.getString("dir"));
this.segmentBytes = config.getLong("segment.bytes");
this.keyDirBuilderThreads = config.getInt("keydir.builder.threads");
this.sync = new Sync(config.getConfig("sync"));
this.compaction = new Compaction(config.getConfig("compaction"));
}

public static class Sync {
public final String mode;
public final Duration interval;
public final Duration window;

private static final Set<String> MODES = Set.of("periodic", "batch", "lazy");

public Sync(Config config) {
this.mode = config.getString("mode").toLowerCase();
if (!MODES.contains(this.mode)) {
throw new IllegalArgumentException("Invalid sync mode: " + this.mode);
}
this.interval = config.getDuration("periodic.interval");
this.window = config.getDuration("batch.window");
}
}

public static class Compaction {
public final Duration interval;
public final double minDirtyRatio;
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/kiwi/storage/bitcask/sync/LazySegmentWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kiwi.storage.bitcask.sync;

import kiwi.error.KiwiWriteException;
import kiwi.storage.bitcask.log.LogSegment;
import kiwi.storage.bitcask.log.Record;

import java.util.function.Supplier;

/**
* A {@link SegmentWriter} that appends records to the active segment and defers
* fsync to operating system. This is useful for performance sensitive applications but
* may result in data loss in case of a crash.
*/
public class LazySegmentWriter extends SegmentWriter {
public LazySegmentWriter(Supplier<LogSegment> activeSegmentSupplier) {
super(activeSegmentSupplier);
}

@Override
protected void append(Record record) throws KiwiWriteException {
activeSegment().append(record);
}
}
33 changes: 33 additions & 0 deletions src/main/java/kiwi/storage/bitcask/sync/SegmentWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package kiwi.storage.bitcask.sync;

import kiwi.error.KiwiWriteException;
import kiwi.storage.bitcask.log.LogSegment;
import kiwi.storage.bitcask.log.Record;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public abstract class SegmentWriter implements AutoCloseable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Supplier<LogSegment> activeSegmentSupplier;

public SegmentWriter(Supplier<LogSegment> activeSegmentSupplier) {
this.activeSegmentSupplier = activeSegmentSupplier;
}

abstract protected void append(Record record) throws KiwiWriteException;

protected void sync() {
activeSegment().sync();
}

protected LogSegment activeSegment() {
return activeSegmentSupplier.get();
}

public void close() {
if (closed.compareAndSet(false, true)) {
activeSegment().close();
}
}
}
18 changes: 18 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ kiwi {
// The number of threads used to build the keydir.
keydir.builder.threads = 8

sync {
// The log sync mode. Can be "periodic", "batch", or "lazy".
// "periodic" syncs the log at a fixed interval, controlled by "interval" setting.
// "batch" syncs the log when the time between writes exceeds the "window" setting.
// "lazy" rely on the OS flushing mechanisms.
mode = "periodic"

periodic {
// The interval between syncs in milliseconds for the "periodic" mode.
interval = 10000ms // 10s
}

batch {
// How long engine waits for other writes before performing a sync in the "batch" mode.
window = 5ms
}
}

compaction {
// How often the compaction process is triggered.
interval = 10m
Expand Down

0 comments on commit ccf0ce2

Please sign in to comment.