Skip to content

Commit

Permalink
Add processors to autoscaling capacity response (elastic#87895)
Browse files Browse the repository at this point in the history
This adds `processors` to the autoscaling capacity response.

`processors` in the current capacity calculation is determined by the `allocated_processors` field in OsInfo. The reason behind this is because `allocated_processors` can be overridden by the process starting the node. Consequently, when autoscaling deciders need to look at the current processors and request for more, they should look at `allocated_processors` not `available_processors`. Mostly, this is because certain systems may want to account for threads being used by external processes and not allow ES to use all the available processors on the OS.
  • Loading branch information
benwtrent authored Jun 28, 2022
1 parent b34f2f6 commit 164decb
Show file tree
Hide file tree
Showing 29 changed files with 763 additions and 293 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/87895.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 87895
summary: Add processors to autoscaling capacity response
area: Autoscaling
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions x-pack/plugin/autoscaling/qa/rest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ restResources {
}
}

tasks.named("yamlRestTestV7CompatTest").configure {
systemProperty 'tests.rest.blacklist', [
"autoscaling/get_autoscaling_capacity/Test get fixed autoscaling capacity",
].join(',')
}

testClusters.configureEach {
testDistribution = 'DEFAULT'
setting 'xpack.security.enabled', 'true'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.total.memory: 73310 }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.node.storage: 1337 }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.node.memory: 7331 }
- match: { policies.my_autoscaling_policy.deciders.fixed.reason_summary: "fixed storage [1.3kb] memory [7.1kb] nodes [10]" }
- match: { policies.my_autoscaling_policy.deciders.fixed.reason_summary: "fixed storage [1.3kb] memory [7.1kb] processors [null] nodes [10]" }
- length: { policies.my_autoscaling_policy.current_nodes: 0 }


Expand Down Expand Up @@ -71,6 +71,52 @@
- gte: { policies.my_autoscaling_policy.current_capacity.node.memory: 0 }
- length: { policies.my_autoscaling_policy.current_nodes: 1 }

# test cleanup
- do:
autoscaling.delete_autoscaling_policy:
name: my_autoscaling_policy
---
"Test get fixed autoscaling capacity with processors":
- do:
autoscaling.put_autoscaling_policy:
name: my_autoscaling_policy
body:
# voting_only requires master to start so we are sure no nodes match
roles: ["voting_only"]
deciders:
fixed:
storage: 1337b
memory: 7331b
processors: 2
nodes: 10

- match: { "acknowledged": true }

- do:
autoscaling.get_autoscaling_capacity: {}

- match: { policies.my_autoscaling_policy.required_capacity.total.storage: 13370 }
- match: { policies.my_autoscaling_policy.required_capacity.total.memory: 73310 }
- match: { policies.my_autoscaling_policy.required_capacity.total.processors: 20 }
- match: { policies.my_autoscaling_policy.required_capacity.node.storage: 1337 }
- match: { policies.my_autoscaling_policy.required_capacity.node.memory: 7331 }
- match: { policies.my_autoscaling_policy.required_capacity.node.processors: 2 }
- match: { policies.my_autoscaling_policy.current_capacity.total.storage: 0 }
- match: { policies.my_autoscaling_policy.current_capacity.total.memory: 0 }
- match: { policies.my_autoscaling_policy.current_capacity.total.processors: 0 }
- match: { policies.my_autoscaling_policy.current_capacity.node.storage: 0 }
- match: { policies.my_autoscaling_policy.current_capacity.node.memory: 0 }
- match: { policies.my_autoscaling_policy.current_capacity.node.processors: 0 }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.total.storage: 13370 }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.total.memory: 73310 }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.total.processors: 20 }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.node.storage: 1337 }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.node.memory: 7331 }
- match: { policies.my_autoscaling_policy.deciders.fixed.required_capacity.node.processors: 2 }
- match: { policies.my_autoscaling_policy.deciders.fixed.reason_summary: "fixed storage [1.3kb] memory [7.1kb] processors [2.0] nodes [10]" }
- length: { policies.my_autoscaling_policy.current_nodes: 0 }


# test cleanup
- do:
autoscaling.delete_autoscaling_policy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.autoscaling.capacity.FixedAutoscalingDeciderService;
import org.elasticsearch.xpack.autoscaling.capacity.memory.AutoscalingMemoryInfoService;
import org.elasticsearch.xpack.autoscaling.capacity.nodeinfo.AutoscalingNodeInfoService;
import org.elasticsearch.xpack.autoscaling.existence.FrozenExistenceDeciderService;
import org.elasticsearch.xpack.autoscaling.rest.RestDeleteAutoscalingPolicyHandler;
import org.elasticsearch.xpack.autoscaling.rest.RestGetAutoscalingCapacityHandler;
Expand Down Expand Up @@ -119,13 +119,13 @@ public Collection<Object> createComponents(
return List.of(
new AutoscalingCalculateCapacityService.Holder(this),
autoscalingLicenseChecker,
new AutoscalingMemoryInfoService(clusterService, client)
new AutoscalingNodeInfoService(clusterService, client)
);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(AutoscalingMemoryInfoService.FETCH_TIMEOUT);
return List.of(AutoscalingNodeInfoService.FETCH_TIMEOUT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.autoscaling.AutoscalingLicenseChecker;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCalculateCapacityService;
import org.elasticsearch.xpack.autoscaling.capacity.memory.AutoscalingMemoryInfoService;
import org.elasticsearch.xpack.autoscaling.capacity.nodeinfo.AutoscalingNodeInfoService;

import java.util.Objects;

Expand All @@ -40,7 +40,7 @@ public class TransportGetAutoscalingCapacityAction extends TransportMasterNodeAc
private final AutoscalingCalculateCapacityService capacityService;
private final ClusterInfoService clusterInfoService;
private final SnapshotsInfoService snapshotsInfoService;
private final AutoscalingMemoryInfoService memoryInfoService;
private final AutoscalingNodeInfoService nodeInfoService;
private final AutoscalingLicenseChecker autoscalingLicenseChecker;
private final CapacityResponseCache<GetAutoscalingCapacityAction.Response> responseCache = new CapacityResponseCache<>(
run -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(run),
Expand All @@ -57,7 +57,7 @@ public TransportGetAutoscalingCapacityAction(
final AutoscalingCalculateCapacityService.Holder capacityServiceHolder,
final ClusterInfoService clusterInfoService,
final SnapshotsInfoService snapshotsInfoService,
final AutoscalingMemoryInfoService memoryInfoService,
final AutoscalingNodeInfoService nodeInfoService,
final AllocationDeciders allocationDeciders,
final AutoscalingLicenseChecker autoscalingLicenseChecker
) {
Expand All @@ -73,7 +73,7 @@ public TransportGetAutoscalingCapacityAction(
ThreadPool.Names.SAME
);
this.snapshotsInfoService = snapshotsInfoService;
this.memoryInfoService = memoryInfoService;
this.nodeInfoService = nodeInfoService;
this.capacityService = capacityServiceHolder.get(allocationDeciders);
this.clusterInfoService = clusterInfoService;
this.autoscalingLicenseChecker = Objects.requireNonNull(autoscalingLicenseChecker);
Expand Down Expand Up @@ -104,7 +104,7 @@ private GetAutoscalingCapacityAction.Response computeCapacity(Runnable ensureNot
clusterService.state(),
clusterInfoService.getClusterInfo(),
snapshotsInfoService.snapshotShardSizes(),
memoryInfoService.snapshot(),
nodeInfoService.snapshot(),
ensureNotCancelled
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.elasticsearch.xpack.autoscaling.Autoscaling;
import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata;
import org.elasticsearch.xpack.autoscaling.action.PolicyValidator;
import org.elasticsearch.xpack.autoscaling.capacity.memory.AutoscalingMemoryInfo;
import org.elasticsearch.xpack.autoscaling.capacity.nodeinfo.AutoscalingNodeInfo;
import org.elasticsearch.xpack.autoscaling.capacity.nodeinfo.AutoscalingNodesInfo;
import org.elasticsearch.xpack.autoscaling.policy.AutoscalingPolicy;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
Expand Down Expand Up @@ -107,7 +109,7 @@ public SortedMap<String, AutoscalingDeciderResults> calculate(
ClusterState state,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
AutoscalingMemoryInfo memoryInfo,
AutoscalingNodesInfo autoscalingNodesInfo,
Runnable ensureNotCancelled
) {
AutoscalingMetadata autoscalingMetadata = state.metadata().custom(AutoscalingMetadata.NAME);
Expand All @@ -119,7 +121,14 @@ public SortedMap<String, AutoscalingDeciderResults> calculate(
.map(
e -> Tuple.tuple(
e.getKey(),
calculateForPolicy(e.getValue().policy(), state, clusterInfo, shardSizeInfo, memoryInfo, ensureNotCancelled)
calculateForPolicy(
e.getValue().policy(),
state,
clusterInfo,
shardSizeInfo,
autoscalingNodesInfo,
ensureNotCancelled
)
)
)
.collect(Collectors.toMap(Tuple::v1, Tuple::v2))
Expand All @@ -134,7 +143,7 @@ private AutoscalingDeciderResults calculateForPolicy(
ClusterState state,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
AutoscalingMemoryInfo memoryInfo,
AutoscalingNodesInfo autoscalingNodesInfo,
Runnable ensureNotCancelled
) {
if (hasUnknownRoles(policy)) {
Expand All @@ -150,7 +159,7 @@ private AutoscalingDeciderResults calculateForPolicy(
state,
clusterInfo,
shardSizeInfo,
memoryInfo,
autoscalingNodesInfo,
ensureNotCancelled
);
SortedMap<String, AutoscalingDeciderResult> results = deciders.entrySet()
Expand Down Expand Up @@ -191,10 +200,10 @@ DefaultAutoscalingDeciderContext createContext(
ClusterState state,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo shardSizeInfo,
AutoscalingMemoryInfo memoryInfo,
AutoscalingNodesInfo autoscalingNodesInfo,
Runnable ensureNotCancelled
) {
return new DefaultAutoscalingDeciderContext(roles, state, clusterInfo, shardSizeInfo, memoryInfo, ensureNotCancelled);
return new DefaultAutoscalingDeciderContext(roles, state, clusterInfo, shardSizeInfo, autoscalingNodesInfo, ensureNotCancelled);
}

/**
Expand Down Expand Up @@ -223,7 +232,7 @@ static class DefaultAutoscalingDeciderContext implements AutoscalingDeciderConte
private final ClusterState state;
private final ClusterInfo clusterInfo;
private final SnapshotShardSizeInfo snapshotShardSizeInfo;
private final AutoscalingMemoryInfo memoryInfo;
private final AutoscalingNodesInfo autoscalingNodesInfo;
private final SortedSet<DiscoveryNode> currentNodes;
private final AutoscalingCapacity currentCapacity;
private final boolean currentCapacityAccurate;
Expand All @@ -234,7 +243,7 @@ static class DefaultAutoscalingDeciderContext implements AutoscalingDeciderConte
ClusterState state,
ClusterInfo clusterInfo,
SnapshotShardSizeInfo snapshotShardSizeInfo,
AutoscalingMemoryInfo memoryInfo,
AutoscalingNodesInfo autoscalingNodesInfo,
Runnable ensureNotCancelled
) {
this.roles = roles.stream().map(DiscoveryNodeRole::getRoleFromRoleName).collect(Sets.toUnmodifiableSortedSet());
Expand All @@ -243,7 +252,7 @@ static class DefaultAutoscalingDeciderContext implements AutoscalingDeciderConte
this.state = state;
this.clusterInfo = clusterInfo;
this.snapshotShardSizeInfo = snapshotShardSizeInfo;
this.memoryInfo = memoryInfo;
this.autoscalingNodesInfo = autoscalingNodesInfo;
this.currentNodes = state.nodes()
.stream()
.filter(this::rolesFilter)
Expand Down Expand Up @@ -295,7 +304,7 @@ private boolean nodeHasAccurateCapacity(DiscoveryNode node) {
}
}

return memoryInfo.get(node) != null;
return autoscalingNodesInfo.get(node).isPresent();
}

private AutoscalingCapacity calculateCurrentCapacity() {
Expand All @@ -319,10 +328,11 @@ private AutoscalingCapacity.AutoscalingResources resourcesFor(DiscoveryNode node
)
: 0L;

Long memory = memoryInfo.get(node);
Optional<AutoscalingNodeInfo> memoryAndProcessors = autoscalingNodesInfo.get(node);
return new AutoscalingCapacity.AutoscalingResources(
storage == -1 ? ByteSizeValue.ZERO : new ByteSizeValue(storage),
memory == null ? ByteSizeValue.ZERO : new ByteSizeValue(memory)
memoryAndProcessors.map(AutoscalingNodeInfo::memory).map(ByteSizeValue::new).orElse(ByteSizeValue.ZERO),
memoryAndProcessors.map(AutoscalingNodeInfo::processors).orElse(0f)
);
}

Expand Down
Loading

0 comments on commit 164decb

Please sign in to comment.