Skip to content

Commit

Permalink
Merge pull request #14 from StrongestNumber9/add_start
Browse files Browse the repository at this point in the history
Make it into runnable, add start() and stop() functions
  • Loading branch information
StrongestNumber9 authored Jul 27, 2023
2 parents b6fe522 + 89e243f commit 7d02f2a
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions src/main/java/com/teragrep/rlo_12/DirectoryEventWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Matcher;
Expand All @@ -41,7 +42,7 @@
* -> /path/to/dir/foo/fii/faa subscribe as well (any dir under /path/to/dir
* -> match any file under /path/to/dir or it's sub-directories for pattern
*/
public class DirectoryEventWatcher {
public class DirectoryEventWatcher implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectoryEventWatcher.class);

private final Path initialDirectory;
Expand All @@ -61,6 +62,10 @@ public class DirectoryEventWatcher {

private final long pollingInterval;
private final TimeUnit pollingIntervalTimeUnit;
private final Thread thread;
private final FileStatusManager fileStatusManager;
private final Thread fileStatusManagerThread;
private AtomicBoolean isRunning = new AtomicBoolean(true);

/**
* FileEventWatcher
Expand Down Expand Up @@ -128,12 +133,17 @@ public DirectoryEventWatcher(Path directory,

this.directoryWatcher = initialDirectory.getFileSystem().newWatchService();

FileStatusManager fileStatusManager = new FileStatusManager(transferQueue, readConsumerSupplier, maximumPoolSize);
fileStatusManager = new FileStatusManager(transferQueue, readConsumerSupplier, maximumPoolSize);

Thread fileStatusManagerThread = new Thread(fileStatusManager);
fileStatusManagerThread.start();
fileStatusManagerThread = new Thread(fileStatusManager);

initialScan(initialDirectory);
thread = new Thread(this);
thread.setName("DEW");
}

public void start() {
fileStatusManagerThread.start();
thread.start();
}

/**
Expand Down Expand Up @@ -161,7 +171,7 @@ private void initialScan(Path directory) throws IOException {
* @throws InterruptedException Directory polling was interrupted
*/
public void watch() throws IOException, InterruptedException {
while (true) {
while (isRunning.get()) {

// wait for key to be signaled
WatchKey key;
Expand Down Expand Up @@ -259,6 +269,23 @@ public void watch() throws IOException, InterruptedException {
}
}

@Override
public void run() {
try {
initialScan(initialDirectory);
watch();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}

public void stop() throws InterruptedException {
isRunning.set(false);
thread.join();
fileStatusManager.stop();
fileStatusManagerThread.join();
}

private class ScavengingFileVisitor implements FileVisitor<Path> {
ScavengingFileVisitor() {
}
Expand Down

0 comments on commit 7d02f2a

Please sign in to comment.