Move test (#9)
* sync

* sync

* MonitoredFileConsumerTest

* clean-up tests

* move test

* revert MonitoredFileConsumer

* static import fix

* refactor record offset calculation

* 2 sec delay before delete is enough for catchup

* optimize statestore update
kortemik authored Jun 14, 2023
1 parent 4b1d7bf commit fe48797
Showing 10 changed files with 976 additions and 131 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -40,7 +40,7 @@
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlo_12</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>
</dependency>
<!-- caches -->
<dependency>
128 changes: 91 additions & 37 deletions src/main/java/com/teragrep/rlo_13/MonitoredFileConsumer.java
@@ -22,11 +22,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.function.Consumer;

class MonitoredFileConsumer implements Consumer<MonitoredFile> {
@@ -43,71 +44,115 @@ public MonitoredFileConsumer(FileChannelCache fileChannelCache, StateStore state
this.fileRecordConsumer = fileRecordConsumer;
}

void readFile(MonitoredFile monitoredFile) {
void readFile(Path filePath) {

// object to pass metadata within
FileRecord fileRecord = new FileRecord(monitoredFile.getPath());
FileRecord fileRecord = new FileRecord(filePath);

long absolutePosition = stateStore.getOffset(monitoredFile.getPath());
long lastRecordStart = absolutePosition;
// !! trace log existing position
FileChannel fileChannel = fileChannelCache.acquire(monitoredFile.getPath());
FileChannel fileChannel = fileChannelCache.acquire(filePath);
if (fileChannel == null) {
if(LOGGER.isTraceEnabled()) {
LOGGER.trace("Gave up on <[{}]> due to null FileChannel.", monitoredFile.getPath());
LOGGER.trace("Gave up on <[{}]> due to null FileChannel.",filePath);
}
stateStore.deleteOffset(monitoredFile.getPath());
stateStore.deleteOffset(filePath);
return;
}

if (LOGGER.isTraceEnabled()) {
try {
LOGGER.trace("fileChannel position <{}> size <{}>", fileChannel.position(), fileChannel.size());
} catch (IOException ignored) {

}
}


ByteBuffer byteBuffer = ByteBuffer.allocateDirect(32*1024);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteBuffer outputBuffer = ByteBuffer.allocateDirect(1024*1024); // todo configurable


long lastRecordEnd = stateStore.getOffset(filePath);
try {
long fileChannelSize = fileChannel.size();
if (fileChannelSize < absolutePosition) {
if (fileChannel.size() < lastRecordEnd) {
if(LOGGER.isTraceEnabled()) {
LOGGER.trace(
"Path <[{}]> truncated: size: <{}> is LT position <{}>.",
monitoredFile.getPath(),
fileChannelSize,
absolutePosition
filePath,
fileChannel.size(),
lastRecordEnd
);
}
absolutePosition = 0;
stateStore.setOffset(monitoredFile.getPath(), absolutePosition);
lastRecordEnd = 0;
stateStore.setOffset(filePath, lastRecordEnd);
}
fileChannel.position(absolutePosition);
fileRecord.setStartOffset(absolutePosition); // set initial startingPosition
// note that this does not read all the changes, it will be re-read when something happens during read
while (fileChannel.position() < fileChannelSize) {
fileChannel.read(byteBuffer);

LOGGER.trace("lastRecordEnd <{}> for <{}>", lastRecordEnd, filePath);

fileChannel.position(lastRecordEnd);
fileRecord.setStartOffset(lastRecordEnd); // set initial startingPosition

long bytesRead = 0;
while (fileChannel.position() < fileChannel.size()) {
bytesRead = fileChannel.read(byteBuffer);

if (bytesRead < 1) {
return;
}

byteBuffer.flip(); // reading
while (byteBuffer.hasRemaining()) {
byte b = byteBuffer.get();
absolutePosition++;
if (b != 10) { // newline
byteArrayOutputStream.write(b);

boolean maximumRecordSize = outputBuffer.position() == outputBuffer.capacity() - 1;

if (b != '\n' && !maximumRecordSize) {
outputBuffer.put(b);
}
else {
byteArrayOutputStream.write(10);
LOGGER.trace("Produced fileRecord at <{}>", absolutePosition);
fileRecord.setEndOffset(absolutePosition);
fileRecord.setRecord(byteArrayOutputStream.toByteArray());
outputBuffer.put(b);
long recordEnd = lastRecordEnd + outputBuffer.position();
outputBuffer.flip();


if (LOGGER.isTraceEnabled()) {
LOGGER.trace(
"Produced fileRecord from start <{}> end <{}>, maximumRecordSize <{}>",
lastRecordEnd,
recordEnd,
maximumRecordSize
);
}

byte[] bytes = new byte[outputBuffer.remaining()];
outputBuffer.get(bytes);

fileRecord.setEndOffset(recordEnd);
fileRecord.setRecord(bytes);
//LOGGER.info("bytesRead <{}>", bytesRead);

fileRecordConsumer.accept(fileRecord);
byteArrayOutputStream.reset();
fileRecord.setStartOffset(absolutePosition); // next if any
lastRecordStart = absolutePosition;

// record complete


// for next one
fileRecord.setStartOffset(recordEnd); // next if any
lastRecordEnd = recordEnd;
fileRecord.setRecord(new byte[0]); // clear record
outputBuffer.clear();
}
bytesRead--;
}
byteBuffer.clear();
}
// persistence at lastRecordStart, partial ones will be re-read
stateStore.setOffset(monitoredFile.getPath(), lastRecordStart);
stateStore.setOffset(filePath, lastRecordEnd);
}
catch (IOException ioException) {
throw new UncheckedIOException(ioException);
}
finally {
fileChannelCache.release(monitoredFile.getPath());
fileChannelCache.release(filePath);
}
}

@@ -121,24 +166,33 @@ public void accept(MonitoredFile monitoredFile) {
);
}

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("<{}> entry for <{}>", monitoredFile.getStatus(), monitoredFile.getPath());
}
switch (monitoredFile.getStatus()) {
case SYNC_NEW:
readFile(monitoredFile);
readFile(monitoredFile.getPath());
break;
case SYNC_MODIFIED:
readFile(monitoredFile);
readFile(monitoredFile.getPath());
break;
case SYNC_DELETED:
readFile(monitoredFile.getPath());
fileChannelCache.invalidate(monitoredFile.getPath());
stateStore.deleteOffset(monitoredFile.getPath());
break;
case SYNC_RECREATED:
readFile(monitoredFile.getPath());
fileChannelCache.invalidate(monitoredFile.getPath());
stateStore.deleteOffset(monitoredFile.getPath());
readFile(monitoredFile);
readFile(monitoredFile.getPath());
break;
default:
throw new IllegalStateException("monitoredFile.getStatus() provided invalid state <" + monitoredFile.getStatus() + ">");
}

if(LOGGER.isTraceEnabled()) {
LOGGER.trace("<{}> exit for <{}>", monitoredFile.getStatus(), monitoredFile.getPath());
}
}
}
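
To make the refactored record offset calculation easier to follow, here is a minimal, self-contained sketch of the same bookkeeping: bytes are copied into a bounded record buffer until a newline (or the buffer's last free slot) is reached, the record's absolute end offset is derived from lastRecordEnd plus the buffer position, and only completed records advance the offset that later gets persisted. The RecordSplitSketch class, splitRecords method, and RecordSink interface are illustrative names, not part of rlo_13; the buffer sizes simply mirror the ones in the diff.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class RecordSplitSketch {

    // start/end are absolute byte offsets within the file, record is one newline-terminated line
    interface RecordSink {
        void accept(long startOffset, long endOffset, byte[] record);
    }

    static long splitRecords(Path path, long lastRecordEnd, RecordSink sink) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer readBuffer = ByteBuffer.allocateDirect(32 * 1024);      // read chunk, as in the diff
            ByteBuffer recordBuffer = ByteBuffer.allocateDirect(1024 * 1024);  // maximum record size

            if (channel.size() < lastRecordEnd) {
                lastRecordEnd = 0; // truncated file: start over, as readFile does
            }
            channel.position(lastRecordEnd);

            while (channel.position() < channel.size()) {
                if (channel.read(readBuffer) < 1) {
                    break; // nothing readable right now; keep the last known offset
                }
                readBuffer.flip();
                while (readBuffer.hasRemaining()) {
                    byte b = readBuffer.get();
                    boolean recordBufferFull = recordBuffer.position() == recordBuffer.capacity() - 1;
                    if (b != '\n' && !recordBufferFull) {
                        recordBuffer.put(b);
                    } else {
                        recordBuffer.put(b);
                        // absolute end offset: previous record end plus bytes gathered for this record
                        long recordEnd = lastRecordEnd + recordBuffer.position();
                        recordBuffer.flip();
                        byte[] record = new byte[recordBuffer.remaining()];
                        recordBuffer.get(record);
                        sink.accept(lastRecordEnd, recordEnd, record);
                        lastRecordEnd = recordEnd; // only completed records advance the offset
                        recordBuffer.clear();
                    }
                }
                readBuffer.clear();
            }
        }
        // the caller persists this value; a partial trailing line is re-read on the next pass
        return lastRecordEnd;
    }
}

A partial trailing line is intentionally not emitted: it is re-read on the next SYNC_MODIFIED pass, which is why the state store only ever records complete-record boundaries.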
3 changes: 2 additions & 1 deletion src/main/java/com/teragrep/rlo_13/StatefulFileReader.java
@@ -18,6 +18,7 @@
package com.teragrep.rlo_13;

import com.teragrep.rlo_12.MonitoredFile;
import com.teragrep.rlo_13.statestore.LMDBStateStore;
import com.teragrep.rlo_13.statestore.StateStore;

import java.nio.file.Path;
@@ -32,7 +33,7 @@ public class StatefulFileReader implements Supplier<Consumer<MonitoredFile>>, Au
private final Supplier<Consumer<FileRecord>> fileRecordConsumerSupplier;

public StatefulFileReader(Path stateStorePath, Supplier<Consumer<FileRecord>> fileRecordConsumerSupplier) {
this.stateStore = new StateStore(stateStorePath);
this.stateStore = new LMDBStateStore(stateStorePath);
this.fileChannelCache = new FileChannelCache();
this.fileRecordConsumerSupplier = fileRecordConsumerSupplier;
}
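
For context, a hypothetical caller would wire the reader up roughly as follows. Only the constructor signature and the Supplier<Consumer<MonitoredFile>> contract come from this diff; the consumer body, the state store path, and the assumption that the truncated interface name in the class declaration is AutoCloseable are illustrative.

import com.teragrep.rlo_12.MonitoredFile;
import com.teragrep.rlo_13.FileRecord;
import com.teragrep.rlo_13.StatefulFileReader;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ReaderWiringSketch {
    public static void main(String[] args) throws Exception {
        Path stateStorePath = Paths.get("statestore"); // assumed directory backing the state store

        // One consumer per supplier call; each receives complete, newline-delimited records.
        Supplier<Consumer<FileRecord>> recordConsumerSupplier = () -> fileRecord -> {
            // process a single record here; FileRecord accessors are not shown in this diff,
            // so the body is left as a placeholder
        };

        try (StatefulFileReader reader = new StatefulFileReader(stateStorePath, recordConsumerSupplier)) {
            // The reader supplies Consumer<MonitoredFile> instances that a directory monitor
            // (rlo_12) can invoke on SYNC_NEW / SYNC_MODIFIED / SYNC_DELETED / SYNC_RECREATED events.
            Consumer<MonitoredFile> fileConsumer = reader.get();
            // hand fileConsumer to the monitor here (omitted)
        }
    }
}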
119 changes: 119 additions & 0 deletions src/main/java/com/teragrep/rlo_13/statestore/LMDBStateStore.java
@@ -0,0 +1,119 @@
/*
Java Stateful File Reader rlo_13
Copyright (C) 2023 Suomen Kanuuna Oy
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.teragrep.rlo_13.statestore;

import org.lmdbjava.Dbi;
import org.lmdbjava.Env;
import org.lmdbjava.Txn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.file.Path;

import static java.nio.ByteBuffer.allocateDirect;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.lmdbjava.DbiFlags.MDB_CREATE;
import static org.lmdbjava.Env.create;

public class LMDBStateStore implements StateStore {
private static final Logger LOGGER = LoggerFactory.getLogger(LMDBStateStore.class);

private final Env<ByteBuffer> env;
private final Dbi<ByteBuffer> db;

private final KeyHashProvider keyHashProvider = new KeyHashProvider();
public LMDBStateStore(Path stateStorePath) {
env =
Env.create()
// 2 * 1024 * 1024 * 1024 / (64 (sha-256) + 8 (Long.BYTES)) ≈ 29M files
.setMapSize((long) 2 * 1024 * 1024 * 1024) //
.setMaxDbs(2)
.open(stateStorePath.toFile());

db = env.openDbi("StateStore", MDB_CREATE);
getVersion();
}

private long getVersion() {
Dbi<ByteBuffer> versionDb = env.openDbi("VersionStore", MDB_CREATE);

byte[] versionStringBytes = "StateStoreVersion".getBytes(UTF_8);
final ByteBuffer key = allocateDirect(env.getMaxKeySize());
key.put(versionStringBytes).flip();

try (Txn<ByteBuffer> txn = env.txnRead()) {
final ByteBuffer found = versionDb.get(txn, key);
if (found == null) {
// new DB
long version = 1;
LOGGER.trace("Created new StateStore with version <{}>", version);
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(Long.BYTES);
byteBuffer.putLong(version).flip();
versionDb.put(key,byteBuffer);
return version;
}
else {
final ByteBuffer fetchedVal = txn.val();
long version = fetchedVal.getLong();
LOGGER.trace("Found existing StateStore with version <{}>", version);
return version;
}
}
}

@Override
public void setOffset(Path path, long offset) {

final ByteBuffer val = allocateDirect(Long.BYTES);
val.putLong(offset).flip();

ByteBuffer key = keyHashProvider.getKey(path);
db.put(key, val);
}

@Override
public void deleteOffset(Path path) {
ByteBuffer key = keyHashProvider.getKey(path);
db.delete(key);
}

@Override
public long getOffset(Path path) {
ByteBuffer key = keyHashProvider.getKey(path);

try (Txn<ByteBuffer> txn = env.txnRead()) {
final ByteBuffer found = db.get(txn, key);
if (found == null) {
return 0;
}
else {
final ByteBuffer fetchedVal = txn.val();
return fetchedVal.getLong();
}
}

}



@Override
public void close() {
env.close();
}
}
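
A minimal usage sketch for the new LMDBStateStore, assuming only the public methods shown above (setOffset, getOffset, deleteOffset, close); the directory and file paths are hypothetical. Unknown paths read back as offset 0, which matches how readFile treats a file it has never seen.

import com.teragrep.rlo_13.statestore.LMDBStateStore;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class StateStoreSketch {
    public static void main(String[] args) throws Exception {
        // LMDB expects an existing directory to hold its map files
        Path stateStoreDir = Files.createTempDirectory("rlo_13-statestore");

        LMDBStateStore stateStore = new LMDBStateStore(stateStoreDir);
        try {
            Path monitored = Paths.get("/var/log/app/app.log"); // hypothetical monitored file

            stateStore.setOffset(monitored, 4096L);          // persist the last complete record end
            long offset = stateStore.getOffset(monitored);   // -> 4096; unseen paths return 0
            System.out.println("resume at offset " + offset);

            stateStore.deleteOffset(monitored);              // e.g. after SYNC_DELETED
        } finally {
            stateStore.close();
        }
    }
}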