diff --git a/pom.xml b/pom.xml
index ccb0edd..48eabfe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,7 @@
com.teragrep
rlo_12
- 1.0.3
+ 1.0.4
diff --git a/src/main/java/com/teragrep/rlo_13/MonitoredFileConsumer.java b/src/main/java/com/teragrep/rlo_13/MonitoredFileConsumer.java
index 5825d9b..97c6f84 100644
--- a/src/main/java/com/teragrep/rlo_13/MonitoredFileConsumer.java
+++ b/src/main/java/com/teragrep/rlo_13/MonitoredFileConsumer.java
@@ -22,11 +22,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
import java.util.function.Consumer;
class MonitoredFileConsumer implements Consumer {
@@ -43,71 +44,115 @@ public MonitoredFileConsumer(FileChannelCache fileChannelCache, StateStore state
this.fileRecordConsumer = fileRecordConsumer;
}
- void readFile(MonitoredFile monitoredFile) {
+ void readFile(Path filePath) {
+
// object to pass metadata within
- FileRecord fileRecord = new FileRecord(monitoredFile.getPath());
+ FileRecord fileRecord = new FileRecord(filePath);
- long absolutePosition = stateStore.getOffset(monitoredFile.getPath());
- long lastRecordStart = absolutePosition;
- // !! trace log existing position
- FileChannel fileChannel = fileChannelCache.acquire(monitoredFile.getPath());
+ FileChannel fileChannel = fileChannelCache.acquire(filePath);
if (fileChannel == null) {
if(LOGGER.isTraceEnabled()) {
- LOGGER.trace("Gave up on <[{}]> due to null FileChannel.", monitoredFile.getPath());
+ LOGGER.trace("Gave up on <[{}]> due to null FileChannel.",filePath);
}
- stateStore.deleteOffset(monitoredFile.getPath());
+ stateStore.deleteOffset(filePath);
return;
}
+ if (LOGGER.isTraceEnabled()) {
+ try {
+ LOGGER.trace("fileChannel position <{}> size <{}>", fileChannel.position(), fileChannel.size());
+ } catch (IOException ignored) {
+
+ }
+ }
+
+
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(32*1024);
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ ByteBuffer outputBuffer = ByteBuffer.allocateDirect(1024*1024); // todo configurable
+
+
+ long lastRecordEnd = stateStore.getOffset(filePath);
try {
- long fileChannelSize = fileChannel.size();
- if (fileChannelSize < absolutePosition) {
+ if (fileChannel.size() < lastRecordEnd) {
if(LOGGER.isTraceEnabled()) {
LOGGER.trace(
"Path <[{}]> truncated: size: <{}> is LT position <{}>.",
- monitoredFile.getPath(),
- fileChannelSize,
- absolutePosition
+ filePath,
+ fileChannel.size(),
+ lastRecordEnd
);
}
- absolutePosition = 0;
- stateStore.setOffset(monitoredFile.getPath(), absolutePosition);
+ lastRecordEnd = 0;
+ stateStore.setOffset(filePath, lastRecordEnd);
}
- fileChannel.position(absolutePosition);
- fileRecord.setStartOffset(absolutePosition); // set initial startingPosition
- // note that this does not read all the changes, it will be re-read when something happens during read
- while (fileChannel.position() < fileChannelSize) {
- fileChannel.read(byteBuffer);
+
+ LOGGER.trace("lastRecordEnd <{}> for <{}>", lastRecordEnd, filePath);
+
+ fileChannel.position(lastRecordEnd);
+ fileRecord.setStartOffset(lastRecordEnd); // set initial startingPosition
+
+ long bytesRead = 0;
+ while (fileChannel.position() < fileChannel.size()) {
+ bytesRead = fileChannel.read(byteBuffer);
+
+ if (bytesRead < 1) {
+ return;
+ }
+
byteBuffer.flip(); // reading
while (byteBuffer.hasRemaining()) {
byte b = byteBuffer.get();
- absolutePosition++;
- if (b != 10) { // newline
- byteArrayOutputStream.write(b);
+
+ boolean maximumRecordSize = outputBuffer.position() == outputBuffer.capacity() - 1;
+
+ if (b != '\n' && !maximumRecordSize) {
+ outputBuffer.put(b);
}
else {
- byteArrayOutputStream.write(10);
- LOGGER.trace("Produced fileRecord at <{}>", absolutePosition);
- fileRecord.setEndOffset(absolutePosition);
- fileRecord.setRecord(byteArrayOutputStream.toByteArray());
+ outputBuffer.put(b);
+ long recordEnd = lastRecordEnd + outputBuffer.position();
+ outputBuffer.flip();
+
+
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(
+ "Produced fileRecord from start <{}> end <{}>, maximumRecordSize <{}>",
+ lastRecordEnd,
+ recordEnd,
+ maximumRecordSize
+ );
+ }
+
+ byte[] bytes = new byte[outputBuffer.remaining()];
+ outputBuffer.get(bytes);
+
+ fileRecord.setEndOffset(recordEnd);
+ fileRecord.setRecord(bytes);
+ //LOGGER.info("bytesRead <{}>", bytesRead);
+
fileRecordConsumer.accept(fileRecord);
- byteArrayOutputStream.reset();
- fileRecord.setStartOffset(absolutePosition); // next if any
- lastRecordStart = absolutePosition;
+
+ // record complete
+
+
+ // for next one
+ fileRecord.setStartOffset(recordEnd); // next if any
+ lastRecordEnd = recordEnd;
+ fileRecord.setRecord(new byte[0]); // clear record
+ outputBuffer.clear();
}
+ bytesRead--;
}
byteBuffer.clear();
}
// persistence at lastRecordStart, partial ones will be re-read
- stateStore.setOffset(monitoredFile.getPath(), lastRecordStart);
+ stateStore.setOffset(filePath, lastRecordEnd);
}
catch (IOException ioException) {
throw new UncheckedIOException(ioException);
}
finally {
- fileChannelCache.release(monitoredFile.getPath());
+ fileChannelCache.release(filePath);
}
}
@@ -121,24 +166,33 @@ public void accept(MonitoredFile monitoredFile) {
);
}
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("<{}> entry for <{}>", monitoredFile.getStatus(), monitoredFile.getPath());
+ }
switch (monitoredFile.getStatus()) {
case SYNC_NEW:
- readFile(monitoredFile);
+ readFile(monitoredFile.getPath());
break;
case SYNC_MODIFIED:
- readFile(monitoredFile);
+ readFile(monitoredFile.getPath());
break;
case SYNC_DELETED:
+ readFile(monitoredFile.getPath());
fileChannelCache.invalidate(monitoredFile.getPath());
stateStore.deleteOffset(monitoredFile.getPath());
break;
case SYNC_RECREATED:
+ readFile(monitoredFile.getPath());
fileChannelCache.invalidate(monitoredFile.getPath());
stateStore.deleteOffset(monitoredFile.getPath());
- readFile(monitoredFile);
+ readFile(monitoredFile.getPath());
break;
default:
throw new IllegalStateException("monitoredFile.getStatus() provided invalid state <" + monitoredFile.getStatus() + ">");
}
+
+ if(LOGGER.isTraceEnabled()) {
+ LOGGER.trace("<{}> exit for <{}>", monitoredFile.getStatus(), monitoredFile.getPath());
+ }
}
}
diff --git a/src/main/java/com/teragrep/rlo_13/StatefulFileReader.java b/src/main/java/com/teragrep/rlo_13/StatefulFileReader.java
index e95d085..574a208 100644
--- a/src/main/java/com/teragrep/rlo_13/StatefulFileReader.java
+++ b/src/main/java/com/teragrep/rlo_13/StatefulFileReader.java
@@ -18,6 +18,7 @@
package com.teragrep.rlo_13;
import com.teragrep.rlo_12.MonitoredFile;
+import com.teragrep.rlo_13.statestore.LMDBStateStore;
import com.teragrep.rlo_13.statestore.StateStore;
import java.nio.file.Path;
@@ -32,7 +33,7 @@ public class StatefulFileReader implements Supplier>, Au
private final Supplier> fileRecordConsumerSupplier;
public StatefulFileReader(Path stateStorePath, Supplier> fileRecordConsumerSupplier) {
- this.stateStore = new StateStore(stateStorePath);
+ this.stateStore = new LMDBStateStore(stateStorePath);
this.fileChannelCache = new FileChannelCache();
this.fileRecordConsumerSupplier = fileRecordConsumerSupplier;
}
diff --git a/src/main/java/com/teragrep/rlo_13/statestore/LMDBStateStore.java b/src/main/java/com/teragrep/rlo_13/statestore/LMDBStateStore.java
new file mode 100644
index 0000000..c82494a
--- /dev/null
+++ b/src/main/java/com/teragrep/rlo_13/statestore/LMDBStateStore.java
@@ -0,0 +1,119 @@
+/*
+ Java Stateful File Reader rlo_13
+ Copyright (C) 2023 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package com.teragrep.rlo_13.statestore;
+
+import org.lmdbjava.Dbi;
+import org.lmdbjava.Env;
+import org.lmdbjava.Txn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+import static java.nio.ByteBuffer.allocateDirect;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.lmdbjava.DbiFlags.MDB_CREATE;
+import static org.lmdbjava.Env.create;
+
+public class LMDBStateStore implements StateStore {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LMDBStateStore.class);
+
+ private final Env env;
+ private final Dbi db;
+
+ private final KeyHashProvider keyHashProvider = new KeyHashProvider();
+ public LMDBStateStore(Path stateStorePath) {
+ env =
+ Env.create()
+ // 2 * 1024 * 1024 * 1024 / (64 (sha-256) + 8 (Long.BYTES) = 29M files
+ .setMapSize((long) 2 * 1024 * 1024 * 1024) //
+ .setMaxDbs(2)
+ .open(stateStorePath.toFile());
+
+ db = env.openDbi("StateStore", MDB_CREATE);
+ getVersion();
+ }
+
+ private long getVersion() {
+ Dbi versionDb = env.openDbi("VersionStore", MDB_CREATE);
+
+ byte[] versionStringBytes = "StateStoreVersion".getBytes(UTF_8);
+ final ByteBuffer key = allocateDirect(env.getMaxKeySize());
+ key.put(versionStringBytes).flip();
+
+ try (Txn txn = env.txnRead()) {
+ final ByteBuffer found = versionDb.get(txn, key);
+ if (found == null) {
+ // new DB
+ long version = 1;
+ LOGGER.trace("Created new StateStore with version <{}>", version);
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(Long.BYTES);
+ byteBuffer.putLong(version).flip();
+ versionDb.put(key,byteBuffer);
+ return version;
+ }
+ else {
+ final ByteBuffer fetchedVal = txn.val();
+ long version = fetchedVal.getLong();
+ LOGGER.trace("Found existing StateStore with version <{}>", version);
+ return version;
+ }
+ }
+ }
+
+ @Override
+ public void setOffset(Path path, long offset) {
+
+ final ByteBuffer val = allocateDirect(Long.BYTES);
+ val.putLong(offset).flip();
+
+ ByteBuffer key = keyHashProvider.getKey(path);
+ db.put(key, val);
+ }
+
+ @Override
+ public void deleteOffset(Path path) {
+ ByteBuffer key = keyHashProvider.getKey(path);
+ db.delete(key);
+ }
+
+ @Override
+ public long getOffset(Path path) {
+ ByteBuffer key = keyHashProvider.getKey(path);
+
+ try (Txn txn = env.txnRead()) {
+ final ByteBuffer found = db.get(txn, key);
+ if (found == null) {
+ return 0;
+ }
+ else {
+ final ByteBuffer fetchedVal = txn.val();
+ return fetchedVal.getLong();
+ }
+ }
+
+ }
+
+
+
+ @Override
+ public void close() {
+ env.close();
+ }
+}
diff --git a/src/main/java/com/teragrep/rlo_13/statestore/StateStore.java b/src/main/java/com/teragrep/rlo_13/statestore/StateStore.java
index c705ed9..433157f 100644
--- a/src/main/java/com/teragrep/rlo_13/statestore/StateStore.java
+++ b/src/main/java/com/teragrep/rlo_13/statestore/StateStore.java
@@ -14,104 +14,17 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
-
package com.teragrep.rlo_13.statestore;
-import org.lmdbjava.Dbi;
-import org.lmdbjava.Env;
-import org.lmdbjava.Txn;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.nio.file.Paths;
-
-import static java.nio.ByteBuffer.allocateDirect;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.lmdbjava.DbiFlags.MDB_CREATE;
-import static org.lmdbjava.Env.create;
-
-public class StateStore implements AutoCloseable {
- private static final Logger LOGGER = LoggerFactory.getLogger(StateStore.class);
-
- private final Env env;
- private final Dbi db;
-
- private final KeyHashProvider keyHashProvider = new KeyHashProvider();
- public StateStore(Path stateStorePath) {
- env =
- create()
- // 2 * 1024 * 1024 * 1024 / (64 (sha-256) + 8 (Long.BYTES) = 29M files
- .setMapSize((long) 2 * 1024 * 1024 * 1024) //
- .setMaxDbs(2)
- .open(stateStorePath.toFile());
-
- db = env.openDbi("StateStore", MDB_CREATE);
- getVersion();
- }
-
- private long getVersion() {
- Dbi versionDb = env.openDbi("VersionStore", MDB_CREATE);
-
- byte[] versionStringBytes = "StateStoreVersion".getBytes(UTF_8);
- final ByteBuffer key = allocateDirect(env.getMaxKeySize());
- key.put(versionStringBytes).flip();
-
- try (Txn txn = env.txnRead()) {
- final ByteBuffer found = versionDb.get(txn, key);
- if (found == null) {
- // new DB
- long version = 1;
- LOGGER.trace("Created new StateStore with version <{}>", version);
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(Long.BYTES);
- byteBuffer.putLong(version).flip();
- versionDb.put(key,byteBuffer);
- return version;
- }
- else {
- final ByteBuffer fetchedVal = txn.val();
- long version = fetchedVal.getLong();
- LOGGER.trace("Found existing StateStore with version <{}>", version);
- return version;
- }
- }
- }
-
- public void setOffset(Path path, long offset) {
-
- final ByteBuffer val = allocateDirect(Long.BYTES);
- val.putLong(offset).flip();
-
- ByteBuffer key = keyHashProvider.getKey(path);
- db.put(key, val);
- }
-
- public void deleteOffset(Path path) {
- ByteBuffer key = keyHashProvider.getKey(path);
- db.delete(key);
- }
-
- public long getOffset(Path path) {
- ByteBuffer key = keyHashProvider.getKey(path);
-
- try (Txn txn = env.txnRead()) {
- final ByteBuffer found = db.get(txn, key);
- if (found == null) {
- return 0;
- }
- else {
- final ByteBuffer fetchedVal = txn.val();
- return fetchedVal.getLong();
- }
- }
- }
+public interface StateStore extends AutoCloseable {
+ void setOffset(Path path, long offset);
+ void deleteOffset(Path path);
+ long getOffset(Path path);
@Override
- public void close() {
- env.close();
- }
+ void close();
}
diff --git a/src/test/java/com/teragrep/rlo_13/ManualMoveTest.java b/src/test/java/com/teragrep/rlo_13/ManualMoveTest.java
new file mode 100644
index 0000000..f3f854d
--- /dev/null
+++ b/src/test/java/com/teragrep/rlo_13/ManualMoveTest.java
@@ -0,0 +1,219 @@
+/*
+ Java Stateful File Reader rlo_13
+ Copyright (C) 2023 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package com.teragrep.rlo_13;
+
+import com.teragrep.rlo_12.DirectoryEventWatcher;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
+
+public class ManualMoveTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ManualMoveTest.class);
+ @Test
+ @EnabledIfSystemProperty(named = "runManualMoveTest", matches = "true")
+ public void manualTmpTest() throws InterruptedException, IOException {
+ AtomicLong readCounter = new AtomicLong();
+ AtomicLong readA = new AtomicLong();
+ AtomicLong readB = new AtomicLong();
+ AtomicLong readC = new AtomicLong();
+ AtomicLong readD = new AtomicLong();
+ AtomicLong readE = new AtomicLong();
+ AtomicLong readF = new AtomicLong();
+ AtomicLong readG = new AtomicLong();
+ AtomicLong readH = new AtomicLong();
+
+ AtomicLong writeCounter = new AtomicLong();
+
+ final String[] testMessage =
+ {
+ "START" + new String(new char[1000]).replace("\0", "A") + "END\n",
+ "START" + new String(new char[1000]).replace("\0", "B") + "END\n",
+ "START" + new String(new char[1000]).replace("\0", "C") + "END\n",
+ "START" + new String(new char[1000]).replace("\0", "D") + "END\n",
+ "START" + new String(new char[1000]).replace("\0", "E") + "END\n",
+ "START" + new String(new char[1000]).replace("\0", "F") + "END\n",
+ "START" + new String(new char[1000]).replace("\0", "G") + "END\n",
+ "START" + new String(new char[1000]).replace("\0", "H") + "END\n"
+ };
+ final int messageSize = testMessage[0].length();
+
+ final Supplier> consumerSupplier = () -> fileRecord -> {
+ if(fileRecord.getRecord().length != messageSize) {
+ throw new RuntimeException(
+ "Got an unexpected message: <["
+ + new String(fileRecord.getRecord(), StandardCharsets.UTF_8)
+ + "]>, size "
+ + fileRecord.getRecord().length
+ + " so <"+fileRecord.getStartOffset()+">"
+ + " eo <"+fileRecord.getEndOffset()+">"
+ );
+ }
+ else {
+ if (new String(fileRecord.getRecord(), StandardCharsets.US_ASCII).equals(testMessage[0])) {
+ readA.incrementAndGet();
+ }
+ else if (new String(fileRecord.getRecord(), StandardCharsets.US_ASCII).equals(testMessage[1])) {
+ readB.incrementAndGet();
+ }
+ else if (new String(fileRecord.getRecord(), StandardCharsets.US_ASCII).equals(testMessage[2])) {
+ readC.incrementAndGet();
+ }
+ else if (new String(fileRecord.getRecord(), StandardCharsets.US_ASCII).equals(testMessage[3])) {
+ readD.incrementAndGet();
+ }
+ else if (new String(fileRecord.getRecord(), StandardCharsets.US_ASCII).equals(testMessage[4])) {
+ readE.incrementAndGet();
+ }
+ else if (new String(fileRecord.getRecord(), StandardCharsets.US_ASCII).equals(testMessage[5])) {
+ readF.incrementAndGet();
+ }
+ else if (new String(fileRecord.getRecord(), StandardCharsets.US_ASCII).equals(testMessage[6])) {
+ readG.incrementAndGet();
+ }
+ else if (new String(fileRecord.getRecord(), StandardCharsets.US_ASCII).equals(testMessage[7])) {
+ readH.incrementAndGet();
+ }
+
+ readCounter.incrementAndGet();
+ }
+ };
+
+ Thread writer = new Thread(() -> {
+ LOGGER.info("Starting writer thread");
+ try {
+ Path path = Paths.get("/tmp/rlo_13_move/log");
+ Files.createDirectories(path);
+ FileWriter fileWriter = new FileWriter(path + "/input.txt");
+
+ int file = 0;
+
+ for(int i=0; i {
+ LOGGER.info("Starting move thread");
+ try {
+ Path path1 = Paths.get("/tmp/rlo_13_move/log/input.txt");
+ Path path2 = Paths.get("/tmp/rlo_13_move/log/input.txt.1");
+ while(true) {
+ LOGGER.warn("written <{}> read <{}>, A <{}>, B <{}>, C <{}>, D <{}>, E <{}>, F <{}>, G <{}>, H <{}>",
+ writeCounter.get(),
+ readCounter.get(),
+ readA.get(),
+ readB.get(),
+ readC.get(),
+ readD.get(),
+ readE.get(),
+ readF.get(),
+ readG.get(),
+ readH.get()
+ );
+ try {
+ Thread.sleep(1000);
+ Files.move(path1, path2, REPLACE_EXISTING);
+
+ }
+ catch (Exception ignored) {
+
+ }
+ }
+ }
+ catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ });
+ mover.start();
+
+ Files.createDirectories(Paths.get("/tmp/rlo_13_move/state"));
+ try (StatefulFileReader statefulFileReader =
+ new StatefulFileReader(
+ Paths.get("/tmp/rlo_13_move/state"),
+ consumerSupplier
+ )
+ ) {
+ DirectoryEventWatcher dew = new DirectoryEventWatcher(
+ Paths.get("/tmp/rlo_13_move/log"),
+ false,
+ Pattern.compile("^.*txt$"),
+ statefulFileReader
+ );
+
+ dew.watch();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/test/java/com/teragrep/rlo_13/ManualRaceConditionTest.java b/src/test/java/com/teragrep/rlo_13/ManualRaceConditionTest.java
new file mode 100644
index 0000000..52f014b
--- /dev/null
+++ b/src/test/java/com/teragrep/rlo_13/ManualRaceConditionTest.java
@@ -0,0 +1,191 @@
+/*
+ Java Stateful File Reader rlo_13
+ Copyright (C) 2023 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package com.teragrep.rlo_13;
+
+import com.teragrep.rlo_12.DirectoryEventWatcher;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+public class ManualRaceConditionTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ManualRaceConditionTest.class);
+ @Test
+ @EnabledIfSystemProperty(named = "runManualRaceConditionTest", matches = "true")
+ public void manualTmpTest() throws InterruptedException, IOException {
+
+ String testMessage = "START" + new String(new char[1000]).replace("\0", "X") + "END\n";
+ int messageSize = testMessage.length();
+
+ Supplier> consumerSupplier = () -> fileRecord -> {
+ if(fileRecord.getRecord().length != messageSize) {
+ throw new RuntimeException(
+ "Got an unexpected message: <["
+ + new String(fileRecord.getRecord(), StandardCharsets.UTF_8)
+ + "]>, size "
+ + fileRecord.getRecord().length
+ + " so <"+fileRecord.getStartOffset()+">"
+ + " eo <"+fileRecord.getEndOffset()+">"
+ );
+ }
+ };
+
+ Thread writer = new Thread(() -> {
+ LOGGER.info("Starting writer thread");
+ try {
+ Path path = Paths.get("/tmp/rlo_13_race/log");
+ Files.createDirectories(path);
+ FileWriter fileWriter = new FileWriter(path + "/input.txt");
+ for(int i=0; i {
+ LOGGER.info("Starting truncator thread");
+ try {
+ Path path = Paths.get("/tmp/rlo_13_race/log/input.txt");
+ while(true) {
+ try (FileChannel fc = FileChannel.open(path, WRITE)) {
+ fc.truncate(0);
+ }
+ Thread.sleep(1000);
+ }
+ }
+ catch (Exception e){
+ throw new RuntimeException(e);
+ }
+ });
+ truncator.start();
+
+ /*
+ Files.createDirectories(Paths.get("/tmp/rlo_13_race/state"));
+ try (StatefulFileReader statefulFileReader =
+ new StatefulFileReader(
+ Paths.get("/tmp/rlo_13_race/state"),
+ consumerSupplier
+ )
+ ) {
+ DirectoryEventWatcher dew = new DirectoryEventWatcher(
+ Paths.get("/tmp/rlo_13_race/log"),
+ false,
+ Pattern.compile("^.*$"),
+ statefulFileReader
+ );
+
+ dew.watch();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }*/
+ while (true) {
+ readFile();
+ }
+ }
+
+ private void readFile() {
+ Path path = Paths.get("/tmp/rlo_13_race/log/input.txt");
+ FileRecord fileRecord = new FileRecord(path);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(32*1024);
+ ByteBuffer outputBuffer = ByteBuffer.allocateDirect(1024*1024); // todo configurable
+
+
+ // !! trace log existing position
+
+
+ try (FileChannel fileChannel = FileChannel.open(path, READ)) {
+ fileChannel.position(0);
+
+ long bytesRead = 0;
+ while (fileChannel.position() < fileChannel.size()) {
+ bytesRead = fileChannel.read(byteBuffer);
+
+ if (bytesRead < 1) {
+ return;
+ }
+
+ byteBuffer.flip(); // reading
+ while (byteBuffer.hasRemaining()) {
+ byte b = byteBuffer.get();
+
+ if (b == '\0') {
+ throw new RuntimeException("nullia ja monta " + bytesRead);
+ }
+
+ if (b != '\n' && outputBuffer.position() != outputBuffer.capacity() - 1) {
+ outputBuffer.put(b);
+ }
+ else {
+ outputBuffer.put((byte) '\n');
+ outputBuffer.flip();
+ LOGGER.trace("Produced fileRecord at <{}>", fileChannel.position());
+
+ byte[] bytes = new byte[outputBuffer.remaining()];
+ outputBuffer.get(bytes);
+
+ fileRecord.setEndOffset(fileChannel.position());
+ fileRecord.setRecord(bytes);
+ //LOGGER.info("bytesRead <{}>", bytesRead);
+
+ // for next one
+ fileRecord.setStartOffset(fileChannel.position()); // next if any
+ fileRecord.setRecord(new byte[0]);
+ outputBuffer.clear();
+ }
+ bytesRead--;
+ }
+ byteBuffer.clear();
+ }
+ // persistence at lastRecordStart, partial ones will be re-read
+ }
+ catch (IOException ioException) {
+ throw new UncheckedIOException(ioException);
+ }
+ }
+}
diff --git a/src/test/java/com/teragrep/rlo_13/MonitoredFileConsumerTest.java b/src/test/java/com/teragrep/rlo_13/MonitoredFileConsumerTest.java
new file mode 100644
index 0000000..031351f
--- /dev/null
+++ b/src/test/java/com/teragrep/rlo_13/MonitoredFileConsumerTest.java
@@ -0,0 +1,181 @@
+/*
+ Java Stateful File Reader rlo_13
+ Copyright (C) 2023 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package com.teragrep.rlo_13;
+
+import com.teragrep.rlo_13.statestore.InMemoryStateStore;
+import com.teragrep.rlo_13.statestore.StateStore;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+public class MonitoredFileConsumerTest {
+ private void createTestFile(Path filePath, int records, boolean append) throws IOException {
+ try (FileWriter fileWriter = new FileWriter(filePath.toFile(), append)) {
+ for (int i = 0; i < records; i++) {
+ fileWriter.write(i+"\n");
+ fileWriter.flush();
+ }
+ }
+ }
+ @Test
+ public void testReadFile() throws IOException {
+ AtomicLong recordCounter = new AtomicLong();
+ Path testFilePath = Paths.get("target/MonitoredFileConsumerTest#createTestFile");
+
+ int records = 10;
+
+ try (FileChannelCache fcc = new FileChannelCache()) {
+
+ Consumer frc = fileRecord -> recordCounter.incrementAndGet();
+
+ StateStore stateStore = new InMemoryStateStore();
+
+ MonitoredFileConsumer mfc = new MonitoredFileConsumer(fcc, stateStore, frc);
+
+ createTestFile(testFilePath, records, false);
+ mfc.readFile(testFilePath);
+ }
+
+ Assertions.assertEquals(records, recordCounter.get());
+ }
+
+ @Test
+ public void testReadAppendedFile() throws IOException {
+ AtomicLong recordCounter = new AtomicLong();
+ Path testFilePath = Paths.get("target/MonitoredFileConsumerTest#testReadAppendedFile");
+
+ int records = 10;
+
+ try (FileChannelCache fcc = new FileChannelCache()) {
+
+ Consumer frc = fileRecord -> recordCounter.incrementAndGet();
+
+ StateStore stateStore = new InMemoryStateStore();
+
+ MonitoredFileConsumer mfc = new MonitoredFileConsumer(fcc, stateStore, frc);
+
+ // create
+ createTestFile(testFilePath, records, false);
+
+ // read 1
+ mfc.readFile(testFilePath);
+
+ // append
+ createTestFile(testFilePath, records, true);
+
+ // read 2
+ mfc.readFile(testFilePath);
+ }
+
+ Assertions.assertEquals(records * 2, recordCounter.get());
+ }
+
+ @Test
+ public void testReadDeletedFile() throws IOException {
+ AtomicLong recordCounter = new AtomicLong();
+ Path testFilePath = Paths.get("target/MonitoredFileConsumerTest#testReadDeletedFile");
+
+ int records = 10;
+
+ try (FileChannelCache fcc = new FileChannelCache()) {
+
+ Consumer frc = fileRecord -> recordCounter.incrementAndGet();
+
+ StateStore stateStore = new InMemoryStateStore();
+
+ MonitoredFileConsumer mfc = new MonitoredFileConsumer(fcc, stateStore, frc);
+
+ // create
+ try (FileWriter fileWriter = new FileWriter(testFilePath.toFile(), false)) {
+ // write 1
+ for (int i = 0; i < records; i++) {
+ fileWriter.write(i+"\n");
+ fileWriter.flush();
+ }
+
+ // read 1
+ mfc.readFile(testFilePath);
+
+ // delete
+ Files.delete(testFilePath);
+
+ // write 2
+ for (int i = 0; i < records; i++) {
+ fileWriter.write(i+"\n");
+ fileWriter.flush();
+ }
+
+ // read 2
+ mfc.readFile(testFilePath);
+ }
+ }
+
+ Assertions.assertEquals(records * 2, recordCounter.get());
+ }
+
+ @Test
+ public void testReadMovedFile() throws IOException {
+ AtomicLong recordCounter = new AtomicLong();
+ Path testFilePath = Paths.get("target/MonitoredFileConsumerTest#testReadMovedFile");
+ Path testMovedFilePath = Paths.get("target/MonitoredFileConsumerTest#testReadMovedFile.moved");
+
+ int records = 10;
+
+ try (FileChannelCache fcc = new FileChannelCache()) {
+
+ Consumer frc = fileRecord -> recordCounter.incrementAndGet();
+
+ StateStore stateStore = new InMemoryStateStore();
+
+ MonitoredFileConsumer mfc = new MonitoredFileConsumer(fcc, stateStore, frc);
+
+ // create
+ try (FileWriter fileWriter = new FileWriter(testFilePath.toFile(), false)) {
+ // write 1
+ for (int i = 0; i < records; i++) {
+ fileWriter.write(i+"\n");
+ fileWriter.flush();
+ }
+
+ // read 1
+ mfc.readFile(testFilePath);
+
+ // delete
+ Files.move(testFilePath, testMovedFilePath);
+
+ // write 2
+ for (int i = 0; i < records; i++) {
+ fileWriter.write(i+"\n");
+ fileWriter.flush();
+ }
+
+ // read 2
+ mfc.readFile(testFilePath);
+ }
+ }
+
+ Assertions.assertEquals(records * 2, recordCounter.get());
+ }
+}
diff --git a/src/test/java/com/teragrep/rlo_13/statestore/InMemoryStateStore.java b/src/test/java/com/teragrep/rlo_13/statestore/InMemoryStateStore.java
new file mode 100644
index 0000000..d41bc56
--- /dev/null
+++ b/src/test/java/com/teragrep/rlo_13/statestore/InMemoryStateStore.java
@@ -0,0 +1,53 @@
+/*
+ Java Stateful File Reader rlo_13
+ Copyright (C) 2023 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.rlo_13.statestore;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+public class InMemoryStateStore implements StateStore {
+
+ private final Map pathOffsetMap = new HashMap<>();
+
+ @Override
+ public void setOffset(Path path, long offset) {
+ pathOffsetMap.put(path, offset);
+ }
+
+ @Override
+ public void deleteOffset(Path path) {
+ pathOffsetMap.remove(path);
+ }
+
+ @Override
+ public long getOffset(Path path) {
+ Long offset = pathOffsetMap.get(path);
+ if (offset == null) {
+ return 0;
+ }
+ else {
+ return offset;
+ }
+
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+}
diff --git a/src/test/java/com/teragrep/rlo_13/statestore/LMDBStateStoreTest.java b/src/test/java/com/teragrep/rlo_13/statestore/LMDBStateStoreTest.java
new file mode 100644
index 0000000..670ac28
--- /dev/null
+++ b/src/test/java/com/teragrep/rlo_13/statestore/LMDBStateStoreTest.java
@@ -0,0 +1,114 @@
+/*
+ Java Stateful File Reader rlo_13
+ Copyright (C) 2023 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package com.teragrep.rlo_13.statestore;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class LMDBStateStoreTest {
+
+ @Test
+ public void testGet() throws IOException {
+ Path storePath = Paths.get("target/LMDBStateStoreTest#teStore");
+
+ Files.createDirectories(storePath);
+
+ try (LMDBStateStore lss = new LMDBStateStore(storePath)) {
+ Path testPath = Paths.get("/some/testGet");
+ lss.setOffset(testPath, 123456);
+
+ Assertions.assertEquals(123456,lss.getOffset(testPath));
+ }
+ }
+
+ @Test
+ public void testNoPath() throws IOException {
+ Path storePath = Paths.get("target/LMDBStateStoreTest#teStore");
+
+ Files.createDirectories(storePath);
+
+ try (LMDBStateStore lss = new LMDBStateStore(storePath)) {
+ Path notSetPath = Paths.get("/some/testNoPath");
+
+ Assertions.assertEquals(0,lss.getOffset(notSetPath));
+ }
+ }
+
+ @Test
+ public void testSetGetDeletePath() throws IOException {
+ Path storePath = Paths.get("target/LMDBStateStoreTest#teStore");
+
+ Files.createDirectories(storePath);
+
+ try (LMDBStateStore lss = new LMDBStateStore(storePath)) {
+ Path testPath = Paths.get("/some/testSetGetDeletePath");
+
+ // not yet set
+ Assertions.assertEquals(0,lss.getOffset(testPath));
+
+ // set it
+ lss.setOffset(testPath, 123456);
+ Assertions.assertEquals(123456,lss.getOffset(testPath));
+
+ // delete it
+ lss.deleteOffset(testPath);
+
+ // no longer set
+ Assertions.assertEquals(0,lss.getOffset(testPath));
+ }
+ }
+
+ @Test
+ public void testSetGetCloseGetPath() throws IOException {
+ Path storePath = Paths.get("target/LMDBStateStoreTest#MoreStore");
+
+ Path testPath = Paths.get("/some/testSetGetCloseGetPath");
+
+ Files.createDirectories(storePath);
+
+ // open store
+ try (LMDBStateStore lss = new LMDBStateStore(storePath)) {
+ // not yet set
+ Assertions.assertEquals(0,lss.getOffset(testPath));
+
+ // set it
+ lss.setOffset(testPath, 123456);
+ Assertions.assertEquals(123456,lss.getOffset(testPath));
+ } // closes
+
+ // open second time
+ try (LMDBStateStore lss = new LMDBStateStore(storePath)) {
+ // get already set
+ Assertions.assertEquals(123456,lss.getOffset(testPath));
+
+ // delete
+ lss.deleteOffset(testPath);
+ } // closes
+
+
+ // open third time
+ try (LMDBStateStore lss = new LMDBStateStore(storePath)) {
+ // get deleted
+ Assertions.assertEquals(0,lss.getOffset(testPath));
+ } // closes
+ }
+}