Log enhancement(No apiserver) #2

Open · wants to merge 1 commit into master
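This change threads a context.Context through PodControlInterface and into the controllers' pod-creation paths, so a trace span can follow a Deployment all the way down to the Pods it creates even though no in-process context survives the round trip through the apiserver. The span context is persisted in object annotations by two helpers from the new k8s.io/kubernetes/pkg/util/httptrace package: SpanContextToAnnotations (write side, called in createPods) and SpanContextFromAnnotations (read side, used throughout pkg/controller/deployment/sync.go). The package itself is not part of this diff; a minimal sketch of what it might look like, assuming OpenTelemetry's W3C trace-context propagator, is:

```go
package httptrace

import (
	"context"

	"go.opentelemetry.io/otel/propagation"
)

// prop injects/extracts the W3C traceparent/tracestate entries.
var prop = propagation.TraceContext{}

// SpanContextToAnnotations writes the span context carried by ctx into the
// object's annotations so it survives the round trip through the apiserver.
// (Hypothetical implementation; the PR only shows the call sites.)
func SpanContextToAnnotations(ctx context.Context, annotations *map[string]string) {
	if *annotations == nil {
		*annotations = map[string]string{}
	}
	prop.Inject(ctx, propagation.MapCarrier(*annotations))
}

// SpanContextFromAnnotations restores a span context previously stored in an
// object's annotations, returning a ctx that API calls can be traced under.
func SpanContextFromAnnotations(ctx context.Context, annotations map[string]string) context.Context {
	return prop.Extract(ctx, propagation.MapCarrier(annotations))
}
```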
30 changes: 16 additions & 14 deletions pkg/controller/controller_utils.go
@@ -48,6 +48,7 @@ import (
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/apis/core/validation"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/pkg/util/httptrace"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/utils/integer"

@@ -448,12 +449,12 @@ func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name s
// created as an interface to allow testing.
type PodControlInterface interface {
// CreatePods creates new pods according to the spec.
- CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
+ CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
// CreatePodsOnNode creates a new pod according to the spec on the specified node,
// and sets the ControllerRef.
- CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
+ CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
- CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
+ CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error
// PatchPod patches the pod.
@@ -518,22 +519,22 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error {
return nil
}

- func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
- return r.createPods("", namespace, template, object, nil)
+ func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
+ return r.createPods(ctx, "", namespace, template, object, nil)
}

- func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (r RealPodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
return r.createPods("", namespace, template, controllerObject, controllerRef)
return r.createPods(ctx, "", namespace, template, controllerObject, controllerRef)
}

- func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (r RealPodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
- return r.createPods(nodeName, namespace, template, object, controllerRef)
+ return r.createPods(ctx, nodeName, namespace, template, object, controllerRef)
}

func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
@@ -566,7 +567,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec
return pod, nil
}

- func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (r RealPodControl) createPods(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
pod, err := GetPodFromTemplate(template, object, controllerRef)
if err != nil {
return err
@@ -577,7 +578,8 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT
if len(labels.Set(pod.Labels)) == 0 {
return fmt.Errorf("unable to create pods, no labels")
}
- newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+ httptrace.SpanContextToAnnotations(ctx, &pod.Annotations)
+ newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
// only send an event if the namespace isn't terminating
if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
@@ -638,7 +640,7 @@ func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
return nil
}

- func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error {
+ func (f *FakePodControl) CreatePods(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
@@ -652,7 +654,7 @@ func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec,
return nil
}

- func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *FakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
@@ -667,7 +669,7 @@ func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.
return nil
}

- func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *FakePodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
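With the interface change above, callers now decide which context a pod creation is traced under. A hedged sketch of a call site, assuming a ReplicaSet-like owner whose annotations already carry a span (createPodsTraced, rs, and podControl are illustrative stand-ins, not part of this PR):

```go
import (
	"context"

	apps "k8s.io/api/apps/v1"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/util/httptrace"
)

// createPodsTraced restores the span recorded on the owner object and passes
// it down; createPods then stamps the same span context onto the new Pod's
// annotations before the POST to the apiserver.
func createPodsTraced(rs *apps.ReplicaSet, podControl controller.PodControlInterface) error {
	ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())
	return podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs)
}
```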
2 changes: 1 addition & 1 deletion pkg/controller/controller_utils_test.go
@@ -298,7 +298,7 @@ func TestCreatePods(t *testing.T) {
controllerSpec := newReplicationController(1)

// Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template
- err := podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec)
+ err := podControl.CreatePods(context.Background(), ns, controllerSpec.Spec.Template, controllerSpec)
assert.NoError(t, err, "unexpected error: %v", err)

expectedPod := v1.Pod{
2 changes: 1 addition & 1 deletion pkg/controller/daemon/daemon_controller.go
@@ -968,7 +968,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

- err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
+ err := dsc.podControl.CreatePodsWithControllerRef(context.Background(), ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))

if err != nil {
8 changes: 4 additions & 4 deletions pkg/controller/daemon/daemon_controller_test.go
@@ -236,10 +236,10 @@ func newFakePodControl() *fakePodControl {
}
}

- func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *fakePodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
- if err := f.FakePodControl.CreatePodsOnNode(nodeName, namespace, template, object, controllerRef); err != nil {
+ if err := f.FakePodControl.CreatePodsOnNode(context.Background(), nodeName, namespace, template, object, controllerRef); err != nil {
return fmt.Errorf("failed to create pod on node %q", nodeName)
}

@@ -267,10 +267,10 @@ func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *
return nil
}

- func (f *fakePodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *fakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
- if err := f.FakePodControl.CreatePodsWithControllerRef(namespace, template, object, controllerRef); err != nil {
+ if err := f.FakePodControl.CreatePodsWithControllerRef(context.Background(), namespace, template, object, controllerRef); err != nil {
return fmt.Errorf("failed to create pod for DaemonSet")
}

38 changes: 27 additions & 11 deletions pkg/controller/deployment/sync.go
@@ -24,12 +24,13 @@ import (
"strconv"

apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/util/httptrace"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)

@@ -72,6 +73,9 @@ func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaS
// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
// that were paused for longer than progressDeadlineSeconds.
func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())

if !deploymentutil.HasProgressDeadline(d) {
return nil
}
@@ -98,7 +102,7 @@ }
}

var err error
- _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
return err
}

@@ -136,6 +140,9 @@ const (
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())

existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

// Calculate the max revision number among all old RSes
@@ -155,7 +162,7 @@
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
if annotationsUpdated || minReadySecondsNeedsUpdate {
rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
- return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+ return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
}

// Should use the revision in existingNewRS's annotation, since it set by before
@@ -173,7 +180,7 @@

if needsUpdate {
var err error
- if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}); err != nil {
+ if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
return nil, err
}
}
@@ -220,7 +227,8 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
// hash collisions. If there is any other error, we need to report it in the status of
// the Deployment.
alreadyExists := false
- createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
+
+ createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
switch {
// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
case errors.IsAlreadyExists(err):
@@ -252,7 +260,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
*d.Status.CollisionCount++
// Update the collisionCount for the Deployment and let it requeue by returning the original
// error.
- _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
if dErr == nil {
klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
}
@@ -268,7 +276,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
// We don't really care about this error at this point, since we have a bigger issue to report.
// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
- _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
return nil, err
@@ -285,7 +293,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
needsUpdate = true
}
if needsUpdate {
- _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
return createdRS, err
}
@@ -409,6 +417,8 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSe
}

func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())

sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale

@@ -420,7 +430,7 @@
rsCopy := rs.DeepCopy()
*(rsCopy.Spec.Replicas) = newScale
deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
- rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+ rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
if err == nil && sizeNeedsUpdate {
scaled = true
dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
@@ -433,6 +443,9 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in
// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept
// around by default 1) for historical reasons and 2) for the ability to rollback a deployment.
func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), deployment.GetAnnotations())

if !deploymentutil.HasRevisionHistoryLimit(deployment) {
return nil
}
@@ -458,7 +471,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep
continue
}
klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name)
- if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(context.TODO(), rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
+ if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
// Return error instead of aggregating and continuing DELETEs on the theory
// that we may be overloading the api server.
return err
@@ -470,6 +483,9 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep

// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())

newStatus := calculateStatus(allRSs, newRS, d)

if reflect.DeepEqual(d.Status, newStatus) {
@@ -478,7 +494,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet,

newDeployment := d
newDeployment.Status = newStatus
- _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
+ _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
return err
}

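Every sync-path method in sync.go follows the same pattern: instead of receiving a context from its caller, it rebuilds one from the object's annotations and uses it for the client call in place of context.TODO(). Storing the span context on the object is what lets a trace survive the informer round trip, where the controller wakes up with only the object and no in-process context. As a hedged illustration, assuming an OpenTelemetry tracer is registered (nothing in this diff wires one up, and updateStatusTraced is not part of the PR), the restored context could also parent a child span:

```go
import (
	"context"

	"go.opentelemetry.io/otel"
	apps "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/util/httptrace"
)

// updateStatusTraced restores the span recorded on the Deployment, opens a
// child span for the status update, and passes ctx through to the client.
func (dc *DeploymentController) updateStatusTraced(d *apps.Deployment) error {
	ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
	ctx, span := otel.Tracer("deployment-controller").Start(ctx, "UpdateStatus")
	defer span.End()
	_, err := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
	return err
}
```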
4 changes: 2 additions & 2 deletions pkg/controller/job/job_controller.go
@@ -26,7 +26,7 @@ import (
"time"

batch "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -776,7 +776,7 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
for i := int32(0); i < batchSize; i++ {
go func() {
defer wait.Done()
- err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
+ err := jm.podControl.CreatePodsWithControllerRef(context.Background(), job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore