Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide detailed error for removed apiVersions #809

Merged
merged 5 commits into from
Sep 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove redundant server version code
  • Loading branch information
lblackstone committed Sep 19, 2019
commit be8a925c1db21a7fa6c25a43fe331e9d3efed1b6
9 changes: 5 additions & 4 deletions pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/golang/glog"
"github.com/pulumi/pulumi-kubernetes/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/pkg/cluster"
"github.com/pulumi/pulumi-kubernetes/pkg/logging"
"github.com/pulumi/pulumi-kubernetes/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/pkg/openapi"
Expand Down Expand Up @@ -422,7 +423,7 @@ func Deletion(c DeleteConfig) error {
return nilIfGVKDeleted(err)
}

err = deleteResource(c.Name, client, ServerVersion(c.ClientSet.DiscoveryClientCached))
err = deleteResource(c.Name, client, cluster.GetServerVersion(c.ClientSet.DiscoveryClientCached))
if err != nil {
return nilIfGVKDeleted(err)
}
Expand Down Expand Up @@ -500,15 +501,15 @@ func Deletion(c DeleteConfig) error {
return waitErr
}

func deleteResource(name string, client dynamic.ResourceInterface, version serverVersion) error {
func deleteResource(name string, client dynamic.ResourceInterface, version cluster.ServerVersion) error {
// Manually set delete propagation for Kubernetes versions < 1.6 to avoid bugs.
deleteOpts := metav1.DeleteOptions{}
if version.Compare(1, 6) < 0 {
if version.Compare(cluster.ServerVersion{Major: 1, Minor: 6}) < 0 {
// 1.5.x option.
boolFalse := false
// nolint
deleteOpts.OrphanDependents = &boolFalse
} else if version.Compare(1, 7) < 0 {
} else if version.Compare(cluster.ServerVersion{Major: 1, Minor: 7}) < 0 {
// 1.6.x option. Background delete propagation is broken in k8s v1.6.
fg := metav1.DeletePropagationForeground
deleteOpts.PropagationPolicy = &fg
Expand Down
17 changes: 9 additions & 8 deletions pkg/await/core_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"
"time"

"github.com/pulumi/pulumi-kubernetes/pkg/cluster"
"github.com/pulumi/pulumi/pkg/util/cmdutil"

"github.com/golang/glog"
Expand Down Expand Up @@ -140,7 +141,7 @@ func (sia *serviceInitAwaiter) Await() error {
}
defer endpointWatcher.Stop()

version := ServerVersion(sia.config.clientSet.DiscoveryClientCached)
version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached)

timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentInputs, DefaultServiceTimeoutMins*60)
return sia.await(serviceWatcher, endpointWatcher, time.After(timeout), make(chan struct{}), version)
Expand Down Expand Up @@ -175,14 +176,14 @@ func (sia *serviceInitAwaiter) Read() error {
endpointList = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{}}
}

version := ServerVersion(sia.config.clientSet.DiscoveryClientCached)
version := cluster.GetServerVersion(sia.config.clientSet.DiscoveryClientCached)

return sia.read(service, endpointList, version)
}

func (sia *serviceInitAwaiter) read(
service *unstructured.Unstructured, endpoints *unstructured.UnstructuredList,
version serverVersion,
version cluster.ServerVersion,
) error {
sia.processServiceEvent(watchAddedEvent(service))

Expand Down Expand Up @@ -216,7 +217,7 @@ func (sia *serviceInitAwaiter) await(
serviceWatcher, endpointWatcher watch.Interface,
timeout <-chan time.Time,
settled chan struct{},
version serverVersion,
version cluster.ServerVersion,
) error {
sia.config.logStatus(diag.Info, "[1/3] Finding Pods to direct traffic to")

Expand Down Expand Up @@ -394,14 +395,14 @@ func (sia *serviceInitAwaiter) emptyHeadlessOrExternalName() bool {
//
// [1]: https://github.com/kubernetes/dns/issues/174
// [2]: https://github.com/kubernetes/kubernetes/commit/1c0137252465574519baf99252df8d75048f1304
func (sia *serviceInitAwaiter) hasHeadlessServicePortBug(version serverVersion) bool {
func (sia *serviceInitAwaiter) hasHeadlessServicePortBug(version cluster.ServerVersion) bool {
// This bug only affects headless or external name Services.
if !sia.isHeadlessService() && !sia.isExternalNameService() {
return false
}

// k8s versions < 1.12 have the bug.
if version.Compare(1, 12) < 0 {
if version.Compare(cluster.ServerVersion{Major: 1, Minor: 12}) < 0 {
portsI, _ := openapi.Pluck(sia.service.Object, "spec", "ports")
ports, _ := portsI.([]map[string]interface{})
hasPorts := len(ports) > 0
Expand All @@ -416,7 +417,7 @@ func (sia *serviceInitAwaiter) hasHeadlessServicePortBug(version serverVersion)
}

// shouldWaitForPods determines whether to wait for Pods to be ready before marking the Service ready.
func (sia *serviceInitAwaiter) shouldWaitForPods(version serverVersion) bool {
func (sia *serviceInitAwaiter) shouldWaitForPods(version cluster.ServerVersion) bool {
// For these special cases, skip the wait for Pod logic.
if sia.emptyHeadlessOrExternalName() || sia.hasHeadlessServicePortBug(version) {
sia.endpointsReady = true
Expand All @@ -426,7 +427,7 @@ func (sia *serviceInitAwaiter) shouldWaitForPods(version serverVersion) bool {
return true
}

func (sia *serviceInitAwaiter) checkAndLogStatus(version serverVersion) bool {
func (sia *serviceInitAwaiter) checkAndLogStatus(version cluster.ServerVersion) bool {
if !sia.shouldWaitForPods(version) {
return sia.serviceReady
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/await/core_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/pulumi/pulumi-kubernetes/pkg/cluster"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -15,7 +16,7 @@ func Test_Core_Service(t *testing.T) {
description string
serviceInput func(namespace, name string) *unstructured.Unstructured
do func(services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time)
version serverVersion
version cluster.ServerVersion
expectedError error
}{
{
Expand Down Expand Up @@ -190,7 +191,7 @@ func Test_Core_Service(t *testing.T) {
{
description: "Should succeed if non-empty headless service doesn't target any Pods before k8s 1.12",
serviceInput: headlessNonemptyServiceInput,
version: serverVersion{1, 11},
version: cluster.ServerVersion{Major: 1, Minor: 11},
do: func(services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
services <- watchAddedEvent(headlessNonemptyServiceOutput("default", "foo-4setj4y6"))

Expand All @@ -201,7 +202,7 @@ func Test_Core_Service(t *testing.T) {
{
description: "Should fail if non-empty headless service doesn't target any Pods",
serviceInput: headlessNonemptyServiceInput,
version: serverVersion{1, 12},
version: cluster.ServerVersion{Major: 1, Minor: 12},
do: func(services, endpoints chan watch.Event, settled chan struct{}, timeout chan time.Time) {
services <- watchAddedEvent(headlessNonemptyServiceOutput("default", "foo-4setj4y6"))

Expand Down Expand Up @@ -239,7 +240,7 @@ func Test_Core_Service_Read(t *testing.T) {
serviceInput func(namespace, name string) *unstructured.Unstructured
service func(namespace, name string) *unstructured.Unstructured
endpoint func(namespace, name string) *unstructured.Unstructured
version serverVersion
version cluster.ServerVersion
expectedSubErrors []string
}{
{
Expand Down Expand Up @@ -280,13 +281,13 @@ func Test_Core_Service_Read(t *testing.T) {
description: "Read succeed if headless non-empty Service doesn't target any Pods before k8s 1.12",
serviceInput: headlessNonemptyServiceInput,
service: headlessNonemptyServiceInput,
version: serverVersion{1, 11},
version: cluster.ServerVersion{Major: 1, Minor: 11},
},
{
description: "Read fail if headless non-empty Service doesn't target any Pods",
serviceInput: headlessNonemptyServiceInput,
service: headlessNonemptyServiceInput,
version: serverVersion{1, 12},
version: cluster.ServerVersion{Major: 1, Minor: 12},
expectedSubErrors: []string{
"Service does not target any Pods. Selected Pods may not be ready, or " +
"field '.spec.selector' may not match labels on any Pods"},
Expand Down
36 changes: 0 additions & 36 deletions pkg/await/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package await

import (
"fmt"
"log"
"sort"

Expand All @@ -27,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
)

Expand Down Expand Up @@ -66,17 +64,6 @@ func watchAddedEvent(obj runtime.Object) watch.Event {
}
}

// nolint
func stringifyEvents(events []v1.Event) string {
var output string
for _, e := range events {
output += fmt.Sprintf("\n * %s (%s): %s: %s",
e.InvolvedObject.Name, e.InvolvedObject.Kind,
e.Reason, e.Message)
}
return output
}

// nolint
func getLastWarningsForObject(
clientForEvents dynamic.ResourceInterface, namespace, name, kind string, limit int,
Expand Down Expand Up @@ -152,29 +139,6 @@ func getLastWarningsForObject(

// --------------------------------------------------------------------------

// Version helpers.

// --------------------------------------------------------------------------

// ServerVersion attempts to retrieve the server version from k8s.
// Returns the configured default version in case this fails.
func ServerVersion(cdi discovery.CachedDiscoveryInterface) serverVersion {
var version serverVersion
if sv, err := cdi.ServerVersion(); err == nil {
if v, err := parseVersion(sv); err == nil {
version = v
} else {
version = defaultVersion()
}
} else {
version = defaultVersion()
}

return version
}

// --------------------------------------------------------------------------

// Response helpers.

// --------------------------------------------------------------------------
Expand Down
126 changes: 1 addition & 125 deletions pkg/await/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,131 +14,7 @@

package await

import (
"fmt"
"regexp"
"strconv"
"strings"

"github.com/pulumi/pulumi/pkg/diag"
"github.com/pulumi/pulumi/pkg/util/cmdutil"
"k8s.io/apimachinery/pkg/version"
)

// Format v0.0.0(-master+$Format:%h$)
var gitVersionRe = regexp.MustCompile(`v([0-9]+).([0-9]+).([0-9]+).*`)

// serverVersion captures k8s major.minor version in a parsed form
type serverVersion struct {
Major, Minor int
}

// Compare returns -1/0/+1 iff v is less than / equal / greater than major.minor
func (v serverVersion) Compare(major, minor int) int {
a := v.Major
b := major

if a == b {
a = v.Minor
b = minor
}

var res int
if a > b {
res = 1
} else if a == b {
res = 0
} else {
res = -1
}
return res
}

func (v serverVersion) String() string {
return fmt.Sprintf("%d.%d", v.Major, v.Minor)
}

// gitVersion captures k8s major.minor.patch version in a parsed form
type gitVersion struct {
Major, Minor, Patch int
}

func (gv gitVersion) String() string {
return fmt.Sprintf("%d.%d.%d", gv.Major, gv.Minor, gv.Patch)
}

// DefaultVersion takes a wild guess (v1.9) at the version of a Kubernetes cluster.
func defaultVersion() serverVersion {
cmdutil.Diag().Warningf(
diag.Message("", "Cluster failed to report its version number; falling back to 1.9"), false)

//
// Fallback behavior to work around [1]. Some versions of minikube erroneously report a blank
// `version.Info`, which will cause us to break. It is necessary for us to check this version for
// `Delete`, because of bugs and quirks in various Kubernetes versions. Currently it is only
// important that we know the version is above or below 1.5, so here we (hopefully) temporarily
// choose to fall back to 1.9, which is what most people running minikube use out of the box.
//
// [1]: https://github.com/kubernetes/minikube/issues/2505
//
return serverVersion{Major: 1, Minor: 9}
}

func parseGitVersion(versionString string) (gitVersion, error) {
parsedVersion := gitVersionRe.FindStringSubmatch(versionString)
if len(parsedVersion) != 4 {
err := fmt.Errorf("unable to parse git version %q", versionString)
return gitVersion{}, err
}

var gv gitVersion
var err error
gv.Major, err = strconv.Atoi(parsedVersion[1])
if err != nil {
return gitVersion{}, err
}
gv.Minor, err = strconv.Atoi(parsedVersion[2])
if err != nil {
return gitVersion{}, err
}
gv.Patch, err = strconv.Atoi(parsedVersion[3])
if err != nil {
return gitVersion{}, err
}

return gv, nil
}

// parseVersion parses version.Info into a serverVersion struct
func parseVersion(v *version.Info) (serverVersion, error) {
fallbackToGitVersion := false

major, err := strconv.Atoi(v.Major)
if err != nil {
fallbackToGitVersion = true
}

// trim "+" in minor version (happened on GKE)
v.Minor = strings.TrimSuffix(v.Minor, "+")

minor, err := strconv.Atoi(v.Minor)
if err != nil {
fallbackToGitVersion = true
}

if fallbackToGitVersion {
gv, err := parseGitVersion(v.GitVersion)
if err != nil {
return serverVersion{}, err
}

return serverVersion{Major: gv.Major, Minor: gv.Minor}, nil
}

return serverVersion{Major: major, Minor: minor}, nil
}

// canonicalizeDeploymentAPIVersion unifies the various pre-release apiVerion values for a
// canonicalizeDeploymentAPIVersion unifies the various pre-release apiVersion values for a
// Deployment into "apps/v1".
func canonicalizeDeploymentAPIVersion(ver string) string {
switch ver {
Expand Down
Loading