Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] Migrate Create, Update, and Validate actions to extension #792

Merged
merged 26 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
14684ff
Add High Level Rest Client to extension
dbwiddis Jan 8, 2023
e532819
Copy SDK version of RestIndexAction and Abstract parent
dbwiddis Jan 8, 2023
4ca9c85
Changes to prepareRequest for Extensions
dbwiddis Jan 8, 2023
ba80c7a
Implement response on success
dbwiddis Jan 8, 2023
aab2571
Reorder methods logically
dbwiddis Jan 8, 2023
e969993
Remove duplicate exception handling
dbwiddis Jan 8, 2023
465ed68
Pass RestHighLevelClient to action handlers
dbwiddis Jan 9, 2023
aa73a1f
Renaming for consistency
dbwiddis Jan 9, 2023
64478a3
Bypass superclass causing NPE
dbwiddis Jan 10, 2023
449c233
Working create (once) with hacks
dbwiddis Jan 18, 2023
aa5ca52
Add SettingsUpdateConsumers to SDKClusterService
dbwiddis Jan 19, 2023
e1778b3
Fix compile errors
dbwiddis Jan 20, 2023
415f229
Working create detector
dbwiddis Jan 20, 2023
c490d89
Migrate to SDKClient Wrapper end-of-day checkpoint
dbwiddis Jan 23, 2023
317fd7d
Migrate ParseUtils call
dbwiddis Jan 24, 2023
bd969e9
More class migrations
dbwiddis Jan 24, 2023
1e38200
Add SearchFeatureDao
dbwiddis Jan 25, 2023
92c389d
Clean up getDetector()
dbwiddis Jan 25, 2023
03cf8c4
Validate Detector Rest Handler
dbwiddis Jan 25, 2023
7e84eb0
Fix debug issues
dbwiddis Jan 27, 2023
acfb18f
Use an SDKClusterSettings wrapper to reduce diff
dbwiddis Jan 31, 2023
098b659
Tests compile
dbwiddis Jan 31, 2023
b4b31e9
Bypass tests with incomplete components
dbwiddis Jan 31, 2023
a7a837e
Undo accidental method deletion
dbwiddis Feb 1, 2023
01dd362
Delete temporary class
dbwiddis Feb 4, 2023
7f68dc7
Code review fixes
dbwiddis Feb 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,14 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.transport.IndexAnomalyDetectorTransportAction',
'org.opensearch.ad.transport.handler.ADSearchHandler',

// TODO: Disabled until create components integration is complete
// https://github.com/opensearch-project/opensearch-sdk-java/issues/283
'org.opensearch.ad.indices.AnomalyDetectionIndices.IndexState',
'org.opensearch.ad.feature.SearchFeatureDao.TopEntitiesListener',
'org.opensearch.ad.ml.CheckpointDao',
'org.opensearch.ad.transport.handler.MultiEntityResultHandler',
'org.opensearch.ad.transport.handler.AnomalyResultBulkIndexHandler',

// TODO: Removing all code except for create detector.
// See https://github.com/opensearch-project/opensearch-sdk/issues/20
'org.opensearch.ad.util.ParseUtils',
Expand Down Expand Up @@ -756,9 +764,9 @@ dependencies {
// Removed Common Utils dependency from AD
// implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "org.opensearch.sdk:opensearch-sdk-java:1.0.0-SNAPSHOT"
implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation "org.opensearch.client:opensearch-java:${opensearch_version}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation "org.opensearch.client:opensearch-rest-high-level-client:${opensearch_version}"
implementation group: 'com.google.guava', name: 'guava', version:'31.0.1-jre'
implementation group: 'com.google.guava', name: 'failureaccess', version:'1.0.1'
implementation group: 'org.javassist', name: 'javassist', version:'3.28.0-GA'
Expand Down
32 changes: 25 additions & 7 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.rest.RestCreateDetectorAction;
import org.opensearch.ad.rest.RestGetDetectorAction;
import org.opensearch.ad.rest.RestValidateDetectorAction;
import org.opensearch.ad.rest.RestIndexAnomalyDetectorAction;
import org.opensearch.ad.rest.RestValidateAnomalyDetectorAction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.client.opensearch.OpenSearchClient;
Expand All @@ -31,13 +31,16 @@
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient;
import org.opensearch.sdk.SDKClient.SDKRestClient;

import com.google.common.collect.ImmutableList;

public class AnomalyDetectorExtension extends BaseExtension {

private static final String EXTENSION_SETTINGS_PATH = "/ad-extension.yml";

public static final String AD_BASE_DETECTORS_URI = "/detectors";

public AnomalyDetectorExtension() {
super(EXTENSION_SETTINGS_PATH);
}
Expand All @@ -46,9 +49,13 @@ public AnomalyDetectorExtension() {
public List<ExtensionRestHandler> getExtensionRestHandlers() {
return List
.of(
new RestCreateDetectorAction(extensionsRunner, this),
new RestGetDetectorAction(),
new RestValidateDetectorAction(extensionsRunner, this)
new RestIndexAnomalyDetectorAction(extensionsRunner, this),
// FIXME delete this
// new RestCreateDetectorAction(extensionsRunner, this),
new RestValidateAnomalyDetectorAction(extensionsRunner, this),
new RestGetDetectorAction()
// FIXME delete this
// new RestValidateDetectorAction(extensionsRunner, this)
);
}

Expand Down Expand Up @@ -105,15 +112,26 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
// TODO: replace or override client object on BaseExtension
// https://github.com/opensearch-project/opensearch-sdk-java/issues/160
public OpenSearchClient getClient() {
SDKClient sdkClient = new SDKClient();
OpenSearchClient client = sdkClient
@SuppressWarnings("resource")
OpenSearchClient client = new SDKClient()
.initializeJavaClient(
getExtensionSettings().getOpensearchAddress(),
Integer.parseInt(getExtensionSettings().getOpensearchPort())
);
return client;
}

@Deprecated
public SDKRestClient getRestClient() {
@SuppressWarnings("resource")
SDKRestClient client = new SDKClient()
.initializeRestClient(
getExtensionSettings().getOpensearchAddress(),
Integer.parseInt(getExtensionSettings().getOpensearchPort())
);
return client;
}

public static void main(String[] args) throws IOException {
// Execute this extension by instantiating it and passing to ExtensionsRunner
ExtensionsRunner.run(new AnomalyDetectorExtension());
Expand Down
77 changes: 37 additions & 40 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@
import org.opensearch.ad.ml.HybridThresholdingModel;
import org.opensearch.ad.ml.ModelManager;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.rest.RestIndexAnomalyDetectorAction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.task.ADBatchTaskRunner;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.IndexAnomalyDetectorAction;
import org.opensearch.ad.transport.IndexAnomalyDetectorTransportAction;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
Expand Down Expand Up @@ -167,8 +164,8 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAdTaskManager(adTaskManager);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction();
*/
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService);
*/
/* @anomaly-detection.create-detector
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction();
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction();
Expand All @@ -186,21 +183,21 @@ public List<RestHandler> getRestHandlers(
return ImmutableList
.of(
// restGetAnomalyDetectorAction,
restIndexAnomalyDetectorAction
/* @anomaly-detection.create-detector
searchAnomalyDetectorAction,
searchAnomalyResultAction,
searchADTasksAction,
deleteAnomalyDetectorAction,
executeAnomalyDetectorAction,
anomalyDetectorJobAction,
statsAnomalyDetectorAction,
searchAnomalyDetectorInfoAction,
previewAnomalyDetectorAction,
deleteAnomalyResultsAction,
searchTopAnomalyResultAction,
validateAnomalyDetectorAction
*/
// restIndexAnomalyDetectorAction
/* @anomaly-detection.create-detector
searchAnomalyDetectorAction,
searchAnomalyResultAction,
searchADTasksAction,
deleteAnomalyDetectorAction,
executeAnomalyDetectorAction,
anomalyDetectorJobAction,
statsAnomalyDetectorAction,
searchAnomalyDetectorInfoAction,
previewAnomalyDetectorAction,
deleteAnomalyResultsAction,
searchTopAnomalyResultAction,
validateAnomalyDetectorAction
*/
);
}

Expand Down Expand Up @@ -238,8 +235,8 @@ public Collection<Object> createComponents(
*/
// AnomalyDetectionIndices is Injected for IndexAnomalyDetectorTrasnportAction constructor
this.anomalyDetectionIndices = new AnomalyDetectionIndices(
client,
clusterService,
null, // client,
null, // clusterService,
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
threadPool,
settings,
nodeFilter,
Expand All @@ -252,12 +249,12 @@ public Collection<Object> createComponents(
Interpolator interpolator = new LinearUniformInterpolator(singleFeatureLinearUniformInterpolator);
// SearchFeatureDao is Injected for IndexAnomalyDetectorTrasnportAction constructor
SearchFeatureDao searchFeatureDao = new SearchFeatureDao(
client,
null, // Client client,
xContentRegistry,
interpolator,
clientUtil,
settings,
clusterService,
null, // ClusterService clusterService,
AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE
);

Expand Down Expand Up @@ -876,24 +873,24 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class),
*/
new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class)
/* @anomaly-detection.create-detector
new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class),
new ActionHandler<>(ADResultBulkAction.INSTANCE, ADResultBulkTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(EntityProfileAction.INSTANCE, EntityProfileTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADBatchAnomalyResultAction.INSTANCE, ADBatchAnomalyResultTransportAction.class),
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class),
new ActionHandler<>(ADTaskProfileAction.INSTANCE, ADTaskProfileTransportAction.class),
new ActionHandler<>(ADCancelTaskAction.INSTANCE, ADCancelTaskTransportAction.class),
new ActionHandler<>(ForwardADTaskAction.INSTANCE, ForwardADTaskTransportAction.class),
new ActionHandler<>(DeleteAnomalyResultsAction.INSTANCE, DeleteAnomalyResultsTransportAction.class),
new ActionHandler<>(SearchTopAnomalyResultAction.INSTANCE, SearchTopAnomalyResultTransportAction.class),
new ActionHandler<>(ValidateAnomalyDetectorAction.INSTANCE, ValidateAnomalyDetectorTransportAction.class)
*/
*/
/* @anomaly-detection.create-detector
new ActionHandler<>(AnomalyDetectorJobAction.INSTANCE, AnomalyDetectorJobTransportAction.class),
new ActionHandler<>(ADResultBulkAction.INSTANCE, ADResultBulkTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(EntityProfileAction.INSTANCE, EntityProfileTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADBatchAnomalyResultAction.INSTANCE, ADBatchAnomalyResultTransportAction.class),
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class),
new ActionHandler<>(ADTaskProfileAction.INSTANCE, ADTaskProfileTransportAction.class),
new ActionHandler<>(ADCancelTaskAction.INSTANCE, ADCancelTaskTransportAction.class),
new ActionHandler<>(ForwardADTaskAction.INSTANCE, ForwardADTaskTransportAction.class),
new ActionHandler<>(DeleteAnomalyResultsAction.INSTANCE, DeleteAnomalyResultsTransportAction.class),
new ActionHandler<>(SearchTopAnomalyResultAction.INSTANCE, SearchTopAnomalyResultTransportAction.class),
new ActionHandler<>(ValidateAnomalyDetectorAction.INSTANCE, ValidateAnomalyDetectorTransportAction.class)
*/
);
}

Expand Down
26 changes: 17 additions & 9 deletions src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
Expand All @@ -48,14 +49,15 @@
import org.opensearch.ad.model.IntervalTimeConfiguration;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.SDKClusterService;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
Expand Down Expand Up @@ -85,7 +87,7 @@ public class SearchFeatureDao extends AbstractRetriever {
private static final Logger logger = LogManager.getLogger(SearchFeatureDao.class);

// Dependencies
private final Client client;
private final SDKRestClient client;
private final NamedXContentRegistry xContent;
private final Interpolator interpolator;
private final ClientUtil clientUtil;
Expand All @@ -97,12 +99,12 @@ public class SearchFeatureDao extends AbstractRetriever {

// used for testing as we can mock clock
public SearchFeatureDao(
Client client,
SDKRestClient client,
NamedXContentRegistry xContent,
Interpolator interpolator,
ClientUtil clientUtil,
Settings settings,
ClusterService clusterService,
SDKClusterService clusterService,
int minimumDocCount,
Clock clock,
int maxEntitiesForPreview,
Expand All @@ -114,9 +116,15 @@ public SearchFeatureDao(
this.interpolator = interpolator;
this.clientUtil = clientUtil;
this.maxEntitiesForPreview = maxEntitiesForPreview;
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ENTITIES_FOR_PREVIEW, it -> this.maxEntitiesForPreview = it);
this.pageSize = pageSize;
clusterService.getClusterSettings().addSettingsUpdateConsumer(PAGE_SIZE, it -> this.pageSize = it);
try {
Map<Setting<?>, Consumer<?>> settingsUpdateConsumers = new HashMap<>();
settingsUpdateConsumers.put(MAX_ENTITIES_FOR_PREVIEW, it -> this.maxEntitiesForPreview = (int) it);
settingsUpdateConsumers.put(PAGE_SIZE, it -> this.pageSize = (int) it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(settingsUpdateConsumers);
} catch (Exception e) {
// TODO Handle this
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
}
this.minimumDocCountForPreview = minimumDocCount;
this.previewTimeoutInMilliseconds = previewTimeoutInMilliseconds;
this.clock = clock;
Expand All @@ -135,12 +143,12 @@ public SearchFeatureDao(
* make sure an entity has enough samples for preview
*/
public SearchFeatureDao(
Client client,
SDKRestClient client,
NamedXContentRegistry xContent,
Interpolator interpolator,
ClientUtil clientUtil,
Settings settings,
ClusterService clusterService,
SDKClusterService clusterService,
int minimumDocCount
) {
this(
Expand Down
Loading