Skip to content

Commit

Permalink
Do not use context.Background() on write requests to the k8s API (elastic#5649)
Browse files Browse the repository at this point in the history

Introduces the correct context into all requests made with the k8s client-go library that do not use the cache. So the rule is: Create, Update, Delete need a correct context. List, Get do not, because the cached client dismisses the context argument anyway. I added one where it was in scope but did not do additional refactoring to introduce one where no context was in scope. The main motivation for the distinction between reads and writes is to limit the scope of the refactoring somewhat.
  • Loading branch information
pebrc authored and fantapsody committed Jan 3, 2023
1 parent 652bd54 commit 35b6e4a
Show file tree
Hide file tree
Showing 134 changed files with 703 additions and 538 deletions.
3 changes: 2 additions & 1 deletion cmd/licensing-info/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
Expand Down Expand Up @@ -40,7 +41,7 @@ func main() {
var operatorNamespace string
flag.StringVar(&operatorNamespace, "operator-namespace", "elastic-system", "indicates the namespace where the operator is deployed")
flag.Parse()
licensingInfo, err := license.NewResourceReporter(newK8sClient(), operatorNamespace).Get()
licensingInfo, err := license.NewResourceReporter(newK8sClient(), operatorNamespace, nil).Get(context.Background())
if err != nil {
log.Fatal(err, "Failed to get licensing info")
}
Expand Down
103 changes: 61 additions & 42 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func startOperator(ctx context.Context) error {
}

if viper.GetBool(operator.EnableWebhookFlag) {
setupWebhook(mgr, params.CertRotation, params.ValidateStorageClass, clientset, exposedNodeLabels, managedNamespaces)
setupWebhook(ctx, mgr, params.CertRotation, params.ValidateStorageClass, clientset, exposedNodeLabels, managedNamespaces, tracer)
}

enforceRbacOnRefs := viper.GetBool(operator.EnforceRBACOnRefsFlag)
Expand All @@ -613,7 +613,7 @@ func startOperator(ctx context.Context) error {

disableTelemetry := viper.GetBool(operator.DisableTelemetryFlag)
telemetryInterval := viper.GetDuration(operator.TelemetryIntervalFlag)
go asyncTasks(mgr, cfg, managedNamespaces, operatorNamespace, operatorInfo, disableTelemetry, telemetryInterval)
go asyncTasks(ctx, mgr, cfg, managedNamespaces, operatorNamespace, operatorInfo, disableTelemetry, telemetryInterval, tracer)

log.Info("Starting the manager", "uuid", operatorInfo.OperatorUUID,
"namespace", operatorNamespace, "version", operatorInfo.BuildInfo.Version,
Expand Down Expand Up @@ -663,41 +663,49 @@ func readOptionalCA(caDir string) (*certificates.CA, error) {

// asyncTasks schedules some tasks to be started when this instance of the operator is elected
func asyncTasks(
ctx context.Context,
mgr manager.Manager,
cfg *rest.Config,
managedNamespaces []string,
operatorNamespace string,
operatorInfo about.OperatorInfo,
disableTelemetry bool,
telemetryInterval time.Duration,
tracer *apm.Tracer,
) {
<-mgr.Elected() // wait for this operator instance to be elected

// Report this instance as elected through Prometheus
metrics.Leader.WithLabelValues(string(operatorInfo.OperatorUUID), operatorNamespace).Set(1)

time.Sleep(10 * time.Second) // wait some arbitrary time for the manager to start
mgr.GetCache().WaitForCacheSync(context.Background()) // wait until k8s client cache is initialized
time.Sleep(10 * time.Second) // wait some arbitrary time for the manager to start
mgr.GetCache().WaitForCacheSync(ctx) // wait until k8s client cache is initialized

// Start the resource reporter
go func() {
r := licensing.NewResourceReporter(mgr.GetClient(), operatorNamespace)
r.Start(licensing.ResourceReporterFrequency)
r := licensing.NewResourceReporter(mgr.GetClient(), operatorNamespace, tracer)
r.Start(ctx, licensing.ResourceReporterFrequency)
}()

if !disableTelemetry {
// Start the telemetry reporter
go func() {
tr := telemetry.NewReporter(operatorInfo, mgr.GetClient(), operatorNamespace, managedNamespaces, telemetryInterval)
tr.Start()
tr := telemetry.NewReporter(operatorInfo, mgr.GetClient(), operatorNamespace, managedNamespaces, telemetryInterval, tracer)
tr.Start(ctx)
}()
}

// Garbage collect orphaned secrets leftover from deleted resources while the operator was not running
// - association user secrets
garbageCollectUsers(cfg, managedNamespaces)
gcCtx := tracing.NewContextTransaction(ctx, tracer, "garbage-collection", "on_operator_start", nil)
err := garbageCollectUsers(gcCtx, cfg, managedNamespaces)
if err != nil {
log.Error(err, "exiting due to unrecoverable error")
os.Exit(1)
}
// - soft-owned secrets
garbageCollectSoftOwnedSecrets(mgr.GetClient())
garbageCollectSoftOwnedSecrets(gcCtx, mgr.GetClient())
tracing.EndContextTransaction(gcCtx)
}

func chooseAndValidateIPFamily(ipFamilyStr string, ipFamilyDefault corev1.IPFamily) (corev1.IPFamily, error) {
Expand Down Expand Up @@ -826,11 +834,13 @@ func validateCertExpirationFlags(validityFlag string, rotateBeforeFlag string) (
return certValidity, certRotateBefore, nil
}

func garbageCollectUsers(cfg *rest.Config, managedNamespaces []string) {
func garbageCollectUsers(ctx context.Context, cfg *rest.Config, managedNamespaces []string) error {
span, ctx := apm.StartSpan(ctx, "gc_users", tracing.SpanTypeApp)
defer span.End()

ugc, err := association.NewUsersGarbageCollector(cfg, managedNamespaces)
if err != nil {
log.Error(err, "user garbage collector creation failed")
os.Exit(1)
return fmt.Errorf("user garbage collector creation failed: %w", err)
}
err = ugc.
For(&apmv1.ApmServerList{}, associationctl.ApmAssociationLabelNamespace, associationctl.ApmAssociationLabelName).
Expand All @@ -839,15 +849,18 @@ func garbageCollectUsers(cfg *rest.Config, managedNamespaces []string) {
For(&beatv1beta1.BeatList{}, associationctl.BeatAssociationLabelNamespace, associationctl.BeatAssociationLabelName).
For(&agentv1alpha1.AgentList{}, associationctl.AgentAssociationLabelNamespace, associationctl.AgentAssociationLabelName).
For(&emsv1alpha1.ElasticMapsServerList{}, associationctl.MapsESAssociationLabelNamespace, associationctl.MapsESAssociationLabelName).
DoGarbageCollection()
DoGarbageCollection(ctx)
if err != nil {
log.Error(err, "user garbage collector failed")
os.Exit(1)
return fmt.Errorf("user garbage collector failed: %w", err)
}
return nil
}

func garbageCollectSoftOwnedSecrets(k8sClient k8s.Client) {
if err := reconciler.GarbageCollectAllSoftOwnedOrphanSecrets(k8sClient, map[string]client.Object{
func garbageCollectSoftOwnedSecrets(ctx context.Context, k8sClient k8s.Client) {
span, ctx := apm.StartSpan(ctx, "gc_soft_owned_secrets", tracing.SpanTypeApp)
defer span.End()

if err := reconciler.GarbageCollectAllSoftOwnedOrphanSecrets(ctx, k8sClient, map[string]client.Object{
esv1.Kind: &esv1.Elasticsearch{},
apmv1.Kind: &apmv1.ApmServer{},
kbv1.Kind: &kbv1.Kibana{},
Expand All @@ -863,40 +876,20 @@ func garbageCollectSoftOwnedSecrets(k8sClient k8s.Client) {
}

func setupWebhook(
ctx context.Context,
mgr manager.Manager,
certRotation certificates.RotationParams,
validateStorageClass bool,
clientset kubernetes.Interface,
exposedNodeLabels esvalidation.NodeLabels,
managedNamespaces []string) {
managedNamespaces []string,
tracer *apm.Tracer) {
manageWebhookCerts := viper.GetBool(operator.ManageWebhookCertsFlag)
if manageWebhookCerts {
log.Info("Automatic management of the webhook certificates enabled")
// Ensure that all the certificates needed by the webhook server are already created
webhookParams := webhook.Params{
Name: viper.GetString(operator.WebhookNameFlag),
Namespace: viper.GetString(operator.OperatorNamespaceFlag),
SecretName: viper.GetString(operator.WebhookSecretFlag),
Rotation: certRotation,
}

// retrieve the current webhook configuration interface
wh, err := webhookParams.NewAdmissionControllerInterface(context.Background(), clientset)
if err != nil {
log.Error(err, "unable to setup the webhook certificates")
os.Exit(1)
}

// Force a first reconciliation to create the resources before the server is started
if err := webhookParams.ReconcileResources(context.Background(), clientset, wh); err != nil {
if err := reconcileWebhookCertsAndAddController(ctx, mgr, certRotation, clientset, tracer); err != nil {
log.Error(err, "unable to setup the webhook certificates")
os.Exit(1)
}

if err := webhook.Add(mgr, webhookParams, clientset, wh); err != nil {
log.Error(err, "unable to create controller", "controller", webhook.ControllerName)
os.Exit(1)
}
}

// setup webhooks for supported types
Expand Down Expand Up @@ -954,3 +947,29 @@ func setupWebhook(
os.Exit(1)
}
}

func reconcileWebhookCertsAndAddController(ctx context.Context, mgr manager.Manager, certRotation certificates.RotationParams, clientset kubernetes.Interface, tracer *apm.Tracer) error {
ctx = tracing.NewContextTransaction(ctx, tracer, "webhook", "reconcile", nil)
defer tracing.EndContextTransaction(ctx)
log.Info("Automatic management of the webhook certificates enabled")
// Ensure that all the certificates needed by the webhook server are already created
webhookParams := webhook.Params{
Name: viper.GetString(operator.WebhookNameFlag),
Namespace: viper.GetString(operator.OperatorNamespaceFlag),
SecretName: viper.GetString(operator.WebhookSecretFlag),
Rotation: certRotation,
}

// retrieve the current webhook configuration interface
wh, err := webhookParams.NewAdmissionControllerInterface(ctx, clientset)
if err != nil {
return err
}

// Force a first reconciliation to create the resources before the server is started
if err := webhookParams.ReconcileResources(ctx, clientset, wh); err != nil {
return err
}

return webhook.Add(mgr, webhookParams, clientset, wh, tracer)
}
2 changes: 1 addition & 1 deletion cmd/manager/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func Test_garbageCollectSoftOwnedSecrets(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := k8s.NewFakeClient(tt.runtimeObjs...)
garbageCollectSoftOwnedSecrets(c)
garbageCollectSoftOwnedSecrets(context.Background(), c)
tt.assert(c, t)
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func reconcileConfig(params Params, configHash hash.Hash) *reconciler.Results {
},
}

if _, err = reconciler.ReconcileSecret(params.Client, expected, &params.Agent); err != nil {
if _, err = reconciler.ReconcileSecret(params.Context, params.Client, expected, &params.Agent); err != nil {
return results.WithError(err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/agent/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *ReconcileAgent) Reconcile(ctx context.Context, request reconcile.Reques

results, status := r.doReconcile(ctx, *agent)

if err := updateStatus(*agent, r.Client, status); err != nil {
if err := updateStatus(ctx, *agent, r.Client, status); err != nil {
if apierrors.IsConflict(err) {
return results.WithResult(reconcile.Result{Requeue: true}).Aggregate()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/agent/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func applyEnvVars(params Params, builder *defaults.PodTemplateBuilder) (*default
if err := cleanupEnvVarsSecret(params); err != nil {
return nil, err
}
} else if _, err := reconciler.ReconcileSecret(params.Client, envVarsSecret, &params.Agent); err != nil {
} else if _, err := reconciler.ReconcileSecret(params.Context, params.Client, envVarsSecret, &params.Agent); err != nil {
return nil, err
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/controller/agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func reconcilePodVehicle(params Params, podTemplate corev1.PodTemplateSpec) (*re
}

ready, desired, err := reconciliationFunc(ReconciliationParams{
ctx: params.Context,
client: params.Client,
agent: params.Agent,
podTemplate: podTemplate,
Expand All @@ -68,11 +69,11 @@ func reconcilePodVehicle(params Params, podTemplate corev1.PodTemplateSpec) (*re
}

// clean up the other one
if err := params.Client.Get(context.Background(), types.NamespacedName{
if err := params.Client.Get(params.Context, types.NamespacedName{
Namespace: params.Agent.Namespace,
Name: name,
}, toDelete); err == nil {
results.WithError(params.Client.Delete(context.Background(), toDelete))
results.WithError(params.Client.Delete(params.Context, toDelete))
} else if !apierrors.IsNotFound(err) {
results.WithError(err)
}
Expand All @@ -99,7 +100,7 @@ func reconcileDeployment(rp ReconciliationParams) (int32, int32, error) {
return 0, 0, err
}

reconciled, err := deployment.Reconcile(rp.client, d, &rp.agent)
reconciled, err := deployment.Reconcile(rp.ctx, rp.client, d, &rp.agent)
if err != nil {
return 0, 0, err
}
Expand All @@ -121,7 +122,7 @@ func reconcileDaemonSet(rp ReconciliationParams) (int32, int32, error) {
return 0, 0, err
}

reconciled, err := daemonset.Reconcile(rp.client, ds, &rp.agent)
reconciled, err := daemonset.Reconcile(rp.ctx, rp.client, ds, &rp.agent)
if err != nil {
return 0, 0, err
}
Expand All @@ -131,6 +132,7 @@ func reconcileDaemonSet(rp ReconciliationParams) (int32, int32, error) {

// ReconciliationParams are the parameters used during an Elastic Agent's reconciliation.
type ReconciliationParams struct {
ctx context.Context
client k8s.Client
agent agentv1alpha1.Agent
podTemplate corev1.PodTemplateSpec
Expand Down Expand Up @@ -159,10 +161,10 @@ func calculateStatus(params *Params, ready, desired int32) (agentv1alpha1.AgentS
}

// updateStatus will update the Elastic Agent's status within the k8s cluster, using the given Elastic Agent and status.
func updateStatus(agent agentv1alpha1.Agent, client client.Client, status agentv1alpha1.AgentStatus) error {
func updateStatus(ctx context.Context, agent agentv1alpha1.Agent, client client.Client, status agentv1alpha1.AgentStatus) error {
if reflect.DeepEqual(agent.Status, status) {
return nil
}
agent.Status = status
return common.UpdateStatus(client, &agent)
return common.UpdateStatus(ctx, client, &agent)
}
5 changes: 3 additions & 2 deletions pkg/controller/apmserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package apmserver

import (
"context"
"fmt"
"path"
"path/filepath"
Expand Down Expand Up @@ -41,7 +42,7 @@ func certificatesDir(associationType commonv1.AssociationType) string {

// reconcileApmServerConfig reconciles the configuration of the APM server: it first creates the configuration from the APM
// specification and then reconcile the underlying secret.
func reconcileApmServerConfig(client k8s.Client, as *apmv1.ApmServer) (corev1.Secret, error) {
func reconcileApmServerConfig(ctx context.Context, client k8s.Client, as *apmv1.ApmServer) (corev1.Secret, error) {
// Create a new configuration from the APM object spec.
cfg, err := newConfigFromSpec(client, as)
if err != nil {
Expand All @@ -64,7 +65,7 @@ func reconcileApmServerConfig(client k8s.Client, as *apmv1.ApmServer) (corev1.Se
ApmCfgSecretKey: cfgBytes,
},
}
return reconciler.ReconcileSecret(client, expectedConfigSecret, as)
return reconciler.ReconcileSecret(ctx, client, expectedConfigSecret, as)
}

func newConfigFromSpec(c k8s.Client, as *apmv1.ApmServer) (*settings.CanonicalConfig, error) {
Expand Down
Loading

0 comments on commit 35b6e4a

Please sign in to comment.