Skip to content

Commit

Permalink
[Docs] Docs tests should wait for async execution to complete (#28481)
Browse files Browse the repository at this point in the history
This commit splits the async execution documentation into 2 parts, one
for the async method itself and one for the action listener. This allows
to add more doc and to use CountDownLatches in doc tests to wait for
asynchronous operations to be completed before moving to the next test.

It also renames few files.

Related to #28457
  • Loading branch information
tlrx authored Feb 1, 2018
1 parent d860971 commit bb97c00
Show file tree
Hide file tree
Showing 20 changed files with 552 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyMap;
Expand All @@ -86,6 +88,7 @@
*/
public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {

@SuppressWarnings({"unchecked", "rawtypes"})
public void testIndex() throws Exception {
RestHighLevelClient client = highLevelClient();

Expand Down Expand Up @@ -227,9 +230,8 @@ public void testIndex() throws Exception {
}
{
IndexRequest request = new IndexRequest("posts", "doc", "async").source("field", "value");

// tag::index-execute-async
client.indexAsync(request, new ActionListener<IndexResponse>() {
// tag::index-execute-listener
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// <1>
Expand All @@ -239,13 +241,22 @@ public void onResponse(IndexResponse indexResponse) {
public void onFailure(Exception e) {
// <2>
}
});
};
// end::index-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener(listener, latch);

// tag::index-execute-async
client.indexAsync(request, listener); // <1>
// end::index-execute-async

assertBusy(() -> assertTrue(client.exists(new GetRequest("posts", "doc", "async"))));
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
public void testUpdate() throws Exception {
RestHighLevelClient client = highLevelClient();
{
Expand Down Expand Up @@ -490,8 +501,8 @@ public void testUpdate() throws Exception {
{
UpdateRequest request = new UpdateRequest("posts", "doc", "async").doc("reason", "async update").docAsUpsert(true);

// tag::update-execute-async
client.updateAsync(request, new ActionListener<UpdateResponse>() {
// tag::update-execute-listener
ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
// <1>
Expand All @@ -501,13 +512,22 @@ public void onResponse(UpdateResponse updateResponse) {
public void onFailure(Exception e) {
// <2>
}
});
};
// end::update-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener(listener, latch);

// tag::update-execute-async
client.updateAsync(request, listener); // <1>
// end::update-execute-async

assertBusy(() -> assertTrue(client.exists(new GetRequest("posts", "doc", "async"))));
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
public void testDelete() throws Exception {
RestHighLevelClient client = highLevelClient();

Expand Down Expand Up @@ -602,8 +622,8 @@ public void testDelete() throws Exception {

DeleteRequest request = new DeleteRequest("posts", "doc", "async");

// tag::delete-execute-async
client.deleteAsync(request, new ActionListener<DeleteResponse>() {
// tag::delete-execute-listener
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
// <1>
Expand All @@ -613,14 +633,23 @@ public void onResponse(DeleteResponse deleteResponse) {
public void onFailure(Exception e) {
// <2>
}
});
};
// end::delete-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener(listener, latch);

// tag::delete-execute-async
client.deleteAsync(request, listener); // <1>
// end::delete-execute-async

assertBusy(() -> assertFalse(client.exists(new GetRequest("posts", "doc", "async"))));
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testBulk() throws IOException {
@SuppressWarnings({"unchecked", "rawtypes"})
public void testBulk() throws Exception {
RestHighLevelClient client = highLevelClient();
{
// tag::bulk-request
Expand Down Expand Up @@ -696,8 +725,8 @@ public void testBulk() throws IOException {
request.waitForActiveShards(ActiveShardCount.ALL); // <2>
// end::bulk-request-active-shards

// tag::bulk-execute-async
client.bulkAsync(request, new ActionListener<BulkResponse>() {
// tag::bulk-execute-listener
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// <1>
Expand All @@ -707,12 +736,23 @@ public void onResponse(BulkResponse bulkResponse) {
public void onFailure(Exception e) {
// <2>
}
});
};
// end::bulk-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener(listener, latch);

// tag::bulk-execute-async
client.bulkAsync(request, listener); // <1>
// end::bulk-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testGet() throws IOException {
@SuppressWarnings({"unchecked", "rawtypes"})
public void testGet() throws Exception {
RestHighLevelClient client = highLevelClient();
{
String mappings = "{\n" +
Expand Down Expand Up @@ -839,8 +879,9 @@ public void testGet() throws IOException {
}
{
GetRequest request = new GetRequest("posts", "doc", "1");
//tag::get-execute-async
client.getAsync(request, new ActionListener<GetResponse>() {

// tag::get-execute-listener
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
// <1>
Expand All @@ -850,8 +891,18 @@ public void onResponse(GetResponse getResponse) {
public void onFailure(Exception e) {
// <2>
}
});
};
// end::get-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener(listener, latch);

//tag::get-execute-async
client.getAsync(request, listener); // <1>
//end::get-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
{
//tag::get-indexnotfound
Expand Down
Loading

0 comments on commit bb97c00

Please sign in to comment.