From 4c06ea6e0021a4b6f7506e511449547465de0115 Mon Sep 17 00:00:00 2001 From: Marcin Biegan Date: Mon, 15 May 2017 01:04:36 +0200 Subject: [PATCH] Handle full queue better in RemoteReporter (#180) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix RemoteReport race condition when queue is full The check “commandQueue.size() == maxQueueSize” is not thread safe, this lead to: java.lang.IllegalStateException: Queue full It’s better to .offer() and check the returned value * Fix RemoteReporter problems with command queue is full When the command queue is full it’s not possible to queue control commands - FlushCommand or CloseCommand. They were throwing exceptions. This change replaces BLockingQueue.add with BlockingQueue.offer to handle this situation. I’ve modified test InMemorySender to allow it to block. This makes it possible to easily fill the RemoteReporter’s command queue in tests. * RemoteReporter’s queue length metric update when queue full When the command queue is full further FlushCommands are not enqueued and queue length metric does not update. I moved it outside the command - now it updates whenever scheduled flush() executes. --- .../uber/jaeger/reporters/RemoteReporter.java | 34 +++-- .../uber/jaeger/reporters/InMemorySender.java | 23 ++++ .../jaeger/reporters/RemoteReporterTest.java | 119 +++++++++++++++++- 3 files changed, 165 insertions(+), 11 deletions(-) diff --git a/jaeger-core/src/main/java/com/uber/jaeger/reporters/RemoteReporter.java b/jaeger-core/src/main/java/com/uber/jaeger/reporters/RemoteReporter.java index 4f5108474..bd0b317f5 100644 --- a/jaeger-core/src/main/java/com/uber/jaeger/reporters/RemoteReporter.java +++ b/jaeger-core/src/main/java/com/uber/jaeger/reporters/RemoteReporter.java @@ -30,6 +30,7 @@ import java.util.TimerTask; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -39,6 +40,7 @@ @ToString(exclude = {"commandQueue", "flushTimer", "queueProcessorThread", "metrics"}) @Slf4j public class RemoteReporter implements Reporter { + private static final int CLOSE_ENQUEUE_TIMEOUT_MILLIS = 1000; private final BlockingQueue commandQueue; private final Timer flushTimer; private final Thread queueProcessorThread; @@ -68,7 +70,7 @@ public RemoteReporter(final Sender sender, int flushInterval, int maxQueueSize, new TimerTask() { @Override public void run() { - commandQueue.add(new FlushCommand()); + flush(); } }, 0, @@ -78,20 +80,25 @@ public void run() { @Override public void report(Span span) { // Its better to drop spans, than to block here - if (commandQueue.size() == maxQueueSize) { + boolean added = commandQueue.offer(new AppendCommand(span)); + + if (!added) { metrics.reporterDropped.inc(1); - return; } - - commandQueue.add(new AppendCommand(span)); } @Override public void close() { - commandQueue.add(new CloseCommand()); - try { - queueProcessorThread.join(); + // best-effort: if we can't add CloseCommand in this time then it probably will never happen + boolean added = commandQueue + .offer(new CloseCommand(), CLOSE_ENQUEUE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + if (added) { + queueProcessorThread.join(); + } else { + log.warn("Unable to cleanly close RemoteReporter, command queue is full - probably the" + + " sender is stuck"); + } } catch (InterruptedException e) { return; } finally { @@ -105,6 +112,15 @@ public void close() { } } + void flush() { + // to reduce the number of updateGauge stats, we only emit queue length on flush + metrics.reporterQueueLength.update(commandQueue.size()); + + // We can safely drop FlushCommand when the queue is full - sender should take care of flushing + // in such case + commandQueue.offer(new FlushCommand()); + } + /* * The code below implements the command pattern. This pattern is useful for * situations where multiple threads would need to synchronize on a resource, @@ -140,8 +156,6 @@ class FlushCommand implements Command { public void execute() throws SenderException { int n = sender.flush(); metrics.reporterSuccess.inc(n); - // to reduce the number of updateGauge stats, we only emit queue length on flush - metrics.reporterQueueLength.update(commandQueue.size()); } } diff --git a/jaeger-core/src/test/java/com/uber/jaeger/reporters/InMemorySender.java b/jaeger-core/src/test/java/com/uber/jaeger/reporters/InMemorySender.java index 58fc61b8f..ad540bc70 100644 --- a/jaeger-core/src/test/java/com/uber/jaeger/reporters/InMemorySender.java +++ b/jaeger-core/src/test/java/com/uber/jaeger/reporters/InMemorySender.java @@ -28,11 +28,18 @@ import com.uber.jaeger.thriftjava.Span; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Semaphore; +/** + * Sender which stores spans in memory. Appending a new span is a blocking operation unless + * "permitted". By default Integer.MAX_VALUE "appends" are permitted. + */ public class InMemorySender implements Sender { + private List appended; private List flushed; private List received; + private Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); public InMemorySender() { appended = new ArrayList(); @@ -54,6 +61,11 @@ public List getReceived() { @Override public int append(com.uber.jaeger.Span span) throws SenderException { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } com.uber.jaeger.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span); appended.add(thriftSpan); received.add(thriftSpan); @@ -73,4 +85,15 @@ public int flush() throws SenderException { public int close() throws SenderException { return flush(); } + + /** + * Removes previously granted "append" permits and grants + * a new number of permits + * + * @param number number of "appends" to permit + */ + public void permitAppend(int number) { + semaphore.drainPermits(); + semaphore.release(number); + } } diff --git a/jaeger-core/src/test/java/com/uber/jaeger/reporters/RemoteReporterTest.java b/jaeger-core/src/test/java/com/uber/jaeger/reporters/RemoteReporterTest.java index 7fc38a2d7..418cd1096 100644 --- a/jaeger-core/src/test/java/com/uber/jaeger/reporters/RemoteReporterTest.java +++ b/jaeger-core/src/test/java/com/uber/jaeger/reporters/RemoteReporterTest.java @@ -22,8 +22,11 @@ package com.uber.jaeger.reporters; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.uber.jaeger.Span; @@ -32,7 +35,10 @@ import com.uber.jaeger.metrics.Metrics; import com.uber.jaeger.metrics.StatsFactoryImpl; import com.uber.jaeger.samplers.ConstSampler; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Test; @@ -62,7 +68,7 @@ public void setUp() throws Exception { public void testRemoteReporterReport() throws Exception { Span span = (Span) tracer.buildSpan("raza").start(); reporter.report(span); - Thread.sleep(5); + Thread.sleep(50); List received = sender.getReceived(); assertEquals(received.size(), 1); @@ -103,4 +109,115 @@ public void testRemoteReporterFlushTimerThread() throws Exception { } assertFalse(flushTimerThreadCount == 0); } + + // Starts a number of threads. Each can fill the queue on its own, so they will exceed its + // capacity many times over + @Test + public void testReportDoesntThrowWhenQueueFull() throws Exception { + final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); + + int threadsCount = 10; + final CyclicBarrier barrier = new CyclicBarrier(threadsCount); + List threads = new ArrayList<>(); + for (int i = 0; i < threadsCount; i++) { + Thread t = createSpanReportingThread(exceptionWasThrown, barrier); + threads.add(t); + t.start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertFalse(exceptionWasThrown.get()); + } + + private Thread createSpanReportingThread(final AtomicBoolean exceptionWasThrown, + final CyclicBarrier barrier) { + return new Thread(new Runnable() { + @Override + public void run() { + for (int x = 0; x < maxQueueSize; x++) { + try { + barrier.await(); + reporter.report(newSpan()); + } catch (Throwable e) { + e.printStackTrace(); + exceptionWasThrown.set(true); + } + } + } + }); + } + + @Test + public void testAppendWhenQueueFull() { + // change sender to blocking mode + sender.permitAppend(0); + + for (int i = 0; i < maxQueueSize; i++) { + reporter.report(newSpan()); + } + + // When: at this point the queue is full or there is one slot empty (if the worker thread has + // already picked up some command). We add two spans to make sure that we overfill the queue + reporter.report(newSpan()); + reporter.report(newSpan()); + + // Then: one or both spans should be dropped + Long droppedCount = metricsReporter.counters.get("jaeger.spans.state=dropped"); + assertThat(droppedCount, anyOf(equalTo(1L), equalTo(2L))); + } + + @Test + public void testCloseWhenQueueFull() { + // change sender to blocking mode + sender.permitAppend(0); + + // fill the queue + for (int i = 0; i < maxQueueSize + 10; i++) { + reporter.report(newSpan()); + } + + reporter.close(); + + // expect no exception thrown + } + + @Test + public void testFlushWhenQueueFull() { + // change sender to blocking mode + sender.permitAppend(0); + + // fill the queue + for (int i = 0; i < maxQueueSize + 10; i++) { + reporter.report(newSpan()); + } + + ((RemoteReporter) reporter).flush(); + + // expect no exception thrown + } + + @Test + public void testFlushUpdatesQueueLength() throws Exception { + // change sender to blocking mode + sender.permitAppend(0); + RemoteReporter remoteReporter = (RemoteReporter) reporter; + remoteReporter.flush(); + + for (int i = 0; i < 10; i++) { + reporter.report(newSpan()); + } + + assertEquals(0, metricsReporter.gauges.get("jaeger.reporter-queue").longValue()); + + remoteReporter.flush(); + + assertTrue(metricsReporter.gauges.get("jaeger.reporter-queue") > 5); + } + + private Span newSpan() { + return (Span) tracer.buildSpan("x").start(); + } }