Trace by opentelemetry
Signed-off-by: zouyu <zouy.fnst@cn.fujitsu.com>
zouy414 committed Oct 22, 2020
1 parent 901911d commit 7d98ac5
Showing 10 changed files with 202 additions and 46 deletions.
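The change follows one pattern on both sides of the apiserver: when a controller creates a child object it injects the current span context into that object's annotations (httptrace.SpanContextToAnnotations in createPods below), and when it later reconciles an object it restores a context.Context from those annotations (httptrace.SpanContextFromAnnotations throughout deployment/sync.go) and passes that context to the client-go calls, so a single trace can follow the object across create/update/delete round trips. The new pkg/util/httptrace package is one of the 10 changed files but is not rendered in this excerpt; the following is only a minimal sketch of how such helpers could be built on the OpenTelemetry Go propagation API — the carrier type and the use of the globally registered propagator are assumptions for illustration, not the committed implementation.

package httptrace

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

// annotationCarrier adapts a Kubernetes annotations map to OpenTelemetry's
// TextMapCarrier interface so the configured propagator (for example the
// W3C trace-context propagator) can read and write it directly.
type annotationCarrier map[string]string

var _ propagation.TextMapCarrier = annotationCarrier(nil)

func (c annotationCarrier) Get(key string) string { return c[key] }
func (c annotationCarrier) Set(key, value string) { c[key] = value }
func (c annotationCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	return keys
}

// SpanContextToAnnotations injects the span context carried by ctx into the
// annotations map, allocating the map if the object has no annotations yet.
func SpanContextToAnnotations(ctx context.Context, annotations *map[string]string) {
	if *annotations == nil {
		*annotations = map[string]string{}
	}
	otel.GetTextMapPropagator().Inject(ctx, annotationCarrier(*annotations))
}

// SpanContextFromAnnotations returns a context carrying whatever span context
// was previously injected into the annotations; it returns ctx unchanged when
// no trace context is present.
func SpanContextFromAnnotations(ctx context.Context, annotations map[string]string) context.Context {
	return otel.GetTextMapPropagator().Extract(ctx, annotationCarrier(annotations))
}

With helpers along these lines, the only state that crosses the apiserver is a pair of string annotations (for the W3C propagator, traceparent and optionally tracestate), which survive storage and listing like any other object metadata.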
30 changes: 16 additions & 14 deletions pkg/controller/controller_utils.go
@@ -48,6 +48,7 @@ import (
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/apis/core/validation"
hashutil "k8s.io/kubernetes/pkg/util/hash"
+ "k8s.io/kubernetes/pkg/util/httptrace"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/utils/integer"

@@ -448,12 +449,12 @@ func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name s
// created as an interface to allow testing.
type PodControlInterface interface {
// CreatePods creates new pods according to the spec.
- CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
+ CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
// CreatePodsOnNode creates a new pod according to the spec on the specified node,
// and sets the ControllerRef.
- CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
+ CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
- CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
+ CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error
// PatchPod patches the pod.
@@ -518,22 +519,22 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error {
return nil
}

- func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
- return r.createPods("", namespace, template, object, nil)
+ func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
+ return r.createPods(ctx, "", namespace, template, object, nil)
}

- func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (r RealPodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
- return r.createPods("", namespace, template, controllerObject, controllerRef)
+ return r.createPods(ctx, "", namespace, template, controllerObject, controllerRef)
}

- func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (r RealPodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
- return r.createPods(nodeName, namespace, template, object, controllerRef)
+ return r.createPods(ctx, nodeName, namespace, template, object, controllerRef)
}

func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
@@ -566,7 +567,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec
return pod, nil
}

- func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (r RealPodControl) createPods(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
pod, err := GetPodFromTemplate(template, object, controllerRef)
if err != nil {
return err
@@ -577,7 +578,8 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT
if len(labels.Set(pod.Labels)) == 0 {
return fmt.Errorf("unable to create pods, no labels")
}
- newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+ httptrace.SpanContextToAnnotations(ctx, &pod.Annotations)
+ newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
// only send an event if the namespace isn't terminating
if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
@@ -638,7 +640,7 @@ func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
return nil
}

- func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error {
+ func (f *FakePodControl) CreatePods(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
@@ -652,7 +654,7 @@ func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec,
return nil
}

- func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *FakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
@@ -667,7 +669,7 @@ func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.
return nil
}

- func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *FakePodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
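The signature changes above only matter if callers hand in a context that actually carries a span. A hypothetical caller might look like the sketch below; the tracer name, the syncWithTrace helper, and starting a fresh root span per sync pass are illustrative choices, not part of this commit.

package example

import (
	"context"

	"go.opentelemetry.io/otel"
	apps "k8s.io/api/apps/v1"
	"k8s.io/kubernetes/pkg/controller"
)

// syncWithTrace starts a span for one reconcile pass and hands the resulting
// ctx to PodControlInterface, so createPods can copy the span context into
// the new pod's annotations before issuing the Create call.
func syncWithTrace(podControl controller.PodControlInterface, rs *apps.ReplicaSet) error {
	ctx, span := otel.Tracer("replicaset-controller").Start(context.Background(), "syncReplicaSet")
	defer span.End()

	// The created pod carries this span's trace context in its annotations,
	// and the apiserver request itself is issued with the same ctx.
	return podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs)
}

Because the same ctx reaches the apiserver request inside createPods, cancelling the sync pass would also cancel the in-flight Create call.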
2 changes: 1 addition & 1 deletion pkg/controller/controller_utils_test.go
@@ -298,7 +298,7 @@ func TestCreatePods(t *testing.T) {
controllerSpec := newReplicationController(1)

// Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template
- err := podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec)
+ err := podControl.CreatePods(context.Background(), ns, controllerSpec.Spec.Template, controllerSpec)
assert.NoError(t, err, "unexpected error: %v", err)

expectedPod := v1.Pod{
2 changes: 1 addition & 1 deletion pkg/controller/daemon/daemon_controller.go
@@ -968,7 +968,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

- err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
+ err := dsc.podControl.CreatePodsWithControllerRef(context.Background(), ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))

if err != nil {
8 changes: 4 additions & 4 deletions pkg/controller/daemon/daemon_controller_test.go
@@ -236,10 +236,10 @@ func newFakePodControl() *fakePodControl {
}
}

- func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *fakePodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
- if err := f.FakePodControl.CreatePodsOnNode(nodeName, namespace, template, object, controllerRef); err != nil {
+ if err := f.FakePodControl.CreatePodsOnNode(context.Background(), nodeName, namespace, template, object, controllerRef); err != nil {
return fmt.Errorf("failed to create pod on node %q", nodeName)
}

@@ -267,10 +267,10 @@ func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *
return nil
}

- func (f *fakePodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *fakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
- if err := f.FakePodControl.CreatePodsWithControllerRef(namespace, template, object, controllerRef); err != nil {
+ if err := f.FakePodControl.CreatePodsWithControllerRef(context.Background(), namespace, template, object, controllerRef); err != nil {
return fmt.Errorf("failed to create pod for DaemonSet")
}

38 changes: 27 additions & 11 deletions pkg/controller/deployment/sync.go
@@ -24,12 +24,13 @@ import (
"strconv"

apps "k8s.io/api/apps/v1"
- "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
+ "k8s.io/kubernetes/pkg/util/httptrace"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)

@@ -72,6 +73,9 @@ func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaS
// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
// that were paused for longer than progressDeadlineSeconds.
func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
if !deploymentutil.HasProgressDeadline(d) {
return nil
}
@@ -98,7 +102,7 @@
}

var err error
- _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
return err
}

@@ -136,6 +140,9 @@ const (
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

// Calculate the max revision number among all old RSes
@@ -155,7 +162,7 @@
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
if annotationsUpdated || minReadySecondsNeedsUpdate {
rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
- return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+ return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
}

// Should use the revision in existingNewRS's annotation, since it set by before
@@ -173,7 +180,7 @@

if needsUpdate {
var err error
- if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}); err != nil {
+ if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
return nil, err
}
}
@@ -220,7 +227,8 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
// hash collisions. If there is any other error, we need to report it in the status of
// the Deployment.
alreadyExists := false
- createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
+
+ createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
switch {
// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
case errors.IsAlreadyExists(err):
@@ -252,7 +260,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
*d.Status.CollisionCount++
// Update the collisionCount for the Deployment and let it requeue by returning the original
// error.
- _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
if dErr == nil {
klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
}
@@ -268,7 +276,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
// We don't really care about this error at this point, since we have a bigger issue to report.
// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
- _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
return nil, err
@@ -285,7 +293,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
needsUpdate = true
}
if needsUpdate {
- _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
return createdRS, err
}
@@ -409,6 +417,8 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSe
}

func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())

sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale

@@ -420,7 +430,7 @@
rsCopy := rs.DeepCopy()
*(rsCopy.Spec.Replicas) = newScale
deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
- rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+ rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
if err == nil && sizeNeedsUpdate {
scaled = true
dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
@@ -433,6 +443,9 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in
// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept
// around by default 1) for historical reasons and 2) for the ability to rollback a deployment.
func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), deployment.GetAnnotations())
+
if !deploymentutil.HasRevisionHistoryLimit(deployment) {
return nil
}
@@ -458,7 +471,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep
continue
}
klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name)
- if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(context.TODO(), rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
+ if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
// Return error instead of aggregating and continuing DELETEs on the theory
// that we may be overloading the api server.
return err
@@ -470,6 +483,9 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep

// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
newStatus := calculateStatus(allRSs, newRS, d)

if reflect.DeepEqual(d.Status, newStatus) {
@@ -478,7 +494,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet,

newDeployment := d
newDeployment.Status = newStatus
- _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
+ _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
return err
}

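The extraction calls added to sync.go above assume that whatever was injected on the creation side survives the round trip through the apiserver as ordinary string annotations. A small round-trip check makes that concrete; it is written against the helper sketch near the top of this diff (global propagator over the annotations map), not the actual pkg/util/httptrace implementation, which is not shown here.

package httptrace_test

import (
	"context"
	"testing"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"

	"k8s.io/kubernetes/pkg/util/httptrace"
)

func TestSpanContextRoundTrip(t *testing.T) {
	// Register the W3C trace-context propagator so Inject/Extract do real work.
	otel.SetTextMapPropagator(propagation.TraceContext{})

	// Build a sampled span context and attach it to a fresh context.
	sc := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID:    trace.TraceID{0x01},
		SpanID:     trace.SpanID{0x02},
		TraceFlags: trace.FlagsSampled,
	})
	ctx := trace.ContextWithSpanContext(context.Background(), sc)

	// Inject into an initially empty annotations map, as createPods does for pods.
	annotations := map[string]string{}
	httptrace.SpanContextToAnnotations(ctx, &annotations)

	// Extract on the other side, as the deployment controller does, and compare.
	got := trace.SpanContextFromContext(
		httptrace.SpanContextFromAnnotations(context.Background(), annotations))
	if got.TraceID() != sc.TraceID() {
		t.Fatalf("trace ID lost in round trip: got %s, want %s", got.TraceID(), sc.TraceID())
	}
}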
4 changes: 2 additions & 2 deletions pkg/controller/job/job_controller.go
@@ -26,7 +26,7 @@ import (
"time"

batch "k8s.io/api/batch/v1"
- "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -776,7 +776,7 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
for i := int32(0); i < batchSize; i++ {
go func() {
defer wait.Done()
- err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
+ err := jm.podControl.CreatePodsWithControllerRef(context.Background(), job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore
