Skip to content

Commit

Permalink
Add command for checksum validation
Browse files Browse the repository at this point in the history
  • Loading branch information
nemanja-m committed Dec 1, 2024
1 parent 5c61d9f commit ff3f5c1
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ build/
**/.DS_Store

# Ignore logs and temporary files
*.log
**/*.log
*.tmp
*.swp

Expand Down
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 CMD nc -z

COPY --from=build /home/gradle/src/kiwi-server/build/install/kiwi-server/ /app/

RUN echo -e '#!/bin/sh\njava -cp "/app/lib/*" kiwi.core.checksum.Run "$@"' > /app/checksum && chmod +x /app/checksum

CMD ["/app/bin/kiwi-server"]
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,31 @@ Default values and environment variables:
redis-cli -h localhost -p 6379
```

## Checksums

KiWi uses CRC32 checksums to ensure data integrity. The checksum is stored alongside the data.
There is a special command that can be used to verify the data integrity.

Checksum command uses available CPU cores to parallelize the checksum calculation. If checksum
fails,
the following error message will be displayed:

```text
Checksum failed: segment=00000000000000000000 position=444636640 checksum=2005447726 timestamp=1733002903067 ttl=0 keySize=16 valueSize=0
```

### Docker

```bash
docker run -it --rm -p 6379:6379 nemanjam/kiwi:latest sh checksum --dir [log dir] --threads [threads]
```

### Java

```bash
java -cp "kiwi-server/build/install/kiwi-server/lib/*" kiwi.core.checksum.Run --dir [log dir] --threads [threads]
```

## Benchmarks

KiWi can be evaluated with [redis-benchmark](https://redis.io/topics/benchmarks) utility command.
Expand Down
124 changes: 124 additions & 0 deletions kiwi-core/src/main/java/kiwi/core/checksum/Run.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package kiwi.core.checksum;

import kiwi.core.storage.bitcask.log.LogSegment;
import kiwi.core.storage.bitcask.log.Record;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

public class Run {

public static void main(String[] args) throws IOException {
Path logDir = null;
int numThreads = Runtime.getRuntime().availableProcessors();

if (args.length == 0) {
System.err.println("Usage: COMMAND --dir <log-dir> [--threads <num-threads>]");
System.exit(1);
}

for (int i = 0; i < args.length; i++) {
switch (args[i]) {
case "--dir":
case "-d":
if (i + 1 < args.length) {
try {
logDir = Paths.get(args[++i]);

if (!logDir.toFile().isDirectory()) {
System.err.println("Path for --dir or -d is not a directory");
System.exit(1);
}
} catch (InvalidPathException e) {
System.err.println("Invalid path for --dir or -d");
System.exit(1);
}
} else {
System.err.println("Missing value for --dir or -d");
System.exit(1);
}
break;
case "--threads":
case "-t":
if (i + 1 < args.length) {
try {
numThreads = Integer.parseInt(args[++i]);
} catch (NumberFormatException e) {
System.err.println("Invalid number format for --threads or -t");
System.exit(1);
}
} else {
System.err.println("Missing value for --threads or -t");
System.exit(1);
}
break;
default:
System.err.println("Unknown argument: " + args[i]);
System.exit(1);
}
}

if (logDir == null) {
System.err.println("Argument '--dir' is required");
System.exit(1);
}

checksum(logDir, numThreads);

System.exit(0);
}

static void checksum(Path logDir, int numThreads) throws IOException {
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<Future<?>> futures = new ArrayList<>();

try (Stream<Path> paths = Files.walk(logDir)) {
List<LogSegment> segments = paths.filter(Files::isRegularFile)
.filter(path -> path.getFileName().toString().endsWith(".log"))
.map((path) -> LogSegment.open(path, true))
.toList();

for (LogSegment segment : segments) {
futures.add(executor.submit(() -> checkLogSegment(segment)));
}
}

futures.forEach(future -> {
try {
future.get();
} catch (Exception e) {
System.err.println("Error processing log: " + e.getMessage());
}
});

executor.shutdown();
}

static void checkLogSegment(LogSegment segment) {
long position = 0;
for (Record record : segment.getRecords()) {
if (!record.isValidChecksum()) {
String message = String.format(
"Checksum failed: segment=%s position=%s checksum=%d timestamp=%d ttl=%d keySize=%d valueSize=%d",
segment.name(),
position,
record.header().checksum(),
record.header().timestamp(),
record.header().ttl(),
record.header().keySize(),
record.header().valueSize());
System.out.println(message);
}
position += record.size();
}
}
}
4 changes: 3 additions & 1 deletion kiwi-core/src/main/java/kiwi/core/storage/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ public class Utils {
private static final Logger logger = LoggerFactory.getLogger(Utils.class);

public static long checksum(long timestamp, long ttl, Bytes key, Bytes value) {
ByteBuffer buffer = ByteBuffer.allocate(2 * Long.BYTES + key.size() + value.size());
ByteBuffer buffer = ByteBuffer.allocate(2 * (Long.BYTES + Integer.BYTES) + key.size() + value.size());
buffer.putLong(timestamp);
buffer.putLong(ttl);
buffer.putInt(key.size());
buffer.putInt(value.size());
buffer.put(key.get());
buffer.put(value.get());
CRC32 crc = new CRC32();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ public double dirtyRatio(Map<Bytes, Long> keyTimestampMap) {
return (double) dirtyCount / total;
}

public Iterable<Record> getRecords() {
return () -> new RecordIterator(channel, keyHeader -> true);
}

public Iterable<Record> getActiveRecords(Map<Bytes, Long> keyTimestampMap) {
return () -> new RecordIterator(channel, keyHeader -> isActiveRecord(keyHeader, keyTimestampMap));
}
Expand Down

0 comments on commit ff3f5c1

Please sign in to comment.