
Commit 405c4ed

Merge pull request kubernetes#90419 from klueska/upstream-update-reusable-cpus-strategy

Update strategy used to reuse CPUs from init containers in CPUManager
k8s-ci-robot authored and Artyom Lukianov committed Jun 2, 2020
1 parent 4dbe82e commit 405c4ed
Showing 3 changed files with 49 additions and 19 deletions.
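
In short: instead of releasing an init container's exclusively allocated CPUs straight back into the shared pool, the static policy now records them in a per-pod cpusToReuse map and offers them first to subsequent containers of the same pod. A minimal, self-contained Go sketch of that bookkeeping (the intSet type is a stand-in for the kubelet's cpuset.CPUSet; the pod UID and CPU numbers are illustrative):

    package main

    import "fmt"

    // intSet stands in for cpuset.CPUSet, just enough to trace the bookkeeping.
    type intSet map[int]bool

    func union(a, b intSet) intSet {
        out := intSet{}
        for c := range a {
            out[c] = true
        }
        for c := range b {
            out[c] = true
        }
        return out
    }

    func difference(a, b intSet) intSet {
        out := intSet{}
        for c := range a {
            if !b[c] {
                out[c] = true
            }
        }
        return out
    }

    func main() {
        // Per-pod reusable CPUs, keyed by pod UID, mirroring p.cpusToReuse.
        cpusToReuse := map[string]intSet{"pod-1": {}}

        // An init container is allocated CPUs 1 and 2: they become reusable.
        cpusToReuse["pod-1"] = union(cpusToReuse["pod-1"], intSet{1: true, 2: true})
        fmt.Println(cpusToReuse["pod-1"]) // map[1:true 2:true]

        // An app container draws from the reusable set first, so it can be handed
        // the same CPUs 1 and 2; once taken, they stop being reusable.
        cpusToReuse["pod-1"] = difference(cpusToReuse["pod-1"], intSet{1: true, 2: true})
        fmt.Println(cpusToReuse["pod-1"]) // map[]
    }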
9 changes: 9 additions & 0 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -503,6 +503,8 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 			testCase.expInitCSets,
 			testCase.expCSets...)
 
+		cumCSet := cpuset.NewCPUSet()
+
 		for i := range containers {
 			err := mgr.Allocate(testCase.pod, &containers[i])
 			if err != nil {
@@ -525,6 +527,13 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 				t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
 					testCase.description, expCSets[i], containers[i].Name, cset)
 			}
+
+			cumCSet = cumCSet.Union(cset)
 		}
+
+		if !testCase.stDefaultCPUSet.Difference(cumCSet).Equals(state.defaultCPUSet) {
+			t.Errorf("StaticPolicy error (%v). expected final state for defaultCPUSet %v but got %v",
+				testCase.description, testCase.stDefaultCPUSet.Difference(cumCSet), state.defaultCPUSet)
+		}
 	}
 }
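The new assertion checks a conservation property: every CPU handed out across the pod's containers (cumCSet) must have come out of the test case's starting shared pool, so stDefaultCPUSet minus cumCSet should equal the final defaultCPUSet. A worked fragment with hypothetical values, assuming the cpuset API used in the diff (NewCPUSet, Union, Difference):

    stDefault := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7) // pool before the pod runs
    initCSet := cpuset.NewCPUSet(1, 2)                     // init container allocation
    appCSet := cpuset.NewCPUSet(1, 2)                      // app container reuses the same CPUs
    cumCSet := initCSet.Union(appCSet)                     // {1,2}

    // The final shared pool must be exactly the CPUs that were never handed out:
    fmt.Println(stDefault.Difference(cumCSet)) // 0,3-7; the test compares this to state.defaultCPUSet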
57 changes: 39 additions & 18 deletions pkg/kubelet/cm/cpumanager/policy_static.go
@@ -77,6 +77,8 @@ type staticPolicy struct {
 	reserved cpuset.CPUSet
 	// topology manager reference to get container Topology affinity
 	affinity topologymanager.Store
+	// set of CPUs to reuse across allocations in a pod
+	cpusToReuse map[string]cpuset.CPUSet
 }
 
 // Ensure staticPolicy implements Policy interface
@@ -107,9 +109,10 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 	klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
 
 	return &staticPolicy{
-		topology: topology,
-		reserved: reserved,
-		affinity: affinity,
+		topology:    topology,
+		reserved:    reserved,
+		affinity:    affinity,
+		cpusToReuse: make(map[string]cpuset.CPUSet),
 	}, nil
 }
@@ -188,12 +191,37 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
 	return s.GetDefaultCPUSet().Difference(p.reserved)
 }
 
+func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
+	// If entries exist in p.cpusToReuse for pods other than the current one, delete them.
+	for podUID := range p.cpusToReuse {
+		if podUID != string(pod.UID) {
+			delete(p.cpusToReuse, podUID)
+		}
+	}
+	// If no cpuset exists in cpusToReuse for this pod yet, create one.
+	if _, ok := p.cpusToReuse[string(pod.UID)]; !ok {
+		p.cpusToReuse[string(pod.UID)] = cpuset.NewCPUSet()
+	}
+	// Check if the container is an init container.
+	// If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
+	for _, initContainer := range pod.Spec.InitContainers {
+		if container.Name == initContainer.Name {
+			p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
+			return
+		}
+	}
+	// Otherwise it is an app container.
+	// Remove its cpuset from the cpuset of reusable CPUs for any new allocations.
+	p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
+}
+
 func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
 	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
 		klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
 		// container belongs in an exclusively allocated pool
 
-		if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+		if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+			p.updateCPUsToReuse(pod, container, cpuset)
 			klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
 			return nil
 		}
@@ -203,23 +231,14 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
 
 		// Allocate CPUs according to the NUMA affinity contained in the hint.
-		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity)
+		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
 		if err != nil {
 			klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
 			return err
 		}
 		s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+		p.updateCPUsToReuse(pod, container, cpuset)
 
-		// Check if the container that has just been allocated resources is an init container.
-		// If so, release its CPUs back into the shared pool so they can be reallocated.
-		for _, initContainer := range pod.Spec.InitContainers {
-			if container.Name == initContainer.Name {
-				if toRelease, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
-					// Mutate the shared pool, adding released cpus.
-					s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
-				}
-			}
-		}
 	}
 	// container belongs in the shared pool (nothing to do; use default cpuset)
 	return nil
Expand All @@ -235,15 +254,17 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
return nil
}

func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask) (cpuset.CPUSet, error) {
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)

assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs)

// If there are aligned CPUs in numaAffinity, attempt to take those first.
result := cpuset.NewCPUSet()
if numaAffinity != nil {
alignedCPUs := cpuset.NewCPUSet()
for _, numaNodeID := range numaAffinity.GetBits() {
alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
}

numAlignedToAlloc := alignedCPUs.Size()
@@ -260,7 +281,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
 	}
 
 	// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-	remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
+	remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size())
 	if err != nil {
 		return cpuset.NewCPUSet(), err
 	}
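The pivotal line is assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs): CPUs freed by an init container re-enter the candidate set even though they are no longer in the shared pool, and both the NUMA-aligned pass and the leftover takeByTopology pass now draw from this widened set. A runnable trace of that set arithmetic with stand-in values (illustrative only, not the real cpuset type):

    package main

    import "fmt"

    func main() {
        defaultSet := map[int]bool{0: true, 3: true, 4: true, 5: true} // shared pool
        reserved := map[int]bool{0: true}                              // system-reserved CPUs
        reusable := map[int]bool{1: true, 2: true}                     // freed by an init container

        // assignableCPUs = (defaultSet - reserved) union reusable
        assignable := map[int]bool{}
        for c := range defaultSet {
            if !reserved[c] {
                assignable[c] = true
            }
        }
        for c := range reusable {
            assignable[c] = true
        }

        // CPUs 1-5: the init container's CPUs are offered again before any
        // untouched CPUs are carved out of the shared pool.
        fmt.Println(assignable)
    }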
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -639,7 +639,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 			continue
 		}
 
-		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
+		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.NewCPUSet())
 		if err != nil {
 			t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
 				tc.description, tc.expCSet, err)
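Existing callers with nothing to reuse now pass an empty set, which preserves the old behavior because a union with the empty set is an identity. A quick check, again assuming the cpuset API from the diff:

    s := cpuset.NewCPUSet(3, 4)
    fmt.Println(s.Union(cpuset.NewCPUSet()).Equals(s)) // true: an empty reuse set is a no-op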
