Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Commit

Permalink
Handle full queue better in RemoteReporter (#180)
Browse files Browse the repository at this point in the history
* Fix RemoteReport race condition when queue is full

The check “commandQueue.size() == maxQueueSize” is not thread safe, this lead to:
java.lang.IllegalStateException: Queue full

It’s better to .offer() and check the returned value

* Fix RemoteReporter problems with command queue is full

When the command queue is full it’s not possible to queue control
commands - FlushCommand or CloseCommand. They were throwing exceptions.

This change replaces BLockingQueue.add with BlockingQueue.offer to
handle this situation.

I’ve modified test InMemorySender to allow it to block. This makes it
possible to easily fill the RemoteReporter’s command queue in tests.

* RemoteReporter’s queue length metric update when queue full

When the command queue is full further FlushCommands are not enqueued
and queue length metric does not update. I moved it outside the
command - now it updates whenever scheduled flush() executes.
  • Loading branch information
Marcin Biegan authored and yurishkuro committed May 14, 2017
1 parent 797ef4f commit 4c06ea6
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -39,6 +40,7 @@
@ToString(exclude = {"commandQueue", "flushTimer", "queueProcessorThread", "metrics"})
@Slf4j
public class RemoteReporter implements Reporter {
private static final int CLOSE_ENQUEUE_TIMEOUT_MILLIS = 1000;
private final BlockingQueue<Command> commandQueue;
private final Timer flushTimer;
private final Thread queueProcessorThread;
Expand Down Expand Up @@ -68,7 +70,7 @@ public RemoteReporter(final Sender sender, int flushInterval, int maxQueueSize,
new TimerTask() {
@Override
public void run() {
commandQueue.add(new FlushCommand());
flush();
}
},
0,
Expand All @@ -78,20 +80,25 @@ public void run() {
@Override
public void report(Span span) {
// Its better to drop spans, than to block here
if (commandQueue.size() == maxQueueSize) {
boolean added = commandQueue.offer(new AppendCommand(span));

if (!added) {
metrics.reporterDropped.inc(1);
return;
}

commandQueue.add(new AppendCommand(span));
}

@Override
public void close() {
commandQueue.add(new CloseCommand());

try {
queueProcessorThread.join();
// best-effort: if we can't add CloseCommand in this time then it probably will never happen
boolean added = commandQueue
.offer(new CloseCommand(), CLOSE_ENQUEUE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
if (added) {
queueProcessorThread.join();
} else {
log.warn("Unable to cleanly close RemoteReporter, command queue is full - probably the"
+ " sender is stuck");
}
} catch (InterruptedException e) {
return;
} finally {
Expand All @@ -105,6 +112,15 @@ public void close() {
}
}

void flush() {
// to reduce the number of updateGauge stats, we only emit queue length on flush
metrics.reporterQueueLength.update(commandQueue.size());

// We can safely drop FlushCommand when the queue is full - sender should take care of flushing
// in such case
commandQueue.offer(new FlushCommand());
}

/*
* The code below implements the command pattern. This pattern is useful for
* situations where multiple threads would need to synchronize on a resource,
Expand Down Expand Up @@ -140,8 +156,6 @@ class FlushCommand implements Command {
public void execute() throws SenderException {
int n = sender.flush();
metrics.reporterSuccess.inc(n);
// to reduce the number of updateGauge stats, we only emit queue length on flush
metrics.reporterQueueLength.update(commandQueue.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@
import com.uber.jaeger.thriftjava.Span;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/**
* Sender which stores spans in memory. Appending a new span is a blocking operation unless
* "permitted". By default Integer.MAX_VALUE "appends" are permitted.
*/
public class InMemorySender implements Sender {

private List<Span> appended;
private List<Span> flushed;
private List<Span> received;
private Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);

public InMemorySender() {
appended = new ArrayList<Span>();
Expand All @@ -54,6 +61,11 @@ public List<Span> getReceived() {

@Override
public int append(com.uber.jaeger.Span span) throws SenderException {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
com.uber.jaeger.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span);
appended.add(thriftSpan);
received.add(thriftSpan);
Expand All @@ -73,4 +85,15 @@ public int flush() throws SenderException {
public int close() throws SenderException {
return flush();
}

/**
* Removes previously granted "append" permits and grants
* a new number of permits
*
* @param number number of "appends" to permit
*/
public void permitAppend(int number) {
semaphore.drainPermits();
semaphore.release(number);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@

package com.uber.jaeger.reporters;

import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.uber.jaeger.Span;
Expand All @@ -32,7 +35,10 @@
import com.uber.jaeger.metrics.Metrics;
import com.uber.jaeger.metrics.StatsFactoryImpl;
import com.uber.jaeger.samplers.ConstSampler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -62,7 +68,7 @@ public void setUp() throws Exception {
public void testRemoteReporterReport() throws Exception {
Span span = (Span) tracer.buildSpan("raza").start();
reporter.report(span);
Thread.sleep(5);
Thread.sleep(50);
List<com.uber.jaeger.thriftjava.Span> received = sender.getReceived();

assertEquals(received.size(), 1);
Expand Down Expand Up @@ -103,4 +109,115 @@ public void testRemoteReporterFlushTimerThread() throws Exception {
}
assertFalse(flushTimerThreadCount == 0);
}

// Starts a number of threads. Each can fill the queue on its own, so they will exceed its
// capacity many times over
@Test
public void testReportDoesntThrowWhenQueueFull() throws Exception {
final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);

int threadsCount = 10;
final CyclicBarrier barrier = new CyclicBarrier(threadsCount);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < threadsCount; i++) {
Thread t = createSpanReportingThread(exceptionWasThrown, barrier);
threads.add(t);
t.start();
}

for (Thread t : threads) {
t.join();
}

assertFalse(exceptionWasThrown.get());
}

private Thread createSpanReportingThread(final AtomicBoolean exceptionWasThrown,
final CyclicBarrier barrier) {
return new Thread(new Runnable() {
@Override
public void run() {
for (int x = 0; x < maxQueueSize; x++) {
try {
barrier.await();
reporter.report(newSpan());
} catch (Throwable e) {
e.printStackTrace();
exceptionWasThrown.set(true);
}
}
}
});
}

@Test
public void testAppendWhenQueueFull() {
// change sender to blocking mode
sender.permitAppend(0);

for (int i = 0; i < maxQueueSize; i++) {
reporter.report(newSpan());
}

// When: at this point the queue is full or there is one slot empty (if the worker thread has
// already picked up some command). We add two spans to make sure that we overfill the queue
reporter.report(newSpan());
reporter.report(newSpan());

// Then: one or both spans should be dropped
Long droppedCount = metricsReporter.counters.get("jaeger.spans.state=dropped");
assertThat(droppedCount, anyOf(equalTo(1L), equalTo(2L)));
}

@Test
public void testCloseWhenQueueFull() {
// change sender to blocking mode
sender.permitAppend(0);

// fill the queue
for (int i = 0; i < maxQueueSize + 10; i++) {
reporter.report(newSpan());
}

reporter.close();

// expect no exception thrown
}

@Test
public void testFlushWhenQueueFull() {
// change sender to blocking mode
sender.permitAppend(0);

// fill the queue
for (int i = 0; i < maxQueueSize + 10; i++) {
reporter.report(newSpan());
}

((RemoteReporter) reporter).flush();

// expect no exception thrown
}

@Test
public void testFlushUpdatesQueueLength() throws Exception {
// change sender to blocking mode
sender.permitAppend(0);
RemoteReporter remoteReporter = (RemoteReporter) reporter;
remoteReporter.flush();

for (int i = 0; i < 10; i++) {
reporter.report(newSpan());
}

assertEquals(0, metricsReporter.gauges.get("jaeger.reporter-queue").longValue());

remoteReporter.flush();

assertTrue(metricsReporter.gauges.get("jaeger.reporter-queue") > 5);
}

private Span newSpan() {
return (Span) tracer.buildSpan("x").start();
}
}

0 comments on commit 4c06ea6

Please sign in to comment.