Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
add user in AD task (#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
ylwu-amzn authored Jan 25, 2021
1 parent b93ec3f commit 8230412
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,11 @@ public static String getAnomalyDetectorJobMappings() throws IOException {
*/
public static String getDetectionStateMappings() throws IOException {
URL url = AnomalyDetectionIndices.class.getClassLoader().getResource(ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE);
return Resources.toString(url, Charsets.UTF_8);
String detectionStateMappings = Resources.toString(url, Charsets.UTF_8);
String detectorIndexMappings = AnomalyDetectionIndices.getAnomalyDetectorMappings();
detectorIndexMappings = detectorIndexMappings
.substring(detectorIndexMappings.indexOf("\"properties\""), detectorIndexMappings.lastIndexOf("}"));
return detectionStateMappings.replace("DETECTOR_INDEX_MAPPING_PLACE_HOLDER", detectorIndexMappings);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.amazon.opendistroforelasticsearch.ad.annotation.Generated;
import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils;
import com.amazon.opendistroforelasticsearch.commons.authuser.User;
import com.google.common.base.Objects;

/**
Expand All @@ -54,6 +55,7 @@ public class ADTask implements ToXContentObject, Writeable {
public static final String COORDINATING_NODE_FIELD = "coordinating_node";
public static final String WORKER_NODE_FIELD = "worker_node";
public static final String DETECTOR_FIELD = "detector";
public static final String USER_FIELD = "user";

private String taskId = null;
private Instant lastUpdateTime = null;
Expand All @@ -74,6 +76,7 @@ public class ADTask implements ToXContentObject, Writeable {

private String coordinatingNode = null;
private String workerNode = null;
private User user = null;

private ADTask() {}

Expand All @@ -100,6 +103,11 @@ public ADTask(StreamInput input) throws IOException {
this.stoppedBy = input.readOptionalString();
this.coordinatingNode = input.readOptionalString();
this.workerNode = input.readOptionalString();
if (input.readBoolean()) {
this.user = new User(input);
} else {
user = null;
}
}

@Override
Expand Down Expand Up @@ -127,6 +135,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(stoppedBy);
out.writeOptionalString(coordinatingNode);
out.writeOptionalString(workerNode);
if (user != null) {
out.writeBoolean(true); // user exists
user.writeTo(out);
} else {
out.writeBoolean(false); // user does not exist
}
}

public static Builder builder() {
Expand All @@ -152,6 +166,7 @@ public static class Builder {
private String stoppedBy = null;
private String coordinatingNode = null;
private String workerNode = null;
private User user = null;

public Builder() {}

Expand Down Expand Up @@ -245,6 +260,11 @@ public Builder workerNode(String workerNode) {
return this;
}

public Builder user(User user) {
this.user = user;
return this;
}

public ADTask build() {
ADTask adTask = new ADTask();
adTask.taskId = this.taskId;
Expand All @@ -265,6 +285,7 @@ public ADTask build() {
adTask.stoppedBy = this.stoppedBy;
adTask.coordinatingNode = this.coordinatingNode;
adTask.workerNode = this.workerNode;
adTask.user = this.user;

return adTask;
}
Expand Down Expand Up @@ -328,6 +349,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (detector != null) {
xContentBuilder.field(DETECTOR_FIELD, detector);
}
if (user != null) {
xContentBuilder.field(USER_FIELD, user);
}
return xContentBuilder.endObject();
}

Expand All @@ -354,6 +378,7 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
String parsedTaskId = taskId;
String coordinatingNode = null;
String workerNode = null;
User user = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -415,6 +440,9 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
case WORKER_NODE_FIELD:
workerNode = parser.text();
break;
case USER_FIELD:
user = User.parse(parser);
break;
default:
parser.skipChildren();
break;
Expand Down Expand Up @@ -461,6 +489,7 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
.coordinatingNode(coordinatingNode)
.workerNode(workerNode)
.detector(anomalyDetector)
.user(user)
.build();
}

Expand Down Expand Up @@ -489,7 +518,8 @@ public boolean equals(Object o) {
&& Objects.equal(getCheckpointId(), that.getCheckpointId())
&& Objects.equal(getCoordinatingNode(), that.getCoordinatingNode())
&& Objects.equal(getWorkerNode(), that.getWorkerNode())
&& Objects.equal(getDetector(), that.getDetector());
&& Objects.equal(getDetector(), that.getDetector())
&& Objects.equal(getUser(), that.getUser());
}

@Generated
Expand All @@ -514,7 +544,8 @@ public int hashCode() {
checkpointId,
coordinatingNode,
workerNode,
detector
detector,
user
);
}

Expand Down Expand Up @@ -598,4 +629,7 @@ public String getWorkerNode() {
return workerNode;
}

public User getUser() {
return user;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ private void createNewADTask(AnomalyDetector detector, User user, ActionListener
.lastUpdateTime(now)
.startedBy(userName)
.coordinatingNode(clusterService.localNode().getId())
.user(user)
.build();

IndexRequest request = new IndexRequest(CommonName.DETECTION_STATE_INDEX);
Expand Down
159 changes: 20 additions & 139 deletions src/main/resources/mappings/anomaly-detection-state.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@
"worker_node": {
"type": "keyword"
},
"detector": {
"user": {
"type": "nested",
"properties": {
"schema_version": {
"type": "integer"
},
"name": {
"type": "text",
"fields": {
Expand All @@ -73,151 +71,34 @@
}
}
},
"description": {
"type": "text"
},
"time_field": {
"type": "keyword"
},
"indices": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"filter_query": {
"type": "object",
"enabled": false
},
"feature_attributes": {
"type": "nested",
"properties": {
"feature_id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"feature_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"feature_enabled": {
"type": "boolean"
},
"aggregation_query": {
"type": "object",
"enabled": false
"backend_roles": {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"detection_interval": {
"properties": {
"period": {
"properties": {
"interval": {
"type": "integer"
},
"unit": {
"type": "keyword"
}
}
"roles": {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"window_delay": {
"properties": {
"period": {
"properties": {
"interval": {
"type": "integer"
},
"unit": {
"type": "keyword"
}
}
}
}
},
"shingle_size": {
"type": "integer"
},
"last_update_time": {
"type": "date",
"format": "strict_date_time||epoch_millis"
},
"ui_metadata": {
"type": "object",
"enabled": false
},
"user": {
"type": "nested",
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"backend_roles": {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"roles": {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"custom_attribute_names": {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
}
}
},
"category_field": {
"type": "keyword"
},
"detector_type": {
"type": "keyword"
},
"detection_date_range": {
"properties": {
"start_time": {
"type": "date",
"format": "strict_date_time||epoch_millis"
},
"end_time": {
"type": "date",
"format": "strict_date_time||epoch_millis"
"custom_attribute_names": {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
}
}
},
"detector": {
DETECTOR_INDEX_MAPPING_PLACE_HOLDER
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,12 @@ private void createRandomDetector(String indexName) throws IOException {
IndexResponse indexResponse = client().index(new IndexRequest(indexName).source(xContentBuilder)).actionGet();
assertEquals("Doc was not created", RestStatus.CREATED, indexResponse.status());
}

public void testGetDetectionStateIndexMapping() throws IOException {
String detectorIndexMappings = AnomalyDetectionIndices.getAnomalyDetectorMappings();
detectorIndexMappings = detectorIndexMappings
.substring(detectorIndexMappings.indexOf("\"properties\""), detectorIndexMappings.lastIndexOf("}"));
String detectionStateIndexMapping = AnomalyDetectionIndices.getDetectionStateMappings();
assertTrue(detectionStateIndexMapping.contains(detectorIndexMappings));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testValidHistoricalDetector() throws IOException, InterruptedExcepti
assertEquals(ADTaskState.FINISHED.name(), finishedTask.getState());
}

public void testStartHistoricalDetectorWithUser() throws IOException, InterruptedException {
public void testStartHistoricalDetectorWithUser() throws IOException {
DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime);
AnomalyDetector detector = TestHelpers
.randomDetector(dateRange, ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField);
Expand All @@ -131,6 +131,7 @@ public void testStartHistoricalDetectorWithUser() throws IOException, Interrupte
AnomalyDetectorJobResponse response = nodeClient.execute(MockAnomalyDetectorJobAction.INSTANCE, request).actionGet(100000);
ADTask adTask = getADTask(response.getId());
assertNotNull(adTask.getStartedBy());
assertNotNull(adTask.getUser());
}
}

Expand Down Expand Up @@ -299,6 +300,8 @@ public void testStopRealtimeDetector() throws IOException {
public void testStopHistoricalDetector() throws IOException, InterruptedException {
ADTask adTask = startHistoricalDetector(startTime, endTime);
assertEquals(ADTaskState.INIT.name(), adTask.getState());
assertNull(adTask.getStartedBy());
assertNull(adTask.getUser());
AnomalyDetectorJobRequest request = stopDetectorJobRequest(adTask.getDetectorId());
client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000);
Thread.sleep(10000);
Expand Down

0 comments on commit 8230412

Please sign in to comment.