Skip to content
This repository has been archived by the owner on Jul 1, 2022. It is now read-only.

Handle full queue in RemoteReporter #180

Merged
merged 7 commits into from
May 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -39,6 +40,7 @@
@ToString(exclude = {"commandQueue", "flushTimer", "queueProcessorThread", "metrics"})
@Slf4j
public class RemoteReporter implements Reporter {
private static final int CLOSE_ENQUEUE_TIMEOUT_MILLIS = 1000;
private final BlockingQueue<Command> commandQueue;
private final Timer flushTimer;
private final Thread queueProcessorThread;
Expand Down Expand Up @@ -68,7 +70,7 @@ public RemoteReporter(final Sender sender, int flushInterval, int maxQueueSize,
new TimerTask() {
@Override
public void run() {
commandQueue.add(new FlushCommand());
flush();
}
},
0,
Expand All @@ -78,20 +80,25 @@ public void run() {
@Override
public void report(Span span) {
// Its better to drop spans, than to block here
if (commandQueue.size() == maxQueueSize) {
boolean added = commandQueue.offer(new AppendCommand(span));

if (!added) {
metrics.reporterDropped.inc(1);
return;
}

commandQueue.add(new AppendCommand(span));
}

@Override
public void close() {
commandQueue.add(new CloseCommand());

try {
queueProcessorThread.join();
// best-effort: if we can't add CloseCommand in this time then it probably will never happen
boolean added = commandQueue
.offer(new CloseCommand(), CLOSE_ENQUEUE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
if (added) {
queueProcessorThread.join();
} else {
log.warn("Unable to cleanly close RemoteReporter, command queue is full - probably the"
+ " sender is stuck");
}
} catch (InterruptedException e) {
return;
} finally {
Expand All @@ -105,6 +112,15 @@ public void close() {
}
}

void flush() {
// to reduce the number of updateGauge stats, we only emit queue length on flush
metrics.reporterQueueLength.update(commandQueue.size());

// We can safely drop FlushCommand when the queue is full - sender should take care of flushing
// in such case
commandQueue.offer(new FlushCommand());
}

/*
* The code below implements the command pattern. This pattern is useful for
* situations where multiple threads would need to synchronize on a resource,
Expand Down Expand Up @@ -140,8 +156,6 @@ class FlushCommand implements Command {
public void execute() throws SenderException {
int n = sender.flush();
metrics.reporterSuccess.inc(n);
// to reduce the number of updateGauge stats, we only emit queue length on flush
metrics.reporterQueueLength.update(commandQueue.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@
import com.uber.jaeger.thriftjava.Span;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/**
* Sender which stores spans in memory. Appending a new span is a blocking operation unless
* "permitted". By default Integer.MAX_VALUE "appends" are permitted.
*/
public class InMemorySender implements Sender {

private List<Span> appended;
private List<Span> flushed;
private List<Span> received;
private Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);

public InMemorySender() {
appended = new ArrayList<Span>();
Expand All @@ -54,6 +61,11 @@ public List<Span> getReceived() {

@Override
public int append(com.uber.jaeger.Span span) throws SenderException {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
com.uber.jaeger.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span);
appended.add(thriftSpan);
received.add(thriftSpan);
Expand All @@ -73,4 +85,15 @@ public int flush() throws SenderException {
public int close() throws SenderException {
return flush();
}

/**
* Removes previously granted "append" permits and grants
* a new number of permits
*
* @param number number of "appends" to permit
*/
public void permitAppend(int number) {
semaphore.drainPermits();
semaphore.release(number);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@

package com.uber.jaeger.reporters;

import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.uber.jaeger.Span;
Expand All @@ -32,7 +35,10 @@
import com.uber.jaeger.metrics.Metrics;
import com.uber.jaeger.metrics.StatsFactoryImpl;
import com.uber.jaeger.samplers.ConstSampler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -62,7 +68,7 @@ public void setUp() throws Exception {
public void testRemoteReporterReport() throws Exception {
Span span = (Span) tracer.buildSpan("raza").start();
reporter.report(span);
Thread.sleep(5);
Thread.sleep(50);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I increased it because it was failing every time on my laptop. There's already a PR to make it right: #177

List<com.uber.jaeger.thriftjava.Span> received = sender.getReceived();

assertEquals(received.size(), 1);
Expand Down Expand Up @@ -103,4 +109,115 @@ public void testRemoteReporterFlushTimerThread() throws Exception {
}
assertFalse(flushTimerThreadCount == 0);
}

// Starts a number of threads. Each can fill the queue on its own, so they will exceed its
// capacity many times over
@Test
public void testReportDoesntThrowWhenQueueFull() throws Exception {
final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);

int threadsCount = 10;
final CyclicBarrier barrier = new CyclicBarrier(threadsCount);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < threadsCount; i++) {
Thread t = createSpanReportingThread(exceptionWasThrown, barrier);
threads.add(t);
t.start();
}

for (Thread t : threads) {
t.join();
}

assertFalse(exceptionWasThrown.get());
}

private Thread createSpanReportingThread(final AtomicBoolean exceptionWasThrown,
final CyclicBarrier barrier) {
return new Thread(new Runnable() {
@Override
public void run() {
for (int x = 0; x < maxQueueSize; x++) {
try {
barrier.await();
reporter.report(newSpan());
} catch (Throwable e) {
e.printStackTrace();
exceptionWasThrown.set(true);
}
}
}
});
}

@Test
public void testAppendWhenQueueFull() {
// change sender to blocking mode
sender.permitAppend(0);

for (int i = 0; i < maxQueueSize; i++) {
reporter.report(newSpan());
}

// When: at this point the queue is full or there is one slot empty (if the worker thread has
// already picked up some command). We add two spans to make sure that we overfill the queue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the workers are still reading from the queue, then it looks like you have a race condition and no guarantee that the next two commands would actually cause a queue overflow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why I changed InMemorySender to become blocking:

// change sender to blocking mode
sender.permitAppend(0);

Thanks to this nothing is processing the queue - QueueProcessor picks only 1 AppendCommand and blocks on the append call.
So this test:

  1. adds maxQueueSize spans to the queue
  2. QueueProcessor takes 1 commands and blocks
  3. adds additional span to make the queue full
  4. adds one more span and it is dropped without throwing exception

Point 2 sometimes might not happen (due to scheduling) - then both span from points 3) and 4) are dropped.

I tried to come with some simple way of testing various cases without possibility of race conditions - adding that semaphore/permits to InMemorySender was the simplest solution I could find.

reporter.report(newSpan());
reporter.report(newSpan());

// Then: one or both spans should be dropped
Long droppedCount = metricsReporter.counters.get("jaeger.spans.state=dropped");
assertThat(droppedCount, anyOf(equalTo(1L), equalTo(2L)));
}

@Test
public void testCloseWhenQueueFull() {
// change sender to blocking mode
sender.permitAppend(0);

// fill the queue
for (int i = 0; i < maxQueueSize + 10; i++) {
reporter.report(newSpan());
}

reporter.close();

// expect no exception thrown
}

@Test
public void testFlushWhenQueueFull() {
// change sender to blocking mode
sender.permitAppend(0);

// fill the queue
for (int i = 0; i < maxQueueSize + 10; i++) {
reporter.report(newSpan());
}

((RemoteReporter) reporter).flush();

// expect no exception thrown
}

@Test
public void testFlushUpdatesQueueLength() throws Exception {
// change sender to blocking mode
sender.permitAppend(0);
RemoteReporter remoteReporter = (RemoteReporter) reporter;
remoteReporter.flush();

for (int i = 0; i < 10; i++) {
reporter.report(newSpan());
}

assertEquals(0, metricsReporter.gauges.get("jaeger.reporter-queue").longValue());

remoteReporter.flush();

assertTrue(metricsReporter.gauges.get("jaeger.reporter-queue") > 5);
}

private Span newSpan() {
return (Span) tracer.buildSpan("x").start();
}
}