-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TSDB: Implement Downsampling ILM Action for time-series indices #87269
Changes from all commits
32519c0
a34aa1c
d003985
f2c5471
a9845fc
45bc157
15650da
d6e99d8
b225e3f
a625c8d
1f47891
c82bd6e
9c0d422
6bc6b15
4114af6
f08afa9
d439aab
e3ac2fd
250543d
1f6b244
7b897e7
b46c422
4d2adec
81a0638
ef903b7
4d0a602
3dac148
4f2770e
9d762a3
5bbb446
6e3087b
e6ea553
b4226a8
2265432
6a6ba9e
6c90a7a
6d91aaa
8bb250c
4e28be7
78baf97
5b1c2dc
5893ce0
40964b2
5bdea59
d121009
66685c3
b860625
48dc8ae
704e9a9
5f38df3
7345a57
2d6d139
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
pr: 87269 | ||
summary: "TSDB: Implement downsampling ILM Action for time-series indices" | ||
area: TSDB | ||
type: feature | ||
issues: | ||
- 68609 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
package org.elasticsearch.xpack.core.ilm; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; | ||
import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
import org.elasticsearch.client.internal.Client; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.index.IndexNotFoundException; | ||
|
||
import java.util.Objects; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* Deletes the target index created by an operation such as shrink or rollup and | ||
* identified the target index name stored in the lifecycle state of the managed | ||
* index (if any was generated) | ||
*/ | ||
public class CleanupTargetIndexStep extends AsyncRetryDuringSnapshotActionStep { | ||
public static final String NAME = "cleanup-target-index"; | ||
private static final Logger logger = LogManager.getLogger(CleanupTargetIndexStep.class); | ||
|
||
private final Function<IndexMetadata, String> sourceIndexNameSupplier; | ||
private final Function<IndexMetadata, String> targetIndexNameSupplier; | ||
|
||
public CleanupTargetIndexStep( | ||
StepKey key, | ||
StepKey nextStepKey, | ||
Client client, | ||
Function<IndexMetadata, String> sourceIndexNameSupplier, | ||
Function<IndexMetadata, String> targetIndexNameSupplier | ||
) { | ||
super(key, nextStepKey, client); | ||
this.sourceIndexNameSupplier = sourceIndexNameSupplier; | ||
this.targetIndexNameSupplier = targetIndexNameSupplier; | ||
} | ||
|
||
@Override | ||
public boolean isRetryable() { | ||
return true; | ||
} | ||
|
||
Function<IndexMetadata, String> getSourceIndexNameSupplier() { | ||
return sourceIndexNameSupplier; | ||
} | ||
|
||
Function<IndexMetadata, String> getTargetIndexNameSupplier() { | ||
return targetIndexNameSupplier; | ||
} | ||
|
||
@Override | ||
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) { | ||
final String sourceIndexName = sourceIndexNameSupplier.apply(indexMetadata); | ||
if (Strings.isNullOrEmpty(sourceIndexName) == false) { | ||
// the current managed index is the target index | ||
if (currentClusterState.metadata().index(sourceIndexName) == null) { | ||
// if the source index does not exist, we'll skip deleting the | ||
// (managed) target index as that will cause data loss | ||
String policyName = indexMetadata.getLifecyclePolicyName(); | ||
logger.warn( | ||
"managed index [{}] has been created as part of policy [{}] and the source index [{}] does not exist " | ||
+ "anymore. will skip the [{}] step", | ||
indexMetadata.getIndex().getName(), | ||
policyName, | ||
sourceIndexName, | ||
NAME | ||
); | ||
listener.onResponse(null); | ||
return; | ||
} | ||
} | ||
|
||
final String targetIndexName = targetIndexNameSupplier.apply(indexMetadata); | ||
// if the target index was not generated there is nothing to delete so we move on | ||
if (Strings.hasText(targetIndexName) == false) { | ||
listener.onResponse(null); | ||
return; | ||
} | ||
getClient().admin() | ||
.indices() | ||
.delete(new DeleteIndexRequest(targetIndexName).masterNodeTimeout(TimeValue.MAX_VALUE), new ActionListener<>() { | ||
@Override | ||
public void onResponse(AcknowledgedResponse acknowledgedResponse) { | ||
// even if not all nodes acked the delete request yet we can consider this operation as successful as | ||
// we'll generate a new index name and attempt to create an index with the newly generated name | ||
listener.onResponse(null); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
if (e instanceof IndexNotFoundException) { | ||
// we can move on if the index was deleted in the meantime | ||
listener.onResponse(null); | ||
} else { | ||
listener.onFailure(e); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
CleanupTargetIndexStep that = (CleanupTargetIndexStep) o; | ||
return super.equals(o) | ||
&& Objects.equals(targetIndexNameSupplier, that.targetIndexNameSupplier) | ||
&& Objects.equals(sourceIndexNameSupplier, that.sourceIndexNameSupplier); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(super.hashCode(), targetIndexNameSupplier, sourceIndexNameSupplier); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,34 +10,41 @@ | |
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.cluster.metadata.LifecycleExecutionState; | ||
import org.elasticsearch.cluster.metadata.Metadata; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.index.Index; | ||
|
||
import java.util.Locale; | ||
import java.util.Objects; | ||
import java.util.function.BiFunction; | ||
|
||
/** | ||
* Copy the provided settings from the source to the target index. | ||
* <p> | ||
* The target index is derived from the source index using the provided prefix. | ||
* This is useful for actions like shrink or searchable snapshot that create a new index and migrate the ILM execution from the source | ||
* to the target index. | ||
* The target index is generated by a supplier function. | ||
* This is useful for actions like shrink, rollup or searchable snapshot that create | ||
* a new index and migrate the ILM execution from the source to the target index. | ||
*/ | ||
public class CopySettingsStep extends ClusterStateActionStep { | ||
public static final String NAME = "copy-settings"; | ||
|
||
private static final Logger logger = LogManager.getLogger(CopySettingsStep.class); | ||
|
||
private final String[] settingsKeys; | ||
private final String indexPrefix; | ||
|
||
public CopySettingsStep(StepKey key, StepKey nextStepKey, String indexPrefix, String... settingsKeys) { | ||
private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that I have removed the copy-settings step from rollup ILM action, I can revert the I only thought that making it more generic using an index name supplier function may be needed in the future. I don't have a strong opinion about keeping it this way or reverting it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's fine to keep for now |
||
|
||
public CopySettingsStep( | ||
StepKey key, | ||
StepKey nextStepKey, | ||
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier, | ||
String... settingsKeys | ||
) { | ||
super(key, nextStepKey); | ||
Objects.requireNonNull(indexPrefix); | ||
Objects.requireNonNull(settingsKeys); | ||
this.indexPrefix = indexPrefix; | ||
this.settingsKeys = settingsKeys; | ||
this.targetIndexNameSupplier = targetIndexNameSupplier; | ||
} | ||
|
||
@Override | ||
|
@@ -49,17 +56,14 @@ public String[] getSettingsKeys() { | |
return settingsKeys; | ||
} | ||
|
||
public String getIndexPrefix() { | ||
return indexPrefix; | ||
} | ||
BiFunction<String, LifecycleExecutionState, String> getTargetIndexNameSupplier() { | ||
return targetIndexNameSupplier; | ||
}; | ||
|
||
@Override | ||
public ClusterState performAction(Index index, ClusterState clusterState) { | ||
String sourceIndexName = index.getName(); | ||
IndexMetadata sourceIndexMetadata = clusterState.metadata().index(sourceIndexName); | ||
String targetIndexName = indexPrefix + sourceIndexName; | ||
IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndexName); | ||
|
||
if (sourceIndexMetadata == null) { | ||
// Index must have been since deleted, ignore it | ||
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), sourceIndexName); | ||
|
@@ -70,6 +74,8 @@ public ClusterState performAction(Index index, ClusterState clusterState) { | |
return clusterState; | ||
} | ||
|
||
String targetIndexName = targetIndexNameSupplier.apply(sourceIndexName, sourceIndexMetadata.getLifecycleExecutionState()); | ||
IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndexName); | ||
if (targetIndexMetadata == null) { | ||
String errorMessage = String.format( | ||
Locale.ROOT, | ||
|
@@ -107,11 +113,13 @@ public boolean equals(Object o) { | |
return false; | ||
} | ||
CopySettingsStep that = (CopySettingsStep) o; | ||
return Objects.equals(settingsKeys, that.settingsKeys) && Objects.equals(indexPrefix, that.indexPrefix); | ||
return super.equals(o) | ||
&& Objects.equals(targetIndexNameSupplier, that.targetIndexNameSupplier) | ||
&& Objects.equals(settingsKeys, that.settingsKeys); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(super.hashCode(), settingsKeys, indexPrefix); | ||
return Objects.hash(super.hashCode(), targetIndexNameSupplier, settingsKeys); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can modify
ShrinkAction
so that it usesCleanupTargetIndexStep
instead ofCleanupShrinkIndexStep
in a subsequent PR