From 405c4ed52f39ed117cff34ff3b3872dd129120f2 Mon Sep 17 00:00:00 2001
From: Kubernetes Prow Robot
Date: Mon, 27 Apr 2020 12:09:23 -0700
Subject: [PATCH] Merge pull request #90419 from klueska/upstream-update-reusable-cpus-strategy

Update strategy used to reuse CPUs from init containers in CPUManager
---
 pkg/kubelet/cm/cpumanager/cpu_manager_test.go |  9 +++
 pkg/kubelet/cm/cpumanager/policy_static.go    | 57 +++++++++++++------
 .../cm/cpumanager/policy_static_test.go       |  2 +-
 3 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
index e9c7852c602e1..eb5ae3213fc23 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -503,6 +503,8 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 			testCase.expInitCSets,
 			testCase.expCSets...)
 
+		cumCSet := cpuset.NewCPUSet()
+
 		for i := range containers {
 			err := mgr.Allocate(testCase.pod, &containers[i])
 			if err != nil {
@@ -525,6 +527,13 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
 				t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
 					testCase.description, expCSets[i], containers[i].Name, cset)
 			}
+
+			cumCSet = cumCSet.Union(cset)
+		}
+
+		if !testCase.stDefaultCPUSet.Difference(cumCSet).Equals(state.defaultCPUSet) {
+			t.Errorf("StaticPolicy error (%v). expected final state for defaultCPUSet %v but got %v",
+				testCase.description, testCase.stDefaultCPUSet.Difference(cumCSet), state.defaultCPUSet)
 		}
 	}
 }
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index da68ed808bdae..02b88e9b4c00b 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -77,6 +77,8 @@ type staticPolicy struct {
 	reserved cpuset.CPUSet
 	// topology manager reference to get container Topology affinity
 	affinity topologymanager.Store
+	// set of CPUs to reuse across allocations in a pod
+	cpusToReuse map[string]cpuset.CPUSet
 }
 
 // Ensure staticPolicy implements Policy interface
@@ -107,9 +109,10 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
 	klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)
 
 	return &staticPolicy{
-		topology: topology,
-		reserved: reserved,
-		affinity: affinity,
+		topology:    topology,
+		reserved:    reserved,
+		affinity:    affinity,
+		cpusToReuse: make(map[string]cpuset.CPUSet),
 	}, nil
 }
 
@@ -188,12 +191,37 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
 	return s.GetDefaultCPUSet().Difference(p.reserved)
 }
 
+func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
+	// If pod entries to m.cpusToReuse other than the current pod exist, delete them.
+	for podUID := range p.cpusToReuse {
+		if podUID != string(pod.UID) {
+			delete(p.cpusToReuse, podUID)
+		}
+	}
+	// If no cpuset exists for cpusToReuse by this pod yet, create one.
+	if _, ok := p.cpusToReuse[string(pod.UID)]; !ok {
+		p.cpusToReuse[string(pod.UID)] = cpuset.NewCPUSet()
+	}
+	// Check if the container is an init container.
+	// If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
+	for _, initContainer := range pod.Spec.InitContainers {
+		if container.Name == initContainer.Name {
+			p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
+			return
+		}
+	}
+	// Otherwise it is an app container.
+	// Remove its cpuset from the cpuset of reusable CPUs for any new allocations.
+	p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
+}
+
 func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
 	if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
 		klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
 		// container belongs in an exclusively allocated pool
 
-		if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+		if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+			p.updateCPUsToReuse(pod, container, cpuset)
 			klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
 			return nil
 		}
@@ -203,23 +231,14 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
 		klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)
 
 		// Allocate CPUs according to the NUMA affinity contained in the hint.
-		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity)
+		cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
 		if err != nil {
 			klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
 			return err
 		}
 		s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+		p.updateCPUsToReuse(pod, container, cpuset)
 
-		// Check if the container that has just been allocated resources is an init container.
-		// If so, release its CPUs back into the shared pool so they can be reallocated.
-		for _, initContainer := range pod.Spec.InitContainers {
-			if container.Name == initContainer.Name {
-				if toRelease, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
-					// Mutate the shared pool, adding released cpus.
-					s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
-				}
-			}
-		}
 	}
 	// container belongs in the shared pool (nothing to do; use default cpuset)
 	return nil
@@ -235,15 +254,17 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
 	return nil
 }
 
-func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask) (cpuset.CPUSet, error) {
+func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
 	klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
 
+	assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs)
+
 	// If there are aligned CPUs in numaAffinity, attempt to take those first.
 	result := cpuset.NewCPUSet()
 	if numaAffinity != nil {
 		alignedCPUs := cpuset.NewCPUSet()
 		for _, numaNodeID := range numaAffinity.GetBits() {
-			alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
+			alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
 		}
 
 		numAlignedToAlloc := alignedCPUs.Size()
@@ -260,7 +281,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
 	}
 
 	// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-	remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
+	remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size())
 	if err != nil {
 		return cpuset.NewCPUSet(), err
 	}
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index ea2bcf11333ba..b4b46c68c17b3 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -639,7 +639,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
 			continue
 		}
 
-		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
+		cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.NewCPUSet())
 		if err != nil {
 			t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
 				tc.description, tc.expCSet, err)
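
Note (not part of the patch): below is a minimal, self-contained Go sketch of the reuse strategy the diff introduces. CPUs granted to a pod's init containers are remembered per pod and offered back to later allocations for that pod, while CPUs claimed by app containers are withdrawn from the reusable set. The simpleCPUSet and reusePolicy types and the allocate helper are illustrative stand-ins invented for this sketch, not the kubelet's real cpuset.CPUSet or staticPolicy APIs.

// Illustrative sketch only; simplified stand-ins for the kubelet types.
package main

import "fmt"

// simpleCPUSet is a minimal stand-in for cpuset.CPUSet.
type simpleCPUSet map[int]bool

func (s simpleCPUSet) union(o simpleCPUSet) simpleCPUSet {
	r := simpleCPUSet{}
	for c := range s {
		r[c] = true
	}
	for c := range o {
		r[c] = true
	}
	return r
}

func (s simpleCPUSet) difference(o simpleCPUSet) simpleCPUSet {
	r := simpleCPUSet{}
	for c := range s {
		if !o[c] {
			r[c] = true
		}
	}
	return r
}

// reusePolicy mirrors the idea behind staticPolicy.cpusToReuse: CPUs handed
// to a pod's init containers stay reusable for that pod's later containers.
type reusePolicy struct {
	free        simpleCPUSet            // CPUs not exclusively assigned
	cpusToReuse map[string]simpleCPUSet // podUID -> CPUs reusable within that pod
}

// updateCPUsToReuse applies the patch's rule: an init container's CPUs are
// added to the pod's reusable set, an app container's CPUs are removed from it.
func (p *reusePolicy) updateCPUsToReuse(podUID string, isInit bool, cset simpleCPUSet) {
	if p.cpusToReuse[podUID] == nil {
		p.cpusToReuse[podUID] = simpleCPUSet{}
	}
	if isInit {
		p.cpusToReuse[podUID] = p.cpusToReuse[podUID].union(cset)
		return
	}
	p.cpusToReuse[podUID] = p.cpusToReuse[podUID].difference(cset)
}

// allocate grants n exclusive CPUs, drawing from the free pool plus any CPUs
// reusable from the pod's init containers, much as allocateCPUs now does via
// its extra reusableCPUs argument.
func (p *reusePolicy) allocate(podUID string, isInit bool, n int) simpleCPUSet {
	assignable := p.free.union(p.cpusToReuse[podUID])
	got := simpleCPUSet{}
	for c := range assignable {
		if len(got) == n {
			break
		}
		got[c] = true
		delete(p.free, c) // no-op for CPUs that came from the reusable set
	}
	p.updateCPUsToReuse(podUID, isInit, got)
	return got
}

func main() {
	p := &reusePolicy{
		free:        simpleCPUSet{0: true, 1: true, 2: true, 3: true},
		cpusToReuse: map[string]simpleCPUSet{},
	}
	initCPUs := p.allocate("pod-1", true, 2)  // init container takes 2 CPUs
	appCPUs := p.allocate("pod-1", false, 2)  // later container may draw on the same CPUs
	fmt.Println("init:", initCPUs, "app:", appCPUs, "still free:", p.free)
}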