diff --git a/vs-log/src/com/liquidlabs/log/space/agg/ChronicleQReplayAggregator.java b/vs-log/src/com/liquidlabs/log/space/agg/ChronicleQReplayAggregator.java index 327c42b..04eb548 100644 --- a/vs-log/src/com/liquidlabs/log/space/agg/ChronicleQReplayAggregator.java +++ b/vs-log/src/com/liquidlabs/log/space/agg/ChronicleQReplayAggregator.java @@ -44,16 +44,16 @@ public class ChronicleQReplayAggregator implements ReplayAggregator { private final static Logger LOGGER = Logger.getLogger(ChronicleQReplayAggregator.class); - private final LogRequest request; + private final LogRequest request; private String uuid; private String basePath; private Chronicle queue = null; private ConcurrentLinkedQueue writeQueue = new ConcurrentLinkedQueue<>(); private LogReplayHandler replayHandler; - private long startTime = new DateTime().getMillis(); + private long startTime = new DateTime().getMillis(); - private AtomicBoolean cancelled = new AtomicBoolean(false); + private AtomicBoolean cancelled = new AtomicBoolean(false); private volatile int sent = 0; ScheduledExecutorService scheduler = ExecutorService.newScheduledThreadPool("services"); @@ -61,18 +61,18 @@ public class ChronicleQReplayAggregator implements ReplayAggregator { private volatile int written; - public String listenerId; + public String listenerId; private ScheduledFuture future; private int pausePeriod = Integer.getInteger("q.pause",100); private volatile AtomicInteger writing = new AtomicInteger(); - private int MAX_WRITTEN = Integer.getInteger("replay.max.queue", 1 * 100 * 1000); + private volatile AtomicInteger tried = new AtomicInteger(); volatile static int created = 0; public ChronicleQReplayAggregator(LogRequest request, LogReplayHandler replayHandler, String handlerId) { - this.request = request; - this.replayHandler = replayHandler; - this.listenerId = request.subscriber() + "_id"; + this.request = request; + this.replayHandler = replayHandler; + this.listenerId = request.subscriber() + "_id"; try { String pid =PIDGetter.getPID(); @@ -95,7 +95,8 @@ public ChronicleQReplayAggregator(LogRequest request, LogReplayHandler replayHan public void handle(ReplayEvent event) { - if (cancelled.get() || written++ > MAX_WRITTEN) { + tried.incrementAndGet(); + if (cancelled.get()) { if (!cancelled.get()) cancelled.set(true); return ; } @@ -108,7 +109,7 @@ public void handle(ReplayEvent event) { } else { flushQueue(); } - } + } private void flushQueue() { bytes = 0; @@ -154,39 +155,48 @@ private void persist(List writeQueue) { ExcerptTailer reader = null; public void run(){ - try { - while (!cancelled.get() && reader.nextIndex()) { - int sizes = reader.readInt(); - byte[] bytess = new byte[sizes]; - reader.read(bytess); - - List items = (List) Convertor.deserialize(bytess); - reader.finish(); + try { + while (!cancelled.get() && reader.nextIndex()) { + int sizes = reader.readInt(); + byte[] bytess = new byte[sizes]; + reader.read(bytess); + + List items = (List) Convertor.deserialize(bytess); + reader.finish(); // System.out.println("RUNNER: " + items.size() + " Size:" + size + " Sent:" + sent + " written:" + written); - size.addAndGet(items.size() * -1); - sent += items.size(); + size.addAndGet(items.size() * -1); + sent += items.size(); - replayHandler.handle(items); + replayHandler.handle(items); - } - } catch (Throwable t){ + } + } catch (Throwable t){ LOGGER.warn("LOGGER" + t.getMessage() + " cancelled:" + cancelled + " sent:" + sent + " incoming" + size); cancelled.set(true); - if (t.getMessage().contains("RetryInvocationException: SendFailed.Throwable:noSender")) { - replayHandler = null; - } - } finally { - if (cancelled.get() || isExpired() || request.isCancelled()) { - cancel(); - } - } - } - - public Integer age(long now) { - return Long.valueOf((now - startTime)/1000l).intValue(); - } - - public void cancel() { + if (t.getMessage().contains("RetryInvocationException: SendFailed.Throwable:noSender")) { + replayHandler = null; + } + } finally { + if (cancelled.get() || isExpired() || request.isCancelled()) { + cancel(); + } + } + } + + public Integer age(long now) { + return Long.valueOf((now - startTime)/1000l).intValue(); + } + + public void cancel() { + scheduler.schedule(new Runnable() { + @Override + public void run() { + cancelInternal(); + } + }, 3, TimeUnit.SECONDS); + } + + public void cancelInternal() { synchronized (this) { if (this.queue == null) return; @@ -200,7 +210,7 @@ public void cancel() { } } this.cancelled.set(true); - LOGGER.info("LOGGER Cancelled:" + request.subscriber() + " Listener:" + listenerId + " written: " + written + " sent:" + sent + " persistedBytes:" + persistedBytes + " persistedQueues:" + queuedPersisted); + LOGGER.info("LOGGER Cancelled:" + request.subscriber() + " Listener:" + listenerId + " tried:" + tried + " written: " + written + " sent:" + sent + " persistedBytes:" + persistedBytes + " persistedQueues:" + queuedPersisted); // try and coordinate the shutdown int waiting = 0; while (writing.get() > 0 && waiting++ < Integer.getInteger("replay.agg.wait.count", 100)) { @@ -230,7 +240,7 @@ public void cancel() { deleteQueueFiles(); } } - } + } boolean closed = false; public void close() { if (!closed) { @@ -250,17 +260,17 @@ private void deleteQueueFiles() { } public boolean isExpired() { - return replayHandler == null || this.request.isExpired(); - } + return replayHandler == null || this.request.isExpired(); + } - public boolean isRunnable() { - boolean itemsQueued = queue.size() > 0; - return itemsQueued && !request.isCancelled() && !request.isExpired() ; - } + public boolean isRunnable() { + boolean itemsQueued = queue.size() > 0; + return itemsQueued && !request.isCancelled() && !request.isExpired() ; + } - public int size() { - return size.get(); - } + public int size() { + return size.get(); + } @Override @@ -280,4 +290,8 @@ public void flush() { public LogReplayHandler getReplayHandler() { return replayHandler; } + + public String toString() { + return this.getClass().getSimpleName() +" :" + request.subscriber() + " Listener:" + listenerId + " tried:" + tried + " written: " + written + " sent:" + sent + " persistedBytes:" + persistedBytes + " persistedQueues:" + queuedPersisted; + } }