Skip to content

Commit

Permalink
UPSTREAM: 98019: specify the container CPU set during the creation
Browse files Browse the repository at this point in the history
cpu manager: specify the container CPU set during the creation

(cherry picked from commit 6fae185)
Signed-off-by: Artyom Lukianov <alukiano@redhat.com>
  • Loading branch information
k8s-ci-robot authored and soltysh committed Sep 8, 2021
1 parent 339c9c7 commit 3a5a7ab
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 64 deletions.
4 changes: 4 additions & 0 deletions pkg/kubelet/cm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ go_library(
"helpers_linux.go",
"helpers_unsupported.go",
"internal_container_lifecycle.go",
"internal_container_lifecycle_linux.go",
"internal_container_lifecycle_unsupported.go",
"internal_container_lifecycle_windows.go",
"node_container_manager_linux.go",
"pod_container_manager_linux.go",
"pod_container_manager_stub.go",
Expand Down Expand Up @@ -42,6 +45,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis:go_default_library",
"//staging/src/k8s.io/cri-api/pkg/apis/runtime/v1alpha2:go_default_library",
"//staging/src/k8s.io/kubelet/pkg/apis/podresources/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
] + select({
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr
}

func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
return cm.cpuManager.GetCPUs(podUID, containerName)
return cm.cpuManager.GetCPUs(podUID, containerName).ToSliceNoSortInt64()
}

func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
Expand Down
43 changes: 9 additions & 34 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ type Manager interface {
// e.g. at pod admission time.
Allocate(pod *v1.Pod, container *v1.Container) error

// AddContainer is called between container create and container start
// so that initial CPU affinity settings can be written through to the
// container runtime before the first process begins to execute.
AddContainer(p *v1.Pod, c *v1.Container, containerID string) error
// AddContainer adds the mapping between container ID to pod UID and the container name
// The mapping used to remove the CPU allocation during the container removal
AddContainer(p *v1.Pod, c *v1.Container, containerID string)

// RemoveContainer is called after Kubelet decides to kill or delete a
// container. After this call, the CPU manager stops trying to reconcile
Expand All @@ -80,7 +79,7 @@ type Manager interface {

// GetCPUs implements the podresources.CPUsProvider interface to provide allocated
// cpus for the container
GetCPUs(podUID, containerName string) []int64
GetCPUs(podUID, containerName string) cpuset.CPUSet

// GetPodTopologyHints implements the topologymanager.HintProvider Interface
// and is consulted to achieve NUMA aware resource alignment per Pod
Expand Down Expand Up @@ -237,29 +236,10 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
return nil
}

func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
m.Lock()
// Get the CPUs assigned to the container during Allocate()
// (or fall back to the default CPUSet if none were assigned).
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
m.Unlock()

if !cpus.IsEmpty() {
err := m.updateContainerCPUSet(containerID, cpus)
if err != nil {
klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
m.Lock()
err := m.policyRemoveContainerByRef(string(p.UID), c.Name)
if err != nil {
klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
}
m.Unlock()
}
return err
}

klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty")
return nil
defer m.Unlock()
m.containerMap.Add(string(pod.UID), container.Name, containerID)
}

func (m *manager) RemoveContainer(containerID string) error {
Expand Down Expand Up @@ -481,11 +461,6 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet)
})
}

func (m *manager) GetCPUs(podUID, containerName string) []int64 {
cpus := m.state.GetCPUSetOrDefault(string(podUID), containerName)
result := []int64{}
for _, cpu := range cpus.ToSliceNoSort() {
result = append(result, int64(cpu))
}
return result
func (m *manager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
return m.state.GetCPUSetOrDefault(podUID, containerName)
}
26 changes: 7 additions & 19 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,6 @@ func TestCPUManagerAdd(t *testing.T) {
expAllocateErr: fmt.Errorf("fake reg error"),
expAddContainerErr: nil,
},
{
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expAllocateErr: nil,
expAddContainerErr: fmt.Errorf("fake update error"),
},
}

for _, testCase := range testCases {
Expand Down Expand Up @@ -287,7 +279,8 @@ func TestCPUManagerAdd(t *testing.T) {
testCase.description, testCase.expAllocateErr, err)
}

err = mgr.AddContainer(pod, container, "fakeID")
mgr.AddContainer(pod, container, "fakeID")
_, _, err = mgr.containerMap.GetContainerRef("fakeID")
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expAddContainerErr, err)
Expand Down Expand Up @@ -520,7 +513,9 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
t.Errorf("StaticPolicy Allocate() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
}
err = mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])

mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
_, _, err = mgr.containerMap.GetContainerRef(containerIDs[i])
if err != nil {
t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
Expand Down Expand Up @@ -1018,14 +1013,6 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
expAllocateErr: nil,
expAddContainerErr: nil,
},
{
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
expAllocateErr: nil,
expAddContainerErr: fmt.Errorf("fake update error"),
},
}

for _, testCase := range testCases {
Expand Down Expand Up @@ -1053,7 +1040,8 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
testCase.description, testCase.expAllocateErr, err)
}

err = mgr.AddContainer(pod, container, "fakeID")
mgr.AddContainer(pod, container, "fakeID")
_, _, err = mgr.containerMap.GetContainerRef("fakeID")
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expAddContainerErr, err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/status"
Expand All @@ -45,9 +46,8 @@ func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
return nil
}

func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
return nil
}

func (m *fakeManager) RemoveContainer(containerID string) error {
Expand All @@ -69,9 +69,9 @@ func (m *fakeManager) State() state.Reader {
return m.state
}

func (m *fakeManager) GetCPUs(podUID, containerName string) []int64 {
func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
klog.Infof("[fake cpumanager] GetCPUs(podUID: %s, containerName: %s)", podUID, containerName)
return nil
return cpuset.CPUSet{}
}

// NewFakeManager creates empty/fake cpu manager
Expand Down
24 changes: 23 additions & 1 deletion pkg/kubelet/cm/cpuset/cpuset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package cpuset
import (
"bytes"
"fmt"
"k8s.io/klog/v2"
"reflect"
"sort"
"strconv"
"strings"

"k8s.io/klog/v2"
)

// Builder is a mutable builder for CPUSet. Functions that mutate instances
Expand Down Expand Up @@ -198,6 +199,27 @@ func (s CPUSet) ToSliceNoSort() []int {
return result
}

// ToSliceInt64 returns an ordered slice of int64 that contains all elements from
// this set
func (s CPUSet) ToSliceInt64() []int64 {
var result []int64
for cpu := range s.elems {
result = append(result, int64(cpu))
}
sort.Slice(result, func(i, j int) bool { return result[i] < result[j] })
return result
}

// ToSliceNoSortInt64 returns a slice of int64 that contains all elements from
// this set.
func (s CPUSet) ToSliceNoSortInt64() []int64 {
var result []int64
for cpu := range s.elems {
result = append(result, int64(cpu))
}
return result
}

// String returns a new string representation of the elements in this CPU set
// in canonical linux CPU list format.
//
Expand Down
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/fake_internal_container_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cm

import (
"k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle {
Expand All @@ -26,6 +27,10 @@ func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle {

type fakeInternalContainerLifecycle struct{}

func (f *fakeInternalContainerLifecycle) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
return nil
}

func (f *fakeInternalContainerLifecycle) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
return nil
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/kubelet/cm/internal_container_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package cm

import (
"k8s.io/api/core/v1"

utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)

type InternalContainerLifecycle interface {
PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error
PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error
PreStopContainer(containerID string) error
PostStopContainer(containerID string) error
Expand All @@ -39,11 +40,9 @@ type internalContainerLifecycleImpl struct {

func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
if i.cpuManager != nil {
err := i.cpuManager.AddContainer(pod, container, containerID)
if err != nil {
return err
}
i.cpuManager.AddContainer(pod, container, containerID)
}

if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
i.topologyManager.AddContainer(pod, container, containerID)
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/kubelet/cm/internal_container_lifecycle_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// +build linux

/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cm

import (
"k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
if i.cpuManager != nil {
allocatedCPUs := i.cpuManager.GetCPUs(string(pod.UID), container.Name)
if !allocatedCPUs.IsEmpty() {
containerConfig.Linux.Resources.CpusetCpus = allocatedCPUs.String()
}
}

return nil
}
28 changes: 28 additions & 0 deletions pkg/kubelet/cm/internal_container_lifecycle_unsupported.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// +build !linux,!windows

/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cm

import (
"k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
return nil
}
28 changes: 28 additions & 0 deletions pkg/kubelet/cm/internal_container_lifecycle_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// +build windows

/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cm

import (
"k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
return nil
}
1 change: 1 addition & 0 deletions pkg/kubelet/dockershim/helpers_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (ds *dockerService) updateCreateConfig(
CPUShares: rOpts.CpuShares,
CPUQuota: rOpts.CpuQuota,
CPUPeriod: rOpts.CpuPeriod,
CpusetCpus: rOpts.CpusetCpus,
}
createConfig.HostConfig.OomScoreAdj = int(rOpts.OomScoreAdj)
}
Expand Down
Loading

0 comments on commit 3a5a7ab

Please sign in to comment.