diff --git a/cmd/fluxctl/portforward.go b/cmd/fluxctl/portforward.go
index 3e8cef2cd..5b418c7d5 100644
--- a/cmd/fluxctl/portforward.go
+++ b/cmd/fluxctl/portforward.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"strings"
 
@@ -14,11 +15,11 @@ import (
 
 // Attempt to create PortForwards to fluxes that match the label selectors until a Flux
 // is found or an error is returned.
-func tryPortforwards(context string, ns string, selectors ...metav1.LabelSelector) (p *portforward.PortForward, err error) {
+func tryPortforwards(kubeConfigContext string, ns string, selectors ...metav1.LabelSelector) (p *portforward.PortForward, err error) {
 	message := fmt.Sprintf("No pod found in namespace %q using the following selectors:", ns)
 
 	for _, selector := range selectors {
-		p, err = tryPortforward(context, ns, selector)
+		p, err = tryPortforward(context.TODO(), kubeConfigContext, ns, selector)
 		if err == nil {
 			return
 		}
@@ -39,7 +40,7 @@ func tryPortforwards(context string, ns string, selectors ...metav1.LabelSelecto
 }
 
 // Attempt to create a portforward in the namespace for the provided LabelSelector
-func tryPortforward(kubeConfigContext string, ns string, selector metav1.LabelSelector) (*portforward.PortForward, error) {
+func tryPortforward(ctx context.Context, kubeConfigContext string, ns string, selector metav1.LabelSelector) (*portforward.PortForward, error) {
 	portforwarder := &portforward.PortForward{
 		Namespace: ns,
 		Labels:    selector,
@@ -66,7 +67,7 @@ func tryPortforward(kubeConfigContext string, ns string, selector metav1.LabelSe
 		return portforwarder, errors.Wrap(err, "Could not create kubernetes client")
 	}
 
-	err = portforwarder.Start()
+	err = portforwarder.Start(ctx)
 	if err != nil {
 		return portforwarder, err
 	}
diff --git a/pkg/cluster/kubernetes/cached_disco.go b/pkg/cluster/kubernetes/cached_disco.go
index 0a1a1af84..50b2f6f66 100644
--- a/pkg/cluster/kubernetes/cached_disco.go
+++ b/pkg/cluster/kubernetes/cached_disco.go
@@ -1,6 +1,7 @@
 package kubernetes
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -67,10 +68,10 @@ func MakeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdo
 	crdClient := c.ApiextensionsV1().CustomResourceDefinitions()
 	lw := &toolscache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			return crdClient.List(options)
+			return crdClient.List(context.TODO(), options)
 		},
 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			return crdClient.Watch(options)
+			return crdClient.Watch(context.TODO(), options)
 		},
 	}
 	handle := toolscache.ResourceEventHandlerFuncs{
diff --git a/pkg/cluster/kubernetes/cached_disco_test.go b/pkg/cluster/kubernetes/cached_disco_test.go
index c7ca92250..80f0bded2 100644
--- a/pkg/cluster/kubernetes/cached_disco_test.go
+++ b/pkg/cluster/kubernetes/cached_disco_test.go
@@ -1,6 +1,7 @@
 package kubernetes
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -69,7 +70,7 @@ func TestCachedDiscovery(t *testing.T) {
 	coreClient.Fake.Resources = append(apiResources, updatedAPI)
 
 	// Provoke the cached discovery client into invalidating
-	_, err = crdClient.ApiextensionsV1().CustomResourceDefinitions().Update(myCRD)
+	_, err = crdClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), myCRD, metav1.UpdateOptions{})
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/pkg/cluster/kubernetes/images.go b/pkg/cluster/kubernetes/images.go
index 4dc2f3c42..390a81a2c 100644
--- a/pkg/cluster/kubernetes/images.go
+++ b/pkg/cluster/kubernetes/images.go
@@ -55,7 +55,7 @@ func mergeCredentials(log func(...interface{}) error,
 		saName = "default"
 	}
 
-	sa, err := client.CoreV1().ServiceAccounts(namespace).Get(saName, meta_v1.GetOptions{})
+	sa, err := client.CoreV1().ServiceAccounts(namespace).Get(context.TODO(), saName, meta_v1.GetOptions{})
 	if err == nil {
 		for _, ips := range sa.ImagePullSecrets {
 			imagePullSecrets = append(imagePullSecrets, ips.Name)
@@ -73,7 +73,7 @@ func mergeCredentials(log func(...interface{}) error,
 			continue
 		}
 
-		secret, err := client.CoreV1().Secrets(namespace).Get(name, meta_v1.GetOptions{})
+		secret, err := client.CoreV1().Secrets(namespace).Get(context.TODO(), name, meta_v1.GetOptions{})
 		if err != nil {
 			log("err", errors.Wrapf(err, "getting secret %q from namespace %q", name, namespace))
 			imagePullSecretCache[namespacedSecretName] = registry.NoCredentials()
diff --git a/pkg/cluster/kubernetes/kubernetes.go b/pkg/cluster/kubernetes/kubernetes.go
index 3a99ffb8a..67b8d04b0 100644
--- a/pkg/cluster/kubernetes/kubernetes.go
+++ b/pkg/cluster/kubernetes/kubernetes.go
@@ -248,7 +248,7 @@ func (c *Cluster) Export(ctx context.Context) ([]byte, error) {
 	defer encoder.Close()
 
 	for _, ns := range namespaces {
-		namespace, err := c.client.CoreV1().Namespaces().Get(ns, meta_v1.GetOptions{})
+		namespace, err := c.client.CoreV1().Namespaces().Get(ctx, ns, meta_v1.GetOptions{})
 		if err != nil {
 			return nil, err
 		}
@@ -311,7 +311,7 @@ func (c *Cluster) getAllowedAndExistingNamespaces(ctx context.Context) ([]string
 		if err := ctx.Err(); err != nil {
 			return nil, err
 		}
-		ns, err := c.client.CoreV1().Namespaces().Get(name, meta_v1.GetOptions{})
+		ns, err := c.client.CoreV1().Namespaces().Get(ctx, name, meta_v1.GetOptions{})
 		switch {
 		case err == nil:
 			c.updateLoggedAllowedNS(name, false) // reset, so if the namespace goes away we'll log it again
diff --git a/pkg/cluster/kubernetes/resourcekinds.go b/pkg/cluster/kubernetes/resourcekinds.go
index ce4670720..59b2cd953 100644
--- a/pkg/cluster/kubernetes/resourcekinds.go
+++ b/pkg/cluster/kubernetes/resourcekinds.go
@@ -116,7 +116,7 @@ func (dk *deploymentKind) getWorkload(ctx context.Context, c *Cluster, namespace
 	if err := ctx.Err(); err != nil {
 		return workload{}, err
 	}
-	deployment, err := c.client.AppsV1().Deployments(namespace).Get(name, meta_v1.GetOptions{})
+	deployment, err := c.client.AppsV1().Deployments(namespace).Get(context.TODO(), name, meta_v1.GetOptions{})
 	if err != nil {
 		return workload{}, err
 	}
@@ -128,7 +128,7 @@ func (dk *deploymentKind) getWorkloads(ctx context.Context, c *Cluster, namespac
 	if err := ctx.Err(); err != nil {
 		return nil, err
 	}
-	deployments, err := c.client.AppsV1().Deployments(namespace).List(meta_v1.ListOptions{})
+	deployments, err := c.client.AppsV1().Deployments(namespace).List(context.TODO(), meta_v1.ListOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -199,7 +199,7 @@ func (dk *daemonSetKind) getWorkload(ctx context.Context, c *Cluster, namespace,
 	if err := ctx.Err(); err != nil {
 		return workload{}, err
 	}
-	daemonSet, err := c.client.AppsV1().DaemonSets(namespace).Get(name, meta_v1.GetOptions{})
+	daemonSet, err := c.client.AppsV1().DaemonSets(namespace).Get(context.TODO(), name, meta_v1.GetOptions{})
 	if err != nil {
 		return workload{}, err
 	}
@@ -211,7 +211,7 @@ func (dk *daemonSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace
 	if err := ctx.Err(); err != nil {
 		return nil, err
 	}
-	daemonSets, err := c.client.AppsV1().DaemonSets(namespace).List(meta_v1.ListOptions{})
+	daemonSets, err := c.client.AppsV1().DaemonSets(namespace).List(context.TODO(), meta_v1.ListOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -266,7 +266,7 @@ func (dk *statefulSetKind) getWorkload(ctx context.Context, c *Cluster, namespac
 	if err := ctx.Err(); err != nil {
 		return workload{}, err
 	}
-	statefulSet, err := c.client.AppsV1().StatefulSets(namespace).Get(name, meta_v1.GetOptions{})
+	statefulSet, err := c.client.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, meta_v1.GetOptions{})
 	if err != nil {
 		return workload{}, err
 	}
@@ -278,7 +278,7 @@ func (dk *statefulSetKind) getWorkloads(ctx context.Context, c *Cluster, namespa
 	if err := ctx.Err(); err != nil {
 		return nil, err
 	}
-	statefulSets, err := c.client.AppsV1().StatefulSets(namespace).List(meta_v1.ListOptions{})
+	statefulSets, err := c.client.AppsV1().StatefulSets(namespace).List(context.TODO(), meta_v1.ListOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -365,7 +365,7 @@ func (dk *cronJobKind) getWorkload(ctx context.Context, c *Cluster, namespace, n
 	if err := ctx.Err(); err != nil {
 		return workload{}, err
 	}
-	cronJob, err := c.client.BatchV1beta1().CronJobs(namespace).Get(name, meta_v1.GetOptions{})
+	cronJob, err := c.client.BatchV1beta1().CronJobs(namespace).Get(context.TODO(), name, meta_v1.GetOptions{})
 	if err != nil {
 		return workload{}, err
 	}
@@ -377,7 +377,7 @@ func (dk *cronJobKind) getWorkloads(ctx context.Context, c *Cluster, namespace s
 	if err := ctx.Err(); err != nil {
 		return nil, err
 	}
-	cronJobs, err := c.client.BatchV1beta1().CronJobs(namespace).List(meta_v1.ListOptions{})
+	cronJobs, err := c.client.BatchV1beta1().CronJobs(namespace).List(context.TODO(), meta_v1.ListOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -437,7 +437,7 @@ func (hr *helmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespa
 }
 
 func makeHelmReleaseStableWorkload(helmRelease *hr_v1.HelmRelease) workload {
-	containers := createK8sHRContainers(helmRelease.ObjectMeta.Annotations, helmRelease.Spec.Values.Data)
+	containers := createK8sHRContainers(helmRelease.ObjectMeta.Annotations, helmRelease.GetValues())
 
 	podTemplate := apiv1.PodTemplateSpec{
 		ObjectMeta: helmRelease.ObjectMeta,
diff --git a/pkg/cluster/kubernetes/sshkeyring.go b/pkg/cluster/kubernetes/sshkeyring.go
index f85603d82..253136e9a 100644
--- a/pkg/cluster/kubernetes/sshkeyring.go
+++ b/pkg/cluster/kubernetes/sshkeyring.go
@@ -1,6 +1,7 @@
 package kubernetes
 
 import (
+	"context"
 	"encoding/base64"
 	"encoding/json"
 	"os"
@@ -8,8 +9,9 @@ import (
 	"sync"
 
 	"github.com/pkg/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/kubernetes/typed/core/v1"
+	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 
 	"github.com/fluxcd/flux/pkg/ssh"
 )
@@ -129,7 +131,7 @@ func (skr *sshKeyRing) Regenerate() error {
 		return err
 	}
 
-	_, err = skr.SecretAPI.Patch(skr.SecretName, types.StrategicMergePatchType, jsonPatch)
+	_, err = skr.SecretAPI.Patch(context.TODO(), skr.SecretName, types.StrategicMergePatchType, jsonPatch, metav1.PatchOptions{})
 	if err != nil {
 		return err
 	}
diff --git a/pkg/cluster/kubernetes/sync.go b/pkg/cluster/kubernetes/sync.go
index e7203ef9f..155e9ec55 100644
--- a/pkg/cluster/kubernetes/sync.go
+++ b/pkg/cluster/kubernetes/sync.go
@@ -8,13 +8,14 @@ import (
 	"encoding/base64"
 	"encoding/hex"
 	"fmt"
-	"github.com/ryanuber/go-glob"
 	"io"
 	"os/exec"
 	"sort"
 	"strings"
 	"time"
 
+	"github.com/ryanuber/go-glob"
+
 	"github.com/go-kit/kit/log"
 	"github.com/imdario/mergo"
 	"github.com/pkg/errors"
@@ -336,7 +337,7 @@ func (c *Cluster) listAllowedResources(
 	if !namespaced {
 		// The resource is not namespaced, everything is allowed
 		resourceClient := c.client.dynamicClient.Resource(gvr)
-		data, err := resourceClient.List(options)
+		data, err := resourceClient.List(context.TODO(), options)
 		if err != nil {
 			return nil, err
 		}
@@ -350,7 +351,7 @@ func (c *Cluster) listAllowedResources(
 	}
 	var result []unstructured.Unstructured
 	for _, ns := range namespaces {
-		data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns).List(options)
+		data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns).List(context.TODO(), options)
 		if err != nil {
 			return result, err
 		}
diff --git a/pkg/cluster/kubernetes/sync_test.go b/pkg/cluster/kubernetes/sync_test.go
index 494615423..a034a4df6 100644
--- a/pkg/cluster/kubernetes/sync_test.go
+++ b/pkg/cluster/kubernetes/sync_test.go
@@ -1,6 +1,7 @@
 package kubernetes
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"sort"
@@ -37,6 +38,10 @@ const (
 
 func fakeClients() (ExtendedClient, func()) {
 	scheme := runtime.NewScheme()
+	listMapping := map[schema.GroupVersionResource]string{
+		{Group: "", Version: "v1", Resource: "namespaces"}:      "List",
+		{Group: "apps", Version: "v1", Resource: "deployments"}: "List",
+	}
 
 	// Set this to `true` to output a trace of the API actions called
 	// while running the tests
@@ -63,7 +68,7 @@ func fakeClients() (ExtendedClient, func()) {
 
 	coreClient := corefake.NewSimpleClientset(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: defaultTestNamespace}})
 	hrClient := helmopfake.NewSimpleClientset()
-	dynamicClient := dynamicfake.NewSimpleDynamicClient(scheme)
+	dynamicClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping)
 	crdClient := crdfake.NewSimpleClientset()
 	shutdown := make(chan struct{})
 	discoveryClient := MakeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown)
@@ -153,12 +158,12 @@ func (a fakeApplier) apply(_ log.Logger, cs changeSet, errored map[resource.ID]e
 		name := res.GetName()
 
 		if cmd == "apply" {
-			_, err := dc.Get(name, metav1.GetOptions{})
+			_, err := dc.Get(context.TODO(), name, metav1.GetOptions{})
 			switch {
 			case errors.IsNotFound(err):
-				_, err = dc.Create(res, metav1.CreateOptions{})
+				_, err = dc.Create(context.TODO(), res, metav1.CreateOptions{})
 			case err == nil:
-				_, err = dc.Update(res, metav1.UpdateOptions{})
+				_, err = dc.Update(context.TODO(), res, metav1.UpdateOptions{})
 			}
 			if err != nil {
 				errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err})
@@ -172,12 +177,12 @@ func (a fakeApplier) apply(_ log.Logger, cs changeSet, errored map[resource.ID]e
 					errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err})
 					return
 				}
-				_, err := a.coreClient.CoreV1().Namespaces().Get(ns.Name, metav1.GetOptions{})
+				_, err := a.coreClient.CoreV1().Namespaces().Get(context.TODO(), ns.Name, metav1.GetOptions{})
 				switch {
 				case errors.IsNotFound(err):
-					_, err = a.coreClient.CoreV1().Namespaces().Create(&ns)
+					_, err = a.coreClient.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{})
 				case err == nil:
-					_, err = a.coreClient.CoreV1().Namespaces().Update(&ns)
+					_, err = a.coreClient.CoreV1().Namespaces().Update(context.TODO(), &ns, metav1.UpdateOptions{})
 				}
 				if err != nil {
 					errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err})
@@ -186,14 +191,14 @@ func (a fakeApplier) apply(_ log.Logger, cs changeSet, errored map[resource.ID]e
 			}
 
 		} else if cmd == "delete" {
-			if err := dc.Delete(name, &metav1.DeleteOptions{}); err != nil {
+			if err := dc.Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil {
 				errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err})
 				return
 			}
 			if res.GetKind() == "Namespace" {
 				// We also create namespaces in the core fake client since the dynamic client
 				// and core clients don't share resources
-				if err := a.coreClient.CoreV1().Namespaces().Delete(res.GetName(), &metav1.DeleteOptions{}); err != nil {
+				if err := a.coreClient.CoreV1().Namespaces().Delete(context.TODO(), res.GetName(), metav1.DeleteOptions{}); err != nil {
 					errs = append(errs, cluster.ResourceError{obj.ResourceID, obj.Source, err})
 					return
 				}
@@ -564,12 +569,12 @@ metadata:
 		Resource: "deployments",
 	}
 	client := kube.client.dynamicClient.Resource(gvr).Namespace(depNS)
-	depActual, err := client.Get(depName, metav1.GetOptions{})
+	depActual, err := client.Get(context.TODO(), depName, metav1.GetOptions{})
 	assert.NoError(t, err)
 	depCopy := depActual.DeepCopy()
 	depCopyName := depName + "copy"
 	depCopy.SetName(depCopyName)
-	depCopyActual, err := client.Create(depCopy, metav1.CreateOptions{})
+	depCopyActual, err := client.Create(context.TODO(), depCopy, metav1.CreateOptions{})
 	assert.NoError(t, err)
 
 	// Check that both dep and its copy have the same GCmark label
@@ -581,9 +586,9 @@ metadata:
 	test(t, kube, "", "", false)
 
 	// Check that defs1 is removed from the cluster but its copy isn't, due to having a different name
-	_, err = client.Get(depName, metav1.GetOptions{})
+	_, err = client.Get(context.TODO(), depName, metav1.GetOptions{})
 	assert.Error(t, err)
-	_, err = client.Get(depCopyName, metav1.GetOptions{})
+	_, err = client.Get(context.TODO(), depCopyName, metav1.GetOptions{})
 	assert.NoError(t, err)
 
 })
@@ -674,14 +679,14 @@ spec:
 		Version:  "v1",
 		Resource: "deployments",
 	})
-	res, err := rc.Namespace("foobar").Get("dep1", metav1.GetOptions{})
+	res, err := rc.Namespace("foobar").Get(context.TODO(), "dep1", metav1.GetOptions{})
 	if err != nil {
 		t.Fatal(err)
 	}
 	annots := res.GetAnnotations()
 	annots["flux.weave.works/ignore"] = "true"
 	res.SetAnnotations(annots)
-	if _, err = rc.Namespace("foobar").Update(res, metav1.UpdateOptions{}); err != nil {
+	if _, err = rc.Namespace("foobar").Update(context.TODO(), res, metav1.UpdateOptions{}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -753,14 +758,14 @@ metadata:
 		Version:  "v1",
 		Resource: "deployments",
 	})
-	res, err := rc.Namespace("foobar").Get("dep1", metav1.GetOptions{})
+	res, err := rc.Namespace("foobar").Get(context.TODO(), "dep1", metav1.GetOptions{})
 	if err != nil {
 		t.Fatal(err)
 	}
 	annots := res.GetAnnotations()
 	annots["flux.weave.works/ignore"] = "true"
 	res.SetAnnotations(annots)
-	if _, err = rc.Namespace("foobar").Update(res, metav1.UpdateOptions{}); err != nil {
+	if _, err = rc.Namespace("foobar").Update(context.TODO(), res, metav1.UpdateOptions{}); err != nil {
 		t.Fatal(err)
 	}
 
@@ -792,10 +797,10 @@ spec:
 	err = yaml.Unmarshal([]byte(ns1), &ns1obj)
 	assert.NoError(t, err)
 	// Put the pre-existing resource in the cluster
-	_, err = kube.client.coreClient.CoreV1().Namespaces().Create(&ns1obj)
+	_, err = kube.client.coreClient.CoreV1().Namespaces().Create(context.TODO(), &ns1obj, metav1.CreateOptions{})
 	assert.NoError(t, err)
 	dc := kube.client.dynamicClient.Resource(gvr).Namespace(dep1res.GetNamespace())
-	_, err = dc.Create(dep1res, metav1.CreateOptions{})
+	_, err = dc.Create(context.TODO(), dep1res, metav1.CreateOptions{})
 	assert.NoError(t, err)
 
 	// Check that our resource-getting also sees the pre-existing resource
@@ -808,7 +813,7 @@ spec:
 	test(t, kube, "", "", false)
 
 	// .. but, our resource is still there.
-	r, err := dc.Get(dep1res.GetName(), metav1.GetOptions{})
+	r, err := dc.Get(context.TODO(), dep1res.GetName(), metav1.GetOptions{})
 	assert.NoError(t, err)
 	assert.NotNil(t, r)
 
@@ -827,7 +832,7 @@ spec:
 	test(t, kube, update, "", false)
 
 	// Check that it still exists, as created
-	r, err = dc.Get(dep1res.GetName(), metav1.GetOptions{})
+	r, err = dc.Get(context.TODO(), dep1res.GetName(), metav1.GetOptions{})
 	assert.NoError(t, err)
 	assert.NotNil(t, r)
 	checkSame(t, []byte(existing), r)
diff --git a/pkg/portforward/portforward.go b/pkg/portforward/portforward.go
index c752aad41..cf2586bcd 100644
--- a/pkg/portforward/portforward.go
+++ b/pkg/portforward/portforward.go
@@ -4,6 +4,7 @@ package portforward
 // licensed under the Apache License 2.0
 
 import (
+	"context"
 	"fmt"
 	"io/ioutil"
 	"net"
@@ -71,7 +72,7 @@ func NewPortForwarder(namespace string, labels metav1.LabelSelector, port int) (
 }
 
 // Start a port forward to a pod - blocks until the tunnel is ready for use.
-func (p *PortForward) Start() error {
+func (p *PortForward) Start(ctx context.Context) error {
 	p.stopChan = make(chan struct{}, 1)
 	readyChan := make(chan struct{}, 1)
 	errChan := make(chan error, 1)
@@ -81,7 +82,7 @@ func (p *PortForward) Start() error {
 		return errors.Wrap(err, "Could not find a port to bind to")
 	}
 
-	dialer, err := p.dialer()
+	dialer, err := p.dialer(ctx)
 	if err != nil {
 		return errors.Wrap(err, "Could not create a dialer")
 	}
@@ -146,8 +147,8 @@ func (p *PortForward) getFreePort() (int, error) {
 }
 
 // Create an httpstream.Dialer for use with portforward.New
-func (p *PortForward) dialer() (httpstream.Dialer, error) {
-	pod, err := p.getPodName()
+func (p *PortForward) dialer(ctx context.Context) (httpstream.Dialer, error) {
+	pod, err := p.getPodName(ctx)
 	if err != nil {
 		return nil, errors.Wrap(err, "Could not get pod name")
 	}
@@ -169,10 +170,10 @@ func (p *PortForward) dialer() (httpstream.Dialer, error) {
 
 // Gets the pod name to port forward to, if Name is set, Name is returned. Otherwise,
 // it will call findPodByLabels().
-func (p *PortForward) getPodName() (string, error) {
+func (p *PortForward) getPodName(ctx context.Context) (string, error) {
 	var err error
 	if p.Name == "" {
-		p.Name, err = p.findPodByLabels()
+		p.Name, err = p.findPodByLabels(ctx)
 	}
 	return p.Name, err
 }
@@ -180,12 +181,12 @@ func (p *PortForward) getPodName() (string, error) {
 // Find the name of a pod by label, returns an error if the label returns
 // more or less than one pod.
 // It searches for the labels specified by labels.
-func (p *PortForward) findPodByLabels() (string, error) {
+func (p *PortForward) findPodByLabels(ctx context.Context) (string, error) {
 	if len(p.Labels.MatchLabels) == 0 && len(p.Labels.MatchExpressions) == 0 {
 		return "", errors.New("No pod labels specified")
 	}
 
-	pods, err := p.Clientset.CoreV1().Pods(p.Namespace).List(metav1.ListOptions{
+	pods, err := p.Clientset.CoreV1().Pods(p.Namespace).List(ctx, metav1.ListOptions{
 		LabelSelector: metav1.FormatLabelSelector(&p.Labels),
 		FieldSelector: fields.OneTermEqualSelector("status.phase", string(v1.PodRunning)).String(),
 	})
diff --git a/pkg/portforward/portforward_test.go b/pkg/portforward/portforward_test.go
index 3399fb118..cf98a21f0 100644
--- a/pkg/portforward/portforward_test.go
+++ b/pkg/portforward/portforward_test.go
@@ -4,6 +4,7 @@ package portforward
 // licensed under the Apache License 2.0
 
 import (
+	"context"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -42,7 +43,7 @@ func TestFindPodByLabels(t *testing.T) {
 		},
 	}
 
-	pod, err := pf.findPodByLabels()
+	pod, err := pf.findPodByLabels(context.TODO())
 	assert.Nil(t, err)
 	assert.Equal(t, "mypod2", pod)
 }
@@ -60,7 +61,7 @@ func TestFindPodByLabelsNoneExist(t *testing.T) {
 		},
 	}
 
-	_, err := pf.findPodByLabels()
+	_, err := pf.findPodByLabels(context.TODO())
 	assert.NotNil(t, err)
 	assert.Equal(t, "Could not find running pod for selector: labels \"name=flux\"", err.Error())
 }
@@ -82,7 +83,7 @@ func TestFindPodByLabelsMultiple(t *testing.T) {
 		},
 	}
 
-	_, err := pf.findPodByLabels()
+	_, err := pf.findPodByLabels(context.TODO())
 	assert.NotNil(t, err)
 	assert.Equal(t, "Ambiguous pod: found more than one pod for selector: labels \"name=flux\"", err.Error())
 }
@@ -108,7 +109,7 @@ func TestFindPodByLabelsExpression(t *testing.T) {
 		},
 	}
 
-	pod, err := pf.findPodByLabels()
+	pod, err := pf.findPodByLabels(context.TODO())
 	assert.Nil(t, err)
 	assert.Equal(t, "mypod2", pod)
 }
@@ -134,7 +135,7 @@ func TestFindPodByLabelsExpressionNotFound(t *testing.T) {
 		},
 	}
 
-	_, err := pf.findPodByLabels()
+	_, err := pf.findPodByLabels(context.TODO())
 	assert.NotNil(t, err)
 	assert.Equal(t, "Could not find running pod for selector: labels \"name in (flux,fluxd)\"", err.Error())
 }
@@ -144,7 +145,7 @@ func TestGetPodNameNameSet(t *testing.T) {
 		Name: "hello",
 	}
 
-	pod, err := pf.getPodName()
+	pod, err := pf.getPodName(context.TODO())
 	assert.Nil(t, err)
 	assert.Equal(t, "hello", pod)
 }
@@ -162,7 +163,7 @@ func TestGetPodNameNoNameSet(t *testing.T) {
 		},
 	}
 
-	pod, err := pf.getPodName()
+	pod, err := pf.getPodName(context.TODO())
 	assert.Nil(t, err)
 	assert.Equal(t, "mypod", pod)
 	assert.Equal(t, pf.Name, pod)
diff --git a/pkg/sync/secret.go b/pkg/sync/secret.go
index c7461d43a..1589f18ab 100644
--- a/pkg/sync/secret.go
+++ b/pkg/sync/secret.go
@@ -7,7 +7,7 @@ import (
 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	kubernetes "k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/kubernetes/typed/core/v1"
+	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
 )
 
@@ -46,37 +46,39 @@ func (p NativeSyncProvider) String() string {
 // GetRevision gets the revision of the current sync marker (representing the place flux has synced to).
 func (p NativeSyncProvider) GetRevision(ctx context.Context) (string, error) {
-	resource, err := p.resourceAPI.Get(p.resourceName, meta_v1.GetOptions{})
+	resource, err := p.resourceAPI.Get(ctx, p.resourceName, meta_v1.GetOptions{})
 	if err != nil {
 		return "", err
 	}
 
 	revision, exists := resource.Annotations[syncMarkerKey]
 	if !exists {
-		return "", p.setRevision("")
+		return "", p.setRevision(ctx, "")
 	}
 	return revision, nil
 }
 
 // UpdateMarker updates the revision the sync marker points to.
 func (p NativeSyncProvider) UpdateMarker(ctx context.Context, revision string) error {
-	return p.setRevision(revision)
+	return p.setRevision(ctx, revision)
 }
 
 // DeleteMarker resets the state of the object.
 func (p NativeSyncProvider) DeleteMarker(ctx context.Context) error {
-	return p.setRevision("")
+	return p.setRevision(ctx, "")
 }
 
-func (p NativeSyncProvider) setRevision(revision string) error {
+func (p NativeSyncProvider) setRevision(ctx context.Context, revision string) error {
 	jsonPatch, err := json.Marshal(patch(revision))
 	if err != nil {
 		return err
 	}
 
 	_, err = p.resourceAPI.Patch(
+		ctx,
 		p.resourceName,
 		types.StrategicMergePatchType,
 		jsonPatch,
+		meta_v1.PatchOptions{},
 	)
 	return err
}
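
Reviewer note: every hunk above applies the same mechanical migration to context-aware client-go (v0.18 and later), where generated client methods take a leading context.Context plus an explicit options struct: Update gains metav1.UpdateOptions{}, and Delete now takes metav1.DeleteOptions{} by value rather than by pointer. A minimal sketch of the new call shape, assuming client-go >= v0.18; listRunningPods is a hypothetical helper for illustration and is not part of this change:

package example

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listRunningPods demonstrates the post-v0.18 call shape used throughout
// the diff: a context first, then the resource identifiers, then options.
func listRunningPods(clientset kubernetes.Interface, namespace string) error {
	// Call sites with no caller context yet use context.TODO(), as many
	// hunks above do; where one is available (e.g. Export), it is threaded
	// through instead, as sketched here.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		FieldSelector: "status.phase=Running",
	})
	if err != nil {
		return err
	}
	for _, pod := range pods.Items {
		fmt.Println(pod.Name)
	}
	return nil
}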
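The test-only change in sync_test.go follows from the same upgrade: newer fake dynamic clients no longer infer list kinds from the scheme, so each GroupVersionResource the tests will List must be registered up front via NewSimpleDynamicClientWithCustomListKinds (every mapped kind must end in "List"). A standalone sketch under those assumptions, mirroring the mapping added above; listDeploymentsFromFake is illustrative only:

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	dynamicfake "k8s.io/client-go/dynamic/fake"
)

func listDeploymentsFromFake() error {
	scheme := runtime.NewScheme()
	// Register a list kind for every GVR the test will List; without this
	// registration, List calls against the fake client fail.
	listMapping := map[schema.GroupVersionResource]string{
		{Group: "apps", Version: "v1", Resource: "deployments"}: "List",
	}
	client := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping)

	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	list, err := client.Resource(gvr).Namespace("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	fmt.Printf("found %d deployments\n", len(list.Items))
	return nil
}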