Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dynamic client for labels #782

Merged
merged 6 commits into from
Jul 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

288 changes: 85 additions & 203 deletions pkg/skaffold/deploy/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,25 @@ package deploy
import (
"context"
"encoding/json"
"fmt"
"io"
"time"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/build"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
kubectx "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes/context"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
appsv1beta1 "k8s.io/api/apps/v1beta1"
appsv1beta2 "k8s.io/api/apps/v1beta2"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
patch "k8s.io/apimachinery/pkg/util/strategicpatch"
clientgo "k8s.io/client-go/kubernetes"

"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
)

// Labeller can give key/value labels to set on deployed resources.
Expand Down Expand Up @@ -80,176 +82,21 @@ func merge(sources ...Labeller) map[string]string {
return merged
}

type objectType int

// List of API Objects supported by the Skaffold Labeler
// retry 3 times to give the object time to propagate to the API server
const (
_ = iota
corev1Pod
appsv1Deployment
appsv1Beta1Deployment
appsv1Beta2Deployment
extensionsv1Beta1Deployment
corev1Service
appv1StatefulSet
appsv1Beta1StatefulSet
appsv1Beta2StatefulSet
extensionsv1Beta1DaemonSet
appsv1ReplicaSet
appsv1Beta2ReplicaSet
tries = 3
sleeptime = 300 * time.Millisecond
)

// patcher is responsible for applying a given patch to the provided object
type patcher func(clientgo.Interface, string, string, []byte) error

// objectMeta is responsible for returning a generic runtime.Object's metadata
type objectMeta func(runtime.Object) (*metav1.ObjectMeta, bool)

var patchers = map[objectType]patcher{
corev1Pod: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.CoreV1().Pods(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
appsv1Deployment: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.AppsV1().Deployments(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
appsv1Beta1Deployment: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.AppsV1beta1().Deployments(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
appsv1Beta2Deployment: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.AppsV1beta2().Deployments(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
extensionsv1Beta1Deployment: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.ExtensionsV1beta1().Deployments(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
corev1Service: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.CoreV1().Services(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
appv1StatefulSet: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.AppsV1().StatefulSets(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
appsv1Beta1StatefulSet: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.AppsV1beta1().StatefulSets(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
appsv1Beta2StatefulSet: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.AppsV1beta2().StatefulSets(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
extensionsv1Beta1DaemonSet: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.ExtensionsV1beta1().DaemonSets(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
appsv1ReplicaSet: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.AppsV1().ReplicaSets(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
appsv1Beta2ReplicaSet: func(client clientgo.Interface, ns string, name string, p []byte) error {
_, err := client.AppsV1beta2().ReplicaSets(ns).Patch(name, types.StrategicMergePatchType, p)
return err
},
}

var objectMetas = map[objectType]objectMeta{
corev1Pod: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*corev1.Pod)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
appsv1Deployment: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*appsv1.Deployment)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
appsv1Beta1Deployment: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*appsv1beta1.Deployment)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
appsv1Beta2Deployment: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*appsv1beta2.Deployment)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
extensionsv1Beta1Deployment: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*extensionsv1beta1.Deployment)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
corev1Service: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*corev1.Service)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
appv1StatefulSet: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*appsv1.StatefulSet)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
appsv1Beta1StatefulSet: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*appsv1beta1.StatefulSet)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
appsv1Beta2StatefulSet: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*appsv1beta1.StatefulSet)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
extensionsv1Beta1DaemonSet: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*extensionsv1beta1.DaemonSet)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
appsv1ReplicaSet: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*appsv1.ReplicaSet)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
appsv1Beta2ReplicaSet: func(r runtime.Object) (*metav1.ObjectMeta, bool) {
obj, ok := r.(*appsv1beta2.ReplicaSet)
if !ok {
return nil, ok
}
return &obj.ObjectMeta, ok
},
}

// retry 3 times to give the object time to propagate to the API server
const tries int = 3
const sleeptime time.Duration = 300 * time.Millisecond

func labelDeployResults(labels map[string]string, results []Artifact) {
// use the kubectl client to update all k8s objects with a skaffold watermark
client, err := kubernetes.Client()
dynClient, err := kubernetes.DynamicClient()
if err != nil {
logrus.Warnf("error retrieving kubernetes dynamic client: %s", err.Error())
return
}

client, err := kubernetes.GetClientset()
if err != nil {
logrus.Warnf("error retrieving kubernetes client: %s", err.Error())
return
Expand All @@ -258,7 +105,7 @@ func labelDeployResults(labels map[string]string, results []Artifact) {
for _, res := range results {
err = nil
for i := 0; i < tries; i++ {
if err = updateRuntimeObject(client, labels, res); err == nil {
if err = updateRuntimeObject(dynClient, client.Discovery(), labels, res); err == nil {
break
}
time.Sleep(sleeptime)
Expand All @@ -269,46 +116,81 @@ func labelDeployResults(labels map[string]string, results []Artifact) {
}
}

func addSkaffoldLabels(labels map[string]string, m *metav1.ObjectMeta) {
if m.Labels == nil {
m.Labels = map[string]string{}
func addLabels(labels map[string]string, accessor metav1.Object) {
objLabels := accessor.GetLabels()
if objLabels == nil {
objLabels = make(map[string]string)
}
for k, v := range constants.Labels.DefaultLabels {
if _, ok := objLabels[k]; !ok {
objLabels[k] = v
}
}
for k, v := range labels {
m.Labels[k] = v
for key, value := range labels {
objLabels[key] = value
}
accessor.SetLabels(objLabels)
}

func retrieveNamespace(ns string, m metav1.ObjectMeta) string {
func updateRuntimeObject(client dynamic.Interface, disco discovery.DiscoveryInterface, labels map[string]string, res Artifact) error {
originalJSON, _ := json.Marshal(*res.Obj)
modifiedObj := (*res.Obj).DeepCopyObject()
accessor, err := meta.Accessor(modifiedObj)
if err != nil {
return errors.Wrap(err, "getting metadata accessor")
}
name := accessor.GetName()
namespace := accessor.GetNamespace()
addLabels(labels, accessor)

modifiedJSON, _ := json.Marshal(modifiedObj)
p, _ := patch.CreateTwoWayMergePatch(originalJSON, modifiedJSON, modifiedObj)
gvr, err := groupVersionResource(disco, modifiedObj.GetObjectKind().GroupVersionKind())
if err != nil {
return errors.Wrap(err, "getting group version resource from obj")
}
ns, err := resolveNamespace(namespace)
if err != nil {
return errors.Wrap(err, "resolving namespace")
}
if _, err := client.Resource(gvr).Namespace(ns).Patch(name, types.StrategicMergePatchType, p); err != nil {
return errors.Wrapf(err, "patching resource %s/%s", namespace, name)
}

return nil
}

func resolveNamespace(ns string) (string, error) {
if ns != "" {
return ns
return ns, nil
}
if m.Namespace != "" {
return m.Namespace
cfg, err := kubectx.CurrentConfig()
if err != nil {
return "", errors.Wrap(err, "getting kubeconfig")
}
return "default"

current, present := cfg.Contexts[cfg.CurrentContext]
if present && current.Namespace != "" {
return current.Namespace, nil
}
return "default", nil
}

// TODO(nkubala): change this to use the client-go dynamic client or something equally clean
func updateRuntimeObject(client clientgo.Interface, labels map[string]string, res Artifact) error {
for k, v := range constants.Labels.DefaultLabels {
labels[k] = v
func groupVersionResource(disco discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (schema.GroupVersionResource, error) {
resources, err := disco.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
return schema.GroupVersionResource{}, errors.Wrap(err, "getting server resources for group version")
}
var err error
applied := false
var metadata *metav1.ObjectMeta
originalJSON, _ := json.Marshal(*res.Obj)
modifiedObj := (*res.Obj).DeepCopyObject()
for typeStr, m := range objectMetas {
if metadata, applied = m(modifiedObj); applied {
addSkaffoldLabels(labels, metadata)
modifiedJSON, _ := json.Marshal(modifiedObj)
p, _ := patch.CreateTwoWayMergePatch(originalJSON, modifiedJSON, modifiedObj)
err = patchers[typeStr](client, retrieveNamespace(res.Namespace, *metadata), metadata.GetName(), p)
break // we should only ever apply one patch, so stop here

for _, r := range resources.APIResources {
if r.Kind == gvk.Kind {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when I hacked on this I had trouble converting the kind string into something that could be treated as a resource, mostly with capitalization (i.e. pod needs to be converted to Pod, etc.). not sure if this is something you ran into here, but might be worth addressing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I didn't have trouble with that. The only thing that seemed a little odd was that the dynamic client needed a non-empty namespace and parsing the kubeconfig uses "" instead of "default"

return schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: r.Name,
}, nil
}
}
if !applied {
logrus.Infof("unknown runtime.Object, skipping label")
}
return err

return schema.GroupVersionResource{}, fmt.Errorf("Could not find resource for %s", gvk.String())
}
21 changes: 18 additions & 3 deletions pkg/skaffold/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,37 @@ import (
"fmt"

"github.com/pkg/errors"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

// Initialize all known client auth plugins
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

func GetClientset() (kubernetes.Interface, error) {
config, err := getClientConfig()
if err != nil {
return nil, errors.Wrap(err, "getting client config for kubernetes client")
}
return kubernetes.NewForConfig(config)
}

func getClientConfig() (*restclient.Config, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{})
clientConfig, err := kubeConfig.ClientConfig()
if err != nil {
return nil, fmt.Errorf("Error creating kubeConfig: %s", err)
}
client, err := kubernetes.NewForConfig(clientConfig)
return clientConfig, nil
}

func GetDynamicClient() (dynamic.Interface, error) {
config, err := getClientConfig()
if err != nil {
return nil, errors.Wrap(err, "Error creating new client from kubeConfig.ClientConfig()")
return nil, errors.Wrap(err, "getting client config for dynamic client")
}
return client, nil
return dynamic.NewForConfig(config)
}
Loading