Fix failing integration tests.
jtibshirani committed Dec 10, 2018
1 parent 3993aa9 commit 537763e
Showing 10 changed files with 95 additions and 96 deletions.
@@ -163,7 +163,7 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
         for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
             assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
             assertThat(bulkItemResponse.getIndex(), equalTo("test"));
-            assertThat(bulkItemResponse.getType(), equalTo("test"));
+            assertThat(bulkItemResponse.getType(), equalTo("_doc"));
             //with concurrent requests > 1 we can't rely on the order of the bulk requests
             assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
             //we do want to check that we don't get duplicate ids back
@@ -240,12 +240,12 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
         for (int i = 1; i <= numDocs; i++) {
             if (randomBoolean()) {
                 testDocs++;
-                processor.add(new IndexRequest("test", "test", Integer.toString(testDocs))
+                processor.add(new IndexRequest("test", "_doc", Integer.toString(testDocs))
                         .source(XContentType.JSON, "field", "value"));
-                multiGetRequest.add("test", "test", Integer.toString(testDocs));
+                multiGetRequest.add("test", Integer.toString(testDocs));
             } else {
                 testReadOnlyDocs++;
-                processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs))
+                processor.add(new IndexRequest("test-ro", "_doc", Integer.toString(testReadOnlyDocs))
                         .source(XContentType.JSON, "field", "value"));
             }
         }
@@ -262,7 +262,7 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
         Set<String> readOnlyIds = new HashSet<>();
         for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
             assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
-            assertThat(bulkItemResponse.getType(), equalTo("test"));
+            assertThat(bulkItemResponse.getType(), equalTo("_doc"));
             if (bulkItemResponse.getIndex().equals("test")) {
                 assertThat(bulkItemResponse.isFailed(), equalTo(false));
                 //with concurrent requests > 1 we can't rely on the order of the bulk requests
@@ -330,12 +330,12 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
                 .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
                 .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                 .setGlobalIndex("test")
-                .setGlobalType("test")
+                .setGlobalType("_doc")
                 .setGlobalRouting("routing")
                 .setGlobalPipeline("pipeline_id")
                 .build()) {

-            indexDocs(processor, numDocs, null, null, "test", "test", "pipeline_id");
+            indexDocs(processor, numDocs, null, null, "test", "pipeline_id");
             latch.await();

             assertThat(listener.beforeCounts.get(), equalTo(1));
@@ -346,7 +346,7 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
             Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

             assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
-            assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType("test"))));
+            assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType("_doc"))));
             assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
         }
     }
@@ -359,18 +359,18 @@ private Matcher<SearchHit>[] expectedIds(int numDocs) {
             .<Matcher<SearchHit>>toArray(Matcher[]::new);
     }

-    private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
+    private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex,
                                              String globalIndex, String globalType, String globalPipeline) throws Exception {
         MultiGetRequest multiGetRequest = new MultiGetRequest();
         for (int i = 1; i <= numDocs; i++) {
             if (randomBoolean()) {
-                processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
+                processor.add(new IndexRequest(localIndex, "_doc", Integer.toString(i))
                         .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
             } else {
-                BytesArray data = bytesBulkRequest(localIndex, localType, i);
+                BytesArray data = bytesBulkRequest(localIndex, "_doc", i);
                 processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
             }
-            multiGetRequest.add(localIndex, localType, Integer.toString(i));
+            multiGetRequest.add(localIndex, Integer.toString(i));
         }
         return multiGetRequest;
     }
@@ -396,15 +396,15 @@ private static BytesArray bytesBulkRequest(String localIndex, String localType,
     }

     private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
-        return indexDocs(processor, numDocs, "test", "test", null, null, null);
+        return indexDocs(processor, numDocs, "test", null, null, null);
     }

     private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
         assertThat(bulkItemResponses.size(), is(numDocs));
         int i = 1;
         for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
             assertThat(bulkItemResponse.getIndex(), equalTo("test"));
-            assertThat(bulkItemResponse.getType(), equalTo("test"));
+            assertThat(bulkItemResponse.getType(), equalTo("_doc"));
             assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
             assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
                 bulkItemResponse.isFailed(), equalTo(false));
@@ -416,7 +416,7 @@ private static void assertMultiGetResponse(MultiGetResponse multiGetResponse, in
         int i = 1;
         for (MultiGetItemResponse multiGetItemResponse : multiGetResponse) {
             assertThat(multiGetItemResponse.getIndex(), equalTo("test"));
-            assertThat(multiGetItemResponse.getType(), equalTo("test"));
+            assertThat(multiGetItemResponse.getType(), equalTo("_doc"));
             assertThat(multiGetItemResponse.getId(), equalTo(Integer.toString(i++)));
         }
     }
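Every hunk in this commit applies the same migration: the tests stop using custom mapping types ("test", "type", "doc") and move to the reserved "_doc" type on writes, with typeless overloads on reads. A minimal before/after sketch of that pattern, assuming the 6.x-era high-level REST client API used in the hunks above (the index name and id here are illustrative, not from the commit):

import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class TypelessRequestSketch {
    public static void main(String[] args) {
        // Before: every request carried a custom mapping type ("test").
        IndexRequest typed = new IndexRequest("test", "test", "1")
                .source(XContentType.JSON, "field", "value");

        // After: writes use the single reserved "_doc" type...
        IndexRequest untyped = new IndexRequest("test", "_doc", "1")
                .source(XContentType.JSON, "field", "value");

        // ...and reads drop the type entirely via the typeless overload,
        // matching the updated multiGetRequest.add(index, id) calls above.
        MultiGetRequest multiGet = new MultiGetRequest();
        multiGet.add("test", "1");
    }
}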
@@ -45,7 +45,6 @@
 public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {

     private static final String INDEX_NAME = "index";
-    private static final String TYPE_NAME = "type";

     private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) {
         return BulkProcessor.builder(
@@ -144,9 +143,9 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
     private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) {
         MultiGetRequest multiGetRequest = new MultiGetRequest();
         for (int i = 1; i <= numDocs; i++) {
-            processor.add(new IndexRequest(INDEX_NAME, TYPE_NAME, Integer.toString(i))
+            processor.add(new IndexRequest(INDEX_NAME, "_doc", Integer.toString(i))
                     .source(XContentType.JSON, "field", randomRealisticUnicodeOfCodepointLengthBetween(1, 30)));
-            multiGetRequest.add(INDEX_NAME, TYPE_NAME, Integer.toString(i));
+            multiGetRequest.add(INDEX_NAME, Integer.toString(i));
         }
         return multiGetRequest;
     }
@@ -527,7 +527,7 @@ public void testStartDatafeed() throws Exception {

         // Set up the index and docs
         CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-        createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long");
+        createIndexRequest.mapping("_doc", "timestamp", "type=date", "total", "type=long");
         highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
         BulkRequest bulk = new BulkRequest();
         bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -538,7 +538,7 @@ public void testStartDatafeed() throws Exception {
         while(pastCopy < now) {
             IndexRequest doc = new IndexRequest();
             doc.index(indexName);
-            doc.type("doc");
+            doc.type("_doc");
             doc.id("id" + i);
             doc.source("{\"total\":" +randomInt(1000) + ",\"timestamp\":"+ pastCopy +"}", XContentType.JSON);
             bulk.add(doc);
@@ -558,7 +558,7 @@ public void testStartDatafeed() throws Exception {
         DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
             .setIndices(indexName)
             .setQueryDelay(TimeValue.timeValueSeconds(1))
-            .setTypes(Arrays.asList("doc"))
+            .setTypes(Arrays.asList("_doc"))
             .setFrequency(TimeValue.timeValueSeconds(1)).build();
         machineLearningClient.putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);

@@ -603,7 +603,7 @@ public void testStopDatafeed() throws Exception {

         // Set up the index
         CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-        createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long");
+        createIndexRequest.mapping("_doc", "timestamp", "type=date", "total", "type=long");
         highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);

         // create the job and the datafeed
@@ -667,7 +667,7 @@ public void testGetDatafeedStats() throws Exception {

         // Set up the index
         CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-        createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long");
+        createIndexRequest.mapping("_doc", "timestamp", "type=date", "total", "type=long");
         highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);

         // create the job and the datafeed
@@ -736,7 +736,7 @@ public void testPreviewDatafeed() throws Exception {

         // Set up the index and docs
         CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
-        createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long");
+        createIndexRequest.mapping("_doc", "timestamp", "type=date", "total", "type=long");
         highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
         BulkRequest bulk = new BulkRequest();
         bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -748,7 +748,7 @@ public void testPreviewDatafeed() throws Exception {
             Integer total = randomInt(1000);
             IndexRequest doc = new IndexRequest();
             doc.index(indexName);
-            doc.type("doc");
+            doc.type("_doc");
             doc.id("id" + i);
             doc.source("{\"total\":" + total + ",\"timestamp\":"+ thePast +"}", XContentType.JSON);
             bulk.add(doc);
@@ -768,7 +768,7 @@ public void testPreviewDatafeed() throws Exception {
         DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
             .setIndices(indexName)
             .setQueryDelay(TimeValue.timeValueSeconds(1))
-            .setTypes(Collections.singletonList("doc"))
+            .setTypes(Collections.singletonList("_doc"))
             .setFrequency(TimeValue.timeValueSeconds(1)).build();
         machineLearningClient.putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);

@@ -795,7 +795,7 @@ private String createExpiredData(String jobId) throws Exception {
         String indexId = jobId + "-data";
         // Set up the index and docs
         CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexId);
-        createIndexRequest.mapping("doc", "timestamp", "type=date,format=epoch_millis", "total", "type=long");
+        createIndexRequest.mapping("_doc", "timestamp", "type=date,format=epoch_millis", "total", "type=long");
         highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
         BulkRequest bulk = new BulkRequest();
         bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -809,7 +809,7 @@ private String createExpiredData(String jobId) throws Exception {
             long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
             int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
             for (int point = 0; point < bucketRate; point++) {
-                IndexRequest indexRequest = new IndexRequest(indexId, "doc");
+                IndexRequest indexRequest = new IndexRequest(indexId, "_doc");
                 indexRequest.source(XContentType.JSON, "timestamp", timestamp, "total", randomInt(1000));
                 bulk.add(indexRequest);
             }
@@ -819,7 +819,7 @@ private String createExpiredData(String jobId) throws Exception {
         {
             // Index a randomly named unused state document
             String docId = "non_existing_job_" + randomFrom("model_state_1234567#1", "quantiles", "categorizer_state#1");
-            IndexRequest indexRequest = new IndexRequest(".ml-state", "doc", docId);
+            IndexRequest indexRequest = new IndexRequest(".ml-state", "_doc", docId);
             indexRequest.source(Collections.emptyMap(), XContentType.JSON);
             indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
             highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
@@ -1388,7 +1388,7 @@ private void updateModelSnapshotTimestamp(String jobId, String timestamp) throws
         String documentId = jobId + "_model_snapshot_" + snapshotId;

         String snapshotUpdate = "{ \"timestamp\": " + timestamp + "}";
-        UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + jobId, "doc", documentId);
+        UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + jobId, "_doc", documentId);
         updateSnapshotRequest.doc(snapshotUpdate.getBytes(StandardCharsets.UTF_8), XContentType.JSON);
         highLevelClient().update(updateSnapshotRequest, RequestOptions.DEFAULT);

@@ -1403,7 +1403,7 @@ private String createAndPutDatafeed(String jobId, String indexName) throws IOExc
         DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
             .setIndices(indexName)
             .setQueryDelay(TimeValue.timeValueSeconds(1))
-            .setTypes(Arrays.asList("doc"))
+            .setTypes(Arrays.asList("_doc"))
             .setFrequency(TimeValue.timeValueSeconds(1)).build();
         highLevelClient().machineLearning().putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);
         return datafeedId;
@@ -1414,7 +1414,7 @@ public void createModelSnapshot(String jobId, String snapshotId) throws IOExcept
         Job job = MachineLearningIT.buildJob(jobId);
         highLevelClient().machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT);

-        IndexRequest indexRequest = new IndexRequest(".ml-anomalies-shared", "doc", documentId);
+        IndexRequest indexRequest = new IndexRequest(".ml-anomalies-shared", "_doc", documentId);
         indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         indexRequest.source("{\"job_id\":\"" + jobId + "\", \"timestamp\":1541587919000, " +
             "\"description\":\"State persisted due to job close at 2018-11-07T10:51:59+0000\", " +
@@ -1434,7 +1434,7 @@ public void createModelSnapshots(String jobId, List<String> snapshotIds) throws

         for(String snapshotId : snapshotIds) {
             String documentId = jobId + "_model_snapshot_" + snapshotId;
-            IndexRequest indexRequest = new IndexRequest(".ml-anomalies-shared", "doc", documentId);
+            IndexRequest indexRequest = new IndexRequest(".ml-anomalies-shared", "_doc", documentId);
             indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
             indexRequest.source("{\"job_id\":\"" + jobId + "\", \"timestamp\":1541587919000, " +
                 "\"description\":\"State persisted due to job close at 2018-11-07T10:51:59+0000\", " +
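The machine learning tests follow the same migration: index mappings, bulk-indexed documents, and datafeed configs all now name "_doc" instead of "doc". A minimal sketch of that setup under the same assumptions (6.x-era request classes; the index, job, and datafeed ids are illustrative, and the DatafeedConfig import path is assumed from the high-level REST client of that era):

import java.util.Collections;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.common.xcontent.XContentType;

public class DatafeedTypeSketch {
    public static void main(String[] args) {
        // Map the fields under the reserved "_doc" type.
        CreateIndexRequest createIndex = new CreateIndexRequest("feed-data");
        createIndex.mapping("_doc", "timestamp", "type=date", "total", "type=long");

        // Index documents with the matching "_doc" type.
        IndexRequest doc = new IndexRequest("feed-data", "_doc", "id1");
        doc.source("{\"total\":42,\"timestamp\":1544400000000}", XContentType.JSON);

        // Point the datafeed at the same type, mirroring the
        // setTypes(..., "_doc") changes in the hunks above.
        DatafeedConfig datafeed = DatafeedConfig.builder("feed-1", "job-1")
                .setIndices("feed-data")
                .setTypes(Collections.singletonList("_doc"))
                .build();
    }
}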