diff --git a/jaeger-core/src/main/java/com/uber/jaeger/reporters/RemoteReporter.java b/jaeger-core/src/main/java/com/uber/jaeger/reporters/RemoteReporter.java index 4f5108474..bd0b317f5 100644 --- a/jaeger-core/src/main/java/com/uber/jaeger/reporters/RemoteReporter.java +++ b/jaeger-core/src/main/java/com/uber/jaeger/reporters/RemoteReporter.java @@ -30,6 +30,7 @@ import java.util.TimerTask; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -39,6 +40,7 @@ @ToString(exclude = {"commandQueue", "flushTimer", "queueProcessorThread", "metrics"}) @Slf4j public class RemoteReporter implements Reporter { + private static final int CLOSE_ENQUEUE_TIMEOUT_MILLIS = 1000; private final BlockingQueue commandQueue; private final Timer flushTimer; private final Thread queueProcessorThread; @@ -68,7 +70,7 @@ public RemoteReporter(final Sender sender, int flushInterval, int maxQueueSize, new TimerTask() { @Override public void run() { - commandQueue.add(new FlushCommand()); + flush(); } }, 0, @@ -78,20 +80,25 @@ public void run() { @Override public void report(Span span) { // Its better to drop spans, than to block here - if (commandQueue.size() == maxQueueSize) { + boolean added = commandQueue.offer(new AppendCommand(span)); + + if (!added) { metrics.reporterDropped.inc(1); - return; } - - commandQueue.add(new AppendCommand(span)); } @Override public void close() { - commandQueue.add(new CloseCommand()); - try { - queueProcessorThread.join(); + // best-effort: if we can't add CloseCommand in this time then it probably will never happen + boolean added = commandQueue + .offer(new CloseCommand(), CLOSE_ENQUEUE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + if (added) { + queueProcessorThread.join(); + } else { + log.warn("Unable to cleanly close RemoteReporter, command queue is full - probably the" + + " sender is stuck"); + } } catch 
(InterruptedException e) { return; } finally { @@ -105,6 +112,15 @@ public void close() { } } + void flush() { + // to reduce the number of updateGauge stats, we only emit queue length on flush + metrics.reporterQueueLength.update(commandQueue.size()); + + // We can safely drop FlushCommand when the queue is full - sender should take care of flushing + // in such a case + commandQueue.offer(new FlushCommand()); + } + /* * The code below implements the command pattern. This pattern is useful for * situations where multiple threads would need to synchronize on a resource, @@ -140,8 +156,6 @@ class FlushCommand implements Command { public void execute() throws SenderException { int n = sender.flush(); metrics.reporterSuccess.inc(n); - // to reduce the number of updateGauge stats, we only emit queue length on flush - metrics.reporterQueueLength.update(commandQueue.size()); } } diff --git a/jaeger-core/src/test/java/com/uber/jaeger/reporters/InMemorySender.java b/jaeger-core/src/test/java/com/uber/jaeger/reporters/InMemorySender.java index 58fc61b8f..ad540bc70 100644 --- a/jaeger-core/src/test/java/com/uber/jaeger/reporters/InMemorySender.java +++ b/jaeger-core/src/test/java/com/uber/jaeger/reporters/InMemorySender.java @@ -28,11 +28,18 @@ import com.uber.jaeger.thriftjava.Span; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Semaphore; +/** + * Sender which stores spans in memory. Appending a new span is a blocking operation unless + * "permitted". By default Integer.MAX_VALUE "appends" are permitted. 
+ */ public class InMemorySender implements Sender { + private List appended; private List flushed; private List received; + private Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); public InMemorySender() { appended = new ArrayList(); @@ -54,6 +61,11 @@ public List getReceived() { @Override public int append(com.uber.jaeger.Span span) throws SenderException { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } com.uber.jaeger.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span); appended.add(thriftSpan); received.add(thriftSpan); @@ -73,4 +85,15 @@ public int flush() throws SenderException { public int close() throws SenderException { return flush(); } + + /** + * Removes previously granted "append" permits and grants + * a new number of permits. + * + * @param number number of "appends" to permit + */ + public void permitAppend(int number) { + semaphore.drainPermits(); + semaphore.release(number); + } } diff --git a/jaeger-core/src/test/java/com/uber/jaeger/reporters/RemoteReporterTest.java b/jaeger-core/src/test/java/com/uber/jaeger/reporters/RemoteReporterTest.java index 7fc38a2d7..418cd1096 100644 --- a/jaeger-core/src/test/java/com/uber/jaeger/reporters/RemoteReporterTest.java +++ b/jaeger-core/src/test/java/com/uber/jaeger/reporters/RemoteReporterTest.java @@ -22,8 +22,11 @@ package com.uber.jaeger.reporters; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.uber.jaeger.Span; @@ -32,7 +35,10 @@ import com.uber.jaeger.metrics.Metrics; import com.uber.jaeger.metrics.StatsFactoryImpl; import com.uber.jaeger.samplers.ConstSampler; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CyclicBarrier; +import 
java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Test; @@ -62,7 +68,7 @@ public void setUp() throws Exception { public void testRemoteReporterReport() throws Exception { Span span = (Span) tracer.buildSpan("raza").start(); reporter.report(span); - Thread.sleep(5); + Thread.sleep(50); List received = sender.getReceived(); assertEquals(received.size(), 1); @@ -103,4 +109,115 @@ public void testRemoteReporterFlushTimerThread() throws Exception { } assertFalse(flushTimerThreadCount == 0); } + + // Starts a number of threads. Each can fill the queue on its own, so they will exceed its + // capacity many times over + @Test + public void testReportDoesntThrowWhenQueueFull() throws Exception { + final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); + + int threadsCount = 10; + final CyclicBarrier barrier = new CyclicBarrier(threadsCount); + List threads = new ArrayList<>(); + for (int i = 0; i < threadsCount; i++) { + Thread t = createSpanReportingThread(exceptionWasThrown, barrier); + threads.add(t); + t.start(); + } + + for (Thread t : threads) { + t.join(); + } + + assertFalse(exceptionWasThrown.get()); + } + + private Thread createSpanReportingThread(final AtomicBoolean exceptionWasThrown, + final CyclicBarrier barrier) { + return new Thread(new Runnable() { + @Override + public void run() { + for (int x = 0; x < maxQueueSize; x++) { + try { + barrier.await(); + reporter.report(newSpan()); + } catch (Throwable e) { + e.printStackTrace(); + exceptionWasThrown.set(true); + } + } + } + }); + } + + @Test + public void testAppendWhenQueueFull() { + // change sender to blocking mode + sender.permitAppend(0); + + for (int i = 0; i < maxQueueSize; i++) { + reporter.report(newSpan()); + } + + // When: at this point the queue is full or there is one slot empty (if the worker thread has + // already picked up some command). 
We add two spans to make sure that we overfill the queue + reporter.report(newSpan()); + reporter.report(newSpan()); + + // Then: one or both spans should be dropped + Long droppedCount = metricsReporter.counters.get("jaeger.spans.state=dropped"); + assertThat(droppedCount, anyOf(equalTo(1L), equalTo(2L))); + } + + @Test + public void testCloseWhenQueueFull() { + // change sender to blocking mode + sender.permitAppend(0); + + // fill the queue + for (int i = 0; i < maxQueueSize + 10; i++) { + reporter.report(newSpan()); + } + + reporter.close(); + + // expect no exception thrown + } + + @Test + public void testFlushWhenQueueFull() { + // change sender to blocking mode + sender.permitAppend(0); + + // fill the queue + for (int i = 0; i < maxQueueSize + 10; i++) { + reporter.report(newSpan()); + } + + ((RemoteReporter) reporter).flush(); + + // expect no exception thrown + } + + @Test + public void testFlushUpdatesQueueLength() throws Exception { + // change sender to blocking mode + sender.permitAppend(0); + RemoteReporter remoteReporter = (RemoteReporter) reporter; + remoteReporter.flush(); + + for (int i = 0; i < 10; i++) { + reporter.report(newSpan()); + } + + assertEquals(0, metricsReporter.gauges.get("jaeger.reporter-queue").longValue()); + + remoteReporter.flush(); + + assertTrue(metricsReporter.gauges.get("jaeger.reporter-queue") > 5); + } + + private Span newSpan() { + return (Span) tracer.buildSpan("x").start(); + } }