From f0105ccfe6526bf045edb7f0b5ead2c13c166f4a Mon Sep 17 00:00:00 2001 From: Weicong Sun <61702346+weicongs-amazon@users.noreply.github.com> Date: Fri, 6 Nov 2020 14:46:39 -0800 Subject: [PATCH] Add checkpoint index retention for multi entity detector (#283) --- .../ad/cluster/DailyCron.java | 1 + .../ad/cluster/MasterEventListener.java | 27 ++-- .../ad/cluster/diskcleanup/IndexCleanup.java | 127 ++++++++++++++++++ .../ModelCheckpointIndexRetention.java | 115 ++++++++++++++++ .../ad/cluster/MasterEventListenerTests.java | 18 +-- .../diskcleanup/IndexCleanupTests.java | 127 ++++++++++++++++++ .../ModelCheckpointIndexRetentionTests.java | 100 ++++++++++++++ 7 files changed, 496 insertions(+), 19 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java index 9b22a38e..646c6a0f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java @@ -31,6 +31,7 @@ import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +@Deprecated public class DailyCron implements Runnable { private static final Logger LOG = LogManager.getLogger(DailyCron.class); protected static final String FIELD_MODEL = "queue"; diff --git 
a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java index 3dbba85c..e35d53fa 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java @@ -25,13 +25,16 @@ import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.IndexCleanup; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; +import com.google.common.annotations.VisibleForTesting; public class MasterEventListener implements LocalNodeMasterListener { - private Cancellable dailyCron; + private Cancellable checkpointIndexRetentionCron; private Cancellable hourlyCron; private ClusterService clusterService; private ThreadPool threadPool; @@ -70,18 +73,19 @@ public void beforeStop() { }); } - if (dailyCron == null) { - dailyCron = threadPool + if (checkpointIndexRetentionCron == null) { + IndexCleanup indexCleanup = new IndexCleanup(client, clientUtil, clusterService); + checkpointIndexRetentionCron = threadPool .scheduleWithFixedDelay( - new DailyCron(clock, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil), + new ModelCheckpointIndexRetention(AnomalyDetectorSettings.CHECKPOINT_TTL, clock, indexCleanup), TimeValue.timeValueHours(24), executorName() ); clusterService.addLifecycleListener(new LifecycleListener() { @Override public void beforeStop() { - cancel(dailyCron); - dailyCron = null; + cancel(checkpointIndexRetentionCron); + checkpointIndexRetentionCron = null; } }); } 
@@ -90,9 +94,9 @@ public void beforeStop() { @Override public void offMaster() { cancel(hourlyCron); - cancel(dailyCron); + cancel(checkpointIndexRetentionCron); hourlyCron = null; - dailyCron = null; + checkpointIndexRetentionCron = null; } private void cancel(Cancellable cron) { @@ -101,11 +105,12 @@ private void cancel(Cancellable cron) { } } - public Cancellable getDailyCron() { - return dailyCron; + @VisibleForTesting + protected Cancellable getCheckpointIndexRetentionCron() { + return checkpointIndexRetentionCron; } - public Cancellable getHourlyCron() { + protected Cancellable getHourlyCron() { return hourlyCron; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java new file mode 100644 index 00000000..7044f459 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java @@ -0,0 +1,127 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; + +import java.util.Arrays; +import java.util.Objects; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.store.StoreStats; + +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; + +/** + * Clean up the old docs for indices. + */ +public class IndexCleanup { + private static final Logger LOG = LogManager.getLogger(IndexCleanup.class); + + private final Client client; + private final ClientUtil clientUtil; + private final ClusterService clusterService; + + public IndexCleanup(Client client, ClientUtil clientUtil, ClusterService clusterService) { + this.client = client; + this.clientUtil = clientUtil; + this.clusterService = clusterService; + } + + /** + * delete docs when shard size is bigger than max limitation. 
+ * @param indexName index name + * @param maxShardSize max shard size + * @param queryForDeleteByQueryRequest query request + * @param listener action listener + */ + public void deleteDocsBasedOnShardSize( + String indexName, + long maxShardSize, + QueryBuilder queryForDeleteByQueryRequest, + ActionListener listener + ) { + + if (!clusterService.state().getRoutingTable().hasIndex(indexName)) { + LOG.debug("skip as the index:{} doesn't exist", indexName); + return; + } + + ActionListener indicesStatsResponseListener = ActionListener.wrap(indicesStatsResponse -> { + // Check if any shard size is bigger than maxShardSize + boolean cleanupNeeded = Arrays + .stream(indicesStatsResponse.getShards()) + .map(ShardStats::getStats) + .filter(Objects::nonNull) + .map(CommonStats::getStore) + .filter(Objects::nonNull) + .map(StoreStats::getSizeInBytes) + .anyMatch(size -> size > maxShardSize); + + if (cleanupNeeded) { + deleteDocsByQuery( + indexName, + queryForDeleteByQueryRequest, + ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure) + ); + } else { + listener.onResponse(false); + } + }, listener::onFailure); + + getCheckpointShardStoreStats(indexName, indicesStatsResponseListener); + } + + private void getCheckpointShardStoreStats(String indexName, ActionListener listener) { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.store(); + indicesStatsRequest.indices(indexName); + client.admin().indices().stats(indicesStatsRequest, listener); + } + + /** + * Delete docs based on query request + * @param indexName index name + * @param queryForDeleteByQueryRequest query request + * @param listener action listener + */ + public void deleteDocsByQuery(String indexName, QueryBuilder queryForDeleteByQueryRequest, ActionListener listener) { + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indexName) + .setQuery(queryForDeleteByQueryRequest) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + 
.setRefresh(true); + + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> { + // if 0 docs get deleted, it means our query cannot find any matching doc + LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName); + listener.onResponse(response.getDeleted()); + }, listener::onFailure)); + } + + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java new file mode 100644 index 00000000..8db2ab82 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java @@ -0,0 +1,115 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; + +import java.time.Clock; +import java.time.Duration; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.query.QueryBuilders; + +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; + +/** + * Model checkpoints cleanup of multi-entity detectors. + *

<p>Problem: + * In multi-entity detectors, we can have thousands or even millions of entities, whose model checkpoints consume + * a lot of disk space. To protect disk usage, the checkpoint index size is limited to a specified threshold. + * Once the size exceeds that threshold, the model checkpoint cleanup process is activated. + * </p>

+ *

<p>Solution: + * Before multi-entity detectors, a daily cron job cleaned up checkpoints that had been inactive for longer than a configurable number of days. + * We keep this logic and add a new cleanup path based on shard size. + * </p>

+ */ +public class ModelCheckpointIndexRetention implements Runnable { + private static final Logger LOG = LogManager.getLogger(ModelCheckpointIndexRetention.class); + + // The recommended max shard size is 50G, we don't wanna our index exceeds this number + private static final long MAX_SHARD_SIZE_IN_BYTE = 50 * 1024 * 1024 * 1024L; + // We can't clean up all of the checkpoints. At least keep models for 1 day + private static final Duration MINIMUM_CHECKPOINT_TTL = Duration.ofDays(1); + + private final Duration defaultCheckpointTtl; + private final Clock clock; + private final IndexCleanup indexCleanup; + + public ModelCheckpointIndexRetention(Duration defaultCheckpointTtl, Clock clock, IndexCleanup indexCleanup) { + this.defaultCheckpointTtl = defaultCheckpointTtl; + this.clock = clock; + this.indexCleanup = indexCleanup; + } + + @Override + public void run() { + indexCleanup + .deleteDocsByQuery( + CommonName.CHECKPOINT_INDEX_NAME, + QueryBuilders + .boolQuery() + .filter( + QueryBuilders + .rangeQuery(CheckpointDao.TIMESTAMP) + .lte(clock.millis() - defaultCheckpointTtl.toMillis()) + .format(CommonName.EPOCH_MILLIS_FORMAT) + ), + ActionListener + .wrap( + response -> { cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); }, + // The docs will be deleted in next scheduled windows. No need for retrying. 
+ exception -> LOG.error("delete docs by query fails for checkpoint index", exception) + ) + ); + + } + + private void cleanupBasedOnShardSize(Duration cleanUpTtl) { + indexCleanup + .deleteDocsBasedOnShardSize( + CommonName.CHECKPOINT_INDEX_NAME, + MAX_SHARD_SIZE_IN_BYTE, + QueryBuilders + .boolQuery() + .filter( + QueryBuilders + .rangeQuery(CheckpointDao.TIMESTAMP) + .lte(clock.millis() - cleanUpTtl.toMillis()) + .format(CommonName.EPOCH_MILLIS_FORMAT) + ), + ActionListener.wrap(cleanupNeeded -> { + if (cleanupNeeded) { + if (cleanUpTtl.equals(MINIMUM_CHECKPOINT_TTL)) { + return; + } + + Duration nextCleanupTtl = cleanUpTtl.minusDays(1); + if (nextCleanupTtl.compareTo(MINIMUM_CHECKPOINT_TTL) < 0) { + nextCleanupTtl = MINIMUM_CHECKPOINT_TTL; + } + cleanupBasedOnShardSize(nextCleanupTtl); + } else { + LOG.debug("clean up not needed anymore for checkpoint index"); + } + }, + // The docs will be deleted in next scheduled windows. No need for retrying. + exception -> LOG.error("checkpoint index retention based on shard size fails", exception) + ) + ); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index 385b45ee..eb1f8dfb 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -36,6 +36,7 @@ import org.junit.Before; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; @@ -46,7 +47,7 @@ public class MasterEventListenerTests extends AbstractADTest 
{ private Client client; private Clock clock; private Cancellable hourlyCancellable; - private Cancellable dailyCancellable; + private Cancellable checkpointIndexRetentionCancellable; private MasterEventListener masterService; private ClientUtil clientUtil; private DiscoveryNodeFilterer nodeFilter; @@ -58,10 +59,11 @@ public void setUp() throws Exception { clusterService = mock(ClusterService.class); threadPool = mock(ThreadPool.class); hourlyCancellable = mock(Cancellable.class); - dailyCancellable = mock(Cancellable.class); + checkpointIndexRetentionCancellable = mock(Cancellable.class); when(threadPool.scheduleWithFixedDelay(any(HourlyCron.class), any(TimeValue.class), any(String.class))) .thenReturn(hourlyCancellable); - when(threadPool.scheduleWithFixedDelay(any(DailyCron.class), any(TimeValue.class), any(String.class))).thenReturn(dailyCancellable); + when(threadPool.scheduleWithFixedDelay(any(ModelCheckpointIndexRetention.class), any(TimeValue.class), any(String.class))) + .thenReturn(checkpointIndexRetentionCancellable); client = mock(Client.class); clock = mock(Clock.class); clientUtil = mock(ClientUtil.class); @@ -75,11 +77,11 @@ public void setUp() throws Exception { public void testOnOffMaster() { masterService.onMaster(); assertThat(hourlyCancellable, is(notNullValue())); - assertThat(dailyCancellable, is(notNullValue())); + assertThat(checkpointIndexRetentionCancellable, is(notNullValue())); assertTrue(!masterService.getHourlyCron().isCancelled()); - assertTrue(!masterService.getDailyCron().isCancelled()); + assertTrue(!masterService.getCheckpointIndexRetentionCron().isCancelled()); masterService.offMaster(); - assertThat(masterService.getDailyCron(), is(nullValue())); + assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue())); assertThat(masterService.getHourlyCron(), is(nullValue())); } @@ -100,10 +102,10 @@ public void testBeforeStop() { }).when(clusterService).addLifecycleListener(any()); masterService.onMaster(); - 
assertThat(masterService.getDailyCron(), is(nullValue())); + assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue())); assertThat(masterService.getHourlyCron(), is(nullValue())); masterService.offMaster(); - assertThat(masterService.getDailyCron(), is(nullValue())); + assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue())); assertThat(masterService.getHourlyCron(), is(nullValue())); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java new file mode 100644 index 00000000..fe181f1e --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java @@ -0,0 +1,127 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.store.StoreStats; +import org.junit.Assert; +import org.mockito.Answers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; + +public class IndexCleanupTests extends AbstractADTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + Client client; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + ClusterService clusterService; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + ClientUtil clientUtil; + + @InjectMocks + IndexCleanup indexCleanup; + + @Mock + IndicesStatsResponse indicesStatsResponse; + + @Mock + ShardStats shardStats; + + @Mock + CommonStats commonStats; + + @Mock + StoreStats storeStats; + + @Mock + IndicesAdminClient indicesAdminClient; + + @SuppressWarnings("unchecked") + @Override + public void setUp() throws Exception { + super.setUp(); + super.setUpLog4jForJUnit(IndexCleanup.class); + MockitoAnnotations.initMocks(this); 
+ when(clusterService.state().getRoutingTable().hasIndex(anyString())).thenReturn(true); + indexCleanup = new IndexCleanup(client, clientUtil, clusterService); + when(indicesStatsResponse.getShards()).thenReturn(new ShardStats[] { shardStats }); + when(shardStats.getStats()).thenReturn(commonStats); + when(commonStats.getStore()).thenReturn(storeStats); + when(client.admin().indices()).thenReturn(indicesAdminClient); + when(client.threadPool().getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + listener.onResponse(indicesStatsResponse); + return null; + }).when(indicesAdminClient).stats(any(), any()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + super.tearDownLog4jForJUnit(); + } + + public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsTrue() throws Exception { + long maxShardSize = 1000; + when(storeStats.getSizeInBytes()).thenReturn(maxShardSize + 1); + indexCleanup.deleteDocsBasedOnShardSize("indexname", maxShardSize, null, ActionListener.wrap(result -> { + assertTrue(result); + verify(clientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any()); + }, exception -> { throw new RuntimeException(exception); })); + } + + public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsFalse() throws Exception { + long maxShardSize = 1000; + when(storeStats.getSizeInBytes()).thenReturn(maxShardSize - 1); + indexCleanup + .deleteDocsBasedOnShardSize( + "indexname", + maxShardSize, + null, + ActionListener.wrap(Assert::assertFalse, exception -> { throw new RuntimeException(exception); }) + ); + } + + public void testDeleteDocsBasedOnShardSizeIndexNotExisted() throws Exception { + when(clusterService.state().getRoutingTable().hasIndex(anyString())).thenReturn(false); + indexCleanup.deleteDocsBasedOnShardSize("indexname", 1000, null, null); + 
assertTrue(testAppender.containsMessage("skip as the index:indexname doesn't exist")); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java new file mode 100644 index 00000000..24bd6848 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java @@ -0,0 +1,100 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.time.Clock; +import java.time.Duration; + +import org.elasticsearch.action.ActionListener; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; + +public class ModelCheckpointIndexRetentionTests extends AbstractADTest { + + Duration defaultCheckpointTtl = Duration.ofDays(3); + + Clock clock = Clock.systemUTC(); + + @Mock + IndexCleanup indexCleanup; + + ModelCheckpointIndexRetention modelCheckpointIndexRetention; + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + super.setUp(); + super.setUpLog4jForJUnit(IndexCleanup.class); + MockitoAnnotations.initMocks(this); + modelCheckpointIndexRetention = new ModelCheckpointIndexRetention(defaultCheckpointTtl, clock, indexCleanup); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + listener.onResponse(1L); + return null; + }).when(indexCleanup).deleteDocsByQuery(anyString(), any(), any()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + super.tearDownLog4jForJUnit(); + } + + @SuppressWarnings("unchecked") + @Test + public void testRunWithCleanupAsNeeded() throws Exception { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[3]; + listener.onResponse(true); + return null; + 
}).when(indexCleanup).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); + + modelCheckpointIndexRetention.run(); + verify(indexCleanup, times(2)) + .deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); + verify(indexCleanup).deleteDocsByQuery(eq(CommonName.CHECKPOINT_INDEX_NAME), any(), any()); + } + + @SuppressWarnings("unchecked") + @Test + public void testRunWithCleanupAsFalse() throws Exception { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[3]; + listener.onResponse(false); + return null; + }).when(indexCleanup).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); + + modelCheckpointIndexRetention.run(); + verify(indexCleanup).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); + verify(indexCleanup).deleteDocsByQuery(eq(CommonName.CHECKPOINT_INDEX_NAME), any(), any()); + } +}