Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevents backlog when writing to Elasticsearch #1765

Merged
merged 2 commits into from
Oct 10, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Attempt to use semaphore as might be more reliable to read than queue…
… size
  • Loading branch information
Adrian Cole committed Oct 10, 2017
commit 69bb3d4e079361e883ff7b9f81f91c97c0fc4085
Original file line number Diff line number Diff line change
@@ -302,7 +302,7 @@ HttpCall.Factory http() {
.build();
ok.dispatcher().setMaxRequests(maxRequests());
ok.dispatcher().setMaxRequestsPerHost(maxRequests());
return new HttpCall.Factory(ok, maxRequests(), HttpUrl.parse(hosts.get(0)));
return new HttpCall.Factory(ok, HttpUrl.parse(hosts.get(0)));
}

@Override public void close() {
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@

import java.io.Closeable;
import java.io.IOException;
import okhttp3.Dispatcher;
import java.util.concurrent.Semaphore;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -36,12 +36,12 @@ public interface BodyConverter<V> {

public static class Factory implements Closeable {
final OkHttpClient ok;
final int maxRequests;
final Semaphore semaphore;
public final HttpUrl baseUrl;

public Factory(OkHttpClient ok, int maxRequests, HttpUrl baseUrl) {
public Factory(OkHttpClient ok, HttpUrl baseUrl) {
this.ok = ok;
this.maxRequests = maxRequests;
this.semaphore = new Semaphore(ok.dispatcher().getMaxRequests());
this.baseUrl = baseUrl;
}

@@ -56,36 +56,38 @@ public <V> HttpCall<V> newCall(Request request, BodyConverter<V> bodyConverter)

public final okhttp3.Call call;
public final BodyConverter<V> bodyConverter;
final Dispatcher dispatcher;
final int maxRequests;
final Semaphore semaphore;


HttpCall(Factory factory, Request request, BodyConverter<V> bodyConverter) {
this(
factory.ok.newCall(request),
factory.ok.dispatcher(),
factory.maxRequests,
factory.semaphore,
bodyConverter
);
}

HttpCall(okhttp3.Call call, Dispatcher dispatcher, int maxRequests,
BodyConverter<V> bodyConverter) {
HttpCall(okhttp3.Call call, Semaphore semaphore, BodyConverter<V> bodyConverter) {
this.call = call;
this.dispatcher = dispatcher;
this.maxRequests = maxRequests;
this.semaphore = semaphore;
this.bodyConverter = bodyConverter;
}

@Override public V execute() throws IOException {
return parseResponse(call.execute(), bodyConverter);
if (!semaphore.tryAcquire()) throw new IllegalStateException("over capacity");
try {
return parseResponse(call.execute(), bodyConverter);
} finally {
semaphore.release();
}
}

@Override public void enqueue(Callback<V> delegate) {
if (dispatcher.runningCallsCount() == maxRequests) {
if (!semaphore.tryAcquire()) {
delegate.onError(new IllegalStateException("over capacity"));
return;
}
call.enqueue(new V2CallbackAdapter<>(bodyConverter, delegate));
call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, delegate));
}

@Override public void cancel() {
@@ -97,24 +99,28 @@ public <V> HttpCall<V> newCall(Request request, BodyConverter<V> bodyConverter)
}

@Override public HttpCall<V> clone() {
return new HttpCall<V>(call.clone(), dispatcher, maxRequests, bodyConverter);
return new HttpCall<V>(call.clone(), semaphore, bodyConverter);
}

static class V2CallbackAdapter<V> implements okhttp3.Callback {
final Semaphore semaphore;
final BodyConverter<V> bodyConverter;
final Callback<V> delegate;

V2CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
V2CallbackAdapter(Semaphore semaphore, BodyConverter<V> bodyConverter, Callback<V> delegate) {
this.semaphore = semaphore;
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}

@Override public void onFailure(okhttp3.Call call, IOException e) {
semaphore.release();
delegate.onError(e);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(okhttp3.Call call, Response response) {
semaphore.release();
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static zipkin2.elasticsearch.ElasticsearchSpanConsumer.prefixWithTimestampMillisAndQuery;

public class ElasticsearchSpanConsumerTest {
@@ -214,7 +215,15 @@ public void close() throws IOException {
};
// one request is delayed
storage.spanConsumer().accept(asList(TestObjects.CLIENT_SPAN)).enqueue(callback);
// this request is dropped

// synchronous requests fail on backlog
try {
storage.spanConsumer().accept(asList(TestObjects.CLIENT_SPAN)).execute();
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
}

// asynchronous requests fail on backlog
storage.spanConsumer().accept(asList(TestObjects.CLIENT_SPAN)).enqueue(callback);

assertThat(q.take())
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ public class HttpCallTest {
@Rule
public MockWebServer mws = new MockWebServer();

HttpCall.Factory http = new HttpCall.Factory(new OkHttpClient(), 1, mws.url(""));
HttpCall.Factory http = new HttpCall.Factory(new OkHttpClient(), mws.url(""));
Request request = new Request.Builder().url(http.baseUrl).build();

@After
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ public class SearchCallFactoryTest {
public MockWebServer es = new MockWebServer();

SearchCallFactory client =
new SearchCallFactory(new HttpCall.Factory(new OkHttpClient(), 1, es.url("")));
new SearchCallFactory(new HttpCall.Factory(new OkHttpClient(), es.url("")));

@After
public void close() throws IOException {