diff --git a/cmd/yurt-node-servant/node-servant.go b/cmd/yurt-node-servant/node-servant.go index 8a5bd3210dc..001cb63ea59 100644 --- a/cmd/yurt-node-servant/node-servant.go +++ b/cmd/yurt-node-servant/node-servant.go @@ -28,6 +28,7 @@ import ( "github.com/openyurtio/openyurt/cmd/yurt-node-servant/convert" preflightconvert "github.com/openyurtio/openyurt/cmd/yurt-node-servant/preflight-convert" "github.com/openyurtio/openyurt/cmd/yurt-node-servant/revert" + upgrade "github.com/openyurtio/openyurt/cmd/yurt-node-servant/static-pod-upgrade" "github.com/openyurtio/openyurt/pkg/projectinfo" ) @@ -49,6 +50,7 @@ func main() { rootCmd.AddCommand(revert.NewRevertCmd()) rootCmd.AddCommand(preflightconvert.NewxPreflightConvertCmd()) rootCmd.AddCommand(config.NewConfigCmd()) + rootCmd.AddCommand(upgrade.NewUpgradeCmd()) if err := rootCmd.Execute(); err != nil { // run command os.Exit(1) diff --git a/cmd/yurt-node-servant/static-pod-upgrade/upgrade.go b/cmd/yurt-node-servant/static-pod-upgrade/upgrade.go new file mode 100644 index 00000000000..dd2c8177e93 --- /dev/null +++ b/cmd/yurt-node-servant/static-pod-upgrade/upgrade.go @@ -0,0 +1,58 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "k8s.io/klog/v2" + + upgrade "github.com/openyurtio/openyurt/pkg/node-servant/static-pod-upgrade" +) + +// NewUpgradeCmd generates a new upgrade command +func NewUpgradeCmd() *cobra.Command { + o := upgrade.NewUpgradeOptions() + cmd := &cobra.Command{ + Use: "static-pod-upgrade", + Short: "", + Run: func(cmd *cobra.Command, args []string) { + cmd.Flags().VisitAll(func(flag *pflag.Flag) { + klog.Infof("FLAG: --%s=%q", flag.Name, flag.Value) + }) + + if err := o.Validate(); err != nil { + klog.Fatalf("Fail to validate static pod upgrade args, %v", err) + } + + ctrl, err := upgrade.NewWithOptions(o) + if err != nil { + klog.Fatalf("Fail to create static-pod-upgrade controller, %v", err) + } + + if err = ctrl.Upgrade(); err != nil { + klog.Fatalf("Fail to upgrade static pod, %v", err) + } + + klog.Info("Static pod upgrade Success") + }, + Args: cobra.NoArgs, + } + o.AddFlags(cmd.Flags()) + + return cmd +} diff --git a/pkg/node-servant/static-pod-upgrade/options.go b/pkg/node-servant/static-pod-upgrade/options.go new file mode 100644 index 00000000000..970e5b6f13d --- /dev/null +++ b/pkg/node-servant/static-pod-upgrade/options.go @@ -0,0 +1,65 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "fmt" + "time" + + "github.com/spf13/pflag" +) + +const ( + DefaultStaticPodRunningCheckTimeout = 2 * time.Minute +) + +// Options has the information that required by static-pod-upgrade operation +type Options struct { + name string + namespace string + manifest string + hash string + mode string + timeout time.Duration +} + +// NewUpgradeOptions creates a new Options +func NewUpgradeOptions() *Options { + return &Options{ + timeout: DefaultStaticPodRunningCheckTimeout, + } +} + +// AddFlags sets flags. +func (o *Options) AddFlags(fs *pflag.FlagSet) { + fs.StringVar(&o.name, "name", o.name, "The name of static pod which needs be upgraded") + fs.StringVar(&o.namespace, "namespace", o.namespace, "The namespace of static pod which needs be upgraded") + fs.StringVar(&o.manifest, "manifest", o.manifest, "The manifest file name of static pod which needs be upgraded") + fs.StringVar(&o.hash, "hash", o.hash, "The hash value of new static pod specification") + fs.StringVar(&o.mode, "mode", o.mode, "The upgrade mode which is used") + fs.DurationVar(&o.timeout, "timeout", o.timeout, "The timeout for upgrade success check.") +} + +// Validate validates Options +func (o *Options) Validate() error { + if len(o.name) == 0 || len(o.namespace) == 0 || len(o.manifest) == 0 || len(o.hash) == 0 || len(o.mode) == 0 { + return fmt.Errorf("args can not be empty, name is %s, namespace is %s,manifest is %s, hash is %s,mode is %s", + o.name, o.namespace, o.manifest, o.hash, o.mode) + } + + return nil +} diff --git a/pkg/node-servant/static-pod-upgrade/upgrade.go b/pkg/node-servant/static-pod-upgrade/upgrade.go new file mode 100644 index 00000000000..b4c4c82db56 --- /dev/null +++ b/pkg/node-servant/static-pod-upgrade/upgrade.go @@ -0,0 +1,184 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/node-servant/static-pod-upgrade/util" +) + +const ( + // TODO: use constant value of static-pod controller + OTA = "ota" + Auto = "auto" +) + +var ( + DefaultConfigmapPath = "/data" + DefaultManifestPath = "/etc/kubernetes/manifests" + DefaultUpgradePath = "/tmp/manifests" +) + +type Controller struct { + // Name of static pod + name string + // Namespace of static pod + namespace string + // Manifest file name of static pod + manifest string + // The latest static pod hash + hash string + // Only support `OTA` and `Auto` + upgradeMode string + // Timeout for upgrade success check + timeout time.Duration + + // Manifest path of static pod, default `/etc/kubernetes/manifests/manifestName.yaml` + manifestPath string + // The backup manifest path, default `/etc/kubernetes/manifests/openyurtio-upgrade/manifestName.bak` + bakManifestPath string + // Default is `/data/podName` + configMapDataPath string + // The latest manifest path, default `/etc/kubernetes/manifests/openyurtio-upgrade/manifestName.upgrade` + upgradeManifestPath string +} + +func NewWithOptions(o *Options) (*Controller, error) { + ctrl := &Controller{ + name: o.name, + namespace: o.namespace, + manifest: o.manifest, + hash: o.hash, + upgradeMode: o.mode, + timeout: o.timeout, + } + + ctrl.manifestPath = filepath.Join(DefaultManifestPath, util.WithYamlSuffix(ctrl.manifest)) + ctrl.bakManifestPath = filepath.Join(DefaultUpgradePath, util.WithBackupSuffix(ctrl.manifest)) + ctrl.configMapDataPath = filepath.Join(DefaultConfigmapPath, ctrl.manifest) + ctrl.upgradeManifestPath = filepath.Join(DefaultUpgradePath, util.WithUpgradeSuffix(ctrl.manifest)) + + return ctrl, nil +} + +func (ctrl *Controller) Upgrade() error { + // 1. Check old manifest and the latest manifest exist + if err := ctrl.checkManifestFileExist(); err != nil { + return err + } + klog.Info("Check old manifest and new manifest files existence success") + + // 2. prepare the latest manifest + if err := ctrl.prepareManifest(); err != nil { + return err + } + klog.Info("Prepare upgrade manifest success") + + // 3. execute upgrade operations + switch ctrl.upgradeMode { + case Auto: + return ctrl.AutoUpgrade() + } + + return nil +} + +func (ctrl *Controller) AutoUpgrade() error { + // (1) Back up the old manifest in case of upgrade failure + if err := ctrl.backupManifest(); err != nil { + return err + } + klog.Info("Auto upgrade backupManifest success") + + // (2) Replace manifest and kubelet will upgrade the static pod automatically + if err := ctrl.replaceManifest(); err != nil { + return err + } + klog.Info("Auto upgrade replaceManifest success") + + // (3) Verify the new static pod is running + ok, err := ctrl.verify() + if err != nil { + if err := ctrl.rollbackManifest(); err != nil { + klog.Errorf("Fail to rollback manifest when upgrade failed, %v", err) + } + return err + } + if !ok { + if err := ctrl.rollbackManifest(); err != nil { + klog.Errorf("Fail to rollback manifest when upgrade failed, %v", err) + } + return fmt.Errorf("the latest static pod is not running") + } + klog.Info("Auto upgrade verify success") + + return nil +} + +// checkManifestFileExist check if the specified files exist +func (ctrl *Controller) checkManifestFileExist() error { + check := []string{ctrl.manifestPath, ctrl.configMapDataPath} + for _, c := range check { + _, err := os.Stat(c) + if os.IsNotExist(err) { + return fmt.Errorf("manifest %s does not exist", c) + } + } + + return nil +} + +// prepareManifest move the latest manifest to DefaultUpgradePath and set `.upgrade` suffix +// TODO: In kubernetes when mount configmap file to the sub path of hostpath mount, it will not be persistent +// TODO: Init configmap(latest manifest) to a default place and move it to `DefaultUpgradePath` to save it persistent +func (ctrl *Controller) prepareManifest() error { + // Make sure upgrade dir exist + if _, err := os.Stat(DefaultUpgradePath); os.IsNotExist(err) { + if err = os.Mkdir(DefaultUpgradePath, 0755); err != nil { + return err + } + } + + return util.CopyFile(ctrl.configMapDataPath, ctrl.upgradeManifestPath) +} + +// backUpManifest backup the old manifest in order to roll back when errors occur +func (ctrl *Controller) backupManifest() error { + return util.CopyFile(ctrl.manifestPath, ctrl.bakManifestPath) +} + +// replaceManifest replace old manifest with the latest one, it achieves static pod upgrade +func (ctrl *Controller) replaceManifest() error { + return util.CopyFile(ctrl.upgradeManifestPath, ctrl.manifestPath) +} + +// rollbackManifest replace new manifest with the backup +func (ctrl *Controller) rollbackManifest() error { + return util.CopyFile(ctrl.bakManifestPath, ctrl.manifestPath) +} + +// verify make sure the latest static pod is running +// return false when the latest static pod failed or check status time out +func (ctrl *Controller) verify() (bool, error) { + return util.WaitForPodRunning(ctrl.namespace, ctrl.name, ctrl.hash, ctrl.timeout) +} diff --git a/pkg/node-servant/static-pod-upgrade/upgrade_test.go b/pkg/node-servant/static-pod-upgrade/upgrade_test.go new file mode 100644 index 00000000000..441b31cc226 --- /dev/null +++ b/pkg/node-servant/static-pod-upgrade/upgrade_test.go @@ -0,0 +1,123 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "os" + "path/filepath" + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + + upgradeUtil "github.com/openyurtio/openyurt/pkg/node-servant/static-pod-upgrade/util" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +const ( + TestPodName = "nginx" + TestHashValue = "789c7f9f47" + TestManifest = "manifest" +) + +func Test(t *testing.T) { + // Temporarily modify the manifest path in order to test + DefaultManifestPath = t.TempDir() + DefaultConfigmapPath = t.TempDir() + DefaultUpgradePath = t.TempDir() + _, _ = os.Create(filepath.Join(DefaultManifestPath, upgradeUtil.WithYamlSuffix(TestManifest))) + _, _ = os.Create(filepath.Join(DefaultConfigmapPath, TestManifest)) + + runningStaticPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: TestPodName, + Namespace: metav1.NamespaceDefault, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + modes := []string{"ota", "auto"} + + for _, mode := range modes { + /* + 1. Prepare the test environment + */ + c := fake.NewSimpleClientset(runningStaticPod) + // Add watch event for verify + watcher := watch.NewFake() + c.PrependWatchReactor("pods", k8stesting.DefaultWatchReactor(watcher, nil)) + go func() { + watcher.Add(runningStaticPod) + }() + + /* + 2. Test + */ + o := &Options{ + name: TestPodName, + namespace: metav1.NamespaceDefault, + manifest: TestManifest, + hash: TestHashValue, + mode: mode, + timeout: DefaultStaticPodRunningCheckTimeout, + } + ctrl, err := NewWithOptions(o) + if err != nil { + t.Errorf("Fail to get upgrade controller, %v", err) + } + + if err := ctrl.Upgrade(); err != nil { + if strings.Contains(err.Error(), "fail to access yurthub pods API") { + t.Errorf("Fail to upgrade, %v", err) + } + } + + /* + 3. Verify OTA upgrade mode + */ + if mode == "ota" { + ok, err := util.FileExists(ctrl.upgradeManifestPath) + if err != nil { + t.Errorf("Fail to check manifest existence for ota upgrade, %v", err) + } + if !ok { + t.Errorf("Manifest for ota upgrade does not exist") + } + } + /* + 4. Verify Auto upgrade mode + */ + if mode == "auto" { + checkFiles := []string{ctrl.upgradeManifestPath, ctrl.bakManifestPath} + for _, file := range checkFiles { + ok, err := util.FileExists(file) + if err != nil { + t.Errorf("Fail to check %s manifest existence for auto upgrade, %v", file, err) + } + if !ok { + t.Errorf("Manifest %s for auto upgrade does not exist", file) + } + } + } + } +} diff --git a/pkg/node-servant/static-pod-upgrade/util/pods.go b/pkg/node-servant/static-pod-upgrade/util/pods.go new file mode 100644 index 00000000000..b19c1a59a54 --- /dev/null +++ b/pkg/node-servant/static-pod-upgrade/util/pods.go @@ -0,0 +1,98 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" +) + +const ( + YurtHubAddress = "http://127.0.0.1:10267" + YurtHubAPIPath = "/pods" +) + +func GetPodFromYurtHub(namespace, name string) (*v1.Pod, error) { + podList, err := GetPodsFromYurtHub(YurtHubAddress + YurtHubAPIPath) + if err != nil { + return nil, err + } + + for i, pod := range podList.Items { + if pod.Namespace == namespace && pod.Name == name { + return &podList.Items[i], nil + } + } + + return nil, fmt.Errorf("fail to find pod %s/%s", namespace, name) +} + +func GetPodsFromYurtHub(url string) (*v1.PodList, error) { + data, err := getPodsDataFromYurtHub(url) + if err != nil { + return nil, err + } + + podList, err := decodePods(data) + if err != nil { + return nil, err + } + + return podList, nil +} + +func getPodsDataFromYurtHub(url string) ([]byte, error) { + // avoid accessing conflict + rand.Seed(time.Now().UnixNano()) + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("fail to access yurthub pods API, returned status: %v", resp.Status) + } + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return data, nil +} + +func decodePods(data []byte) (*v1.PodList, error) { + codecFactory := serializer.NewCodecFactory(runtime.NewScheme()) + codec := codecFactory.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"}) + + podList := new(v1.PodList) + if _, _, err := codec.Decode(data, nil, podList); err != nil { + return nil, fmt.Errorf("failed to decode pod list: %s", err) + } + return podList, nil +} diff --git a/pkg/node-servant/static-pod-upgrade/util/util.go b/pkg/node-servant/static-pod-upgrade/util/util.go new file mode 100644 index 00000000000..97f8de76e4b --- /dev/null +++ b/pkg/node-servant/static-pod-upgrade/util/util.go @@ -0,0 +1,112 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "context" + "fmt" + "io" + "os" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" +) + +const ( + YamlSuffix string = ".yaml" + BackupSuffix = ".bak" + UpgradeSuffix string = ".upgrade" + + StaticPodHashAnnotation = "openyurt.io/static-pod-hash" +) + +func WithYamlSuffix(path string) string { + return path + YamlSuffix +} + +func WithBackupSuffix(path string) string { + return path + BackupSuffix +} + +func WithUpgradeSuffix(path string) string { + return path + UpgradeSuffix +} + +// CopyFile copy file content from src to dst, if destination file not exist, then create it +func CopyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.OpenFile(dst, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, in) + if err != nil { + return err + } + return nil +} + +// WaitForPodRunning waits static pod to run +// Success: Static pod annotation `StaticPodHashAnnotation` value equals to function argument hash +// Failed: Receive PodFailed event +func WaitForPodRunning(namespace, name, hash string, timeout time.Duration) (bool, error) { + klog.Infof("WaitForPodRuning namespace is %s, name is %s", namespace, name) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + checkPod := func(pod *v1.Pod) (hasResult, result bool) { + h := pod.Annotations[StaticPodHashAnnotation] + if pod.Status.Phase == v1.PodRunning && h == hash { + return true, true + } + + if pod.Status.Phase == v1.PodFailed { + return true, false + } + + return false, false + } + + for { + select { + case <-ctx.Done(): + return false, fmt.Errorf("timeout waiting for static pod %s/%s to be running", namespace, name) + case <-ticker.C: + pod, err := GetPodFromYurtHub(namespace, name) + if err != nil { + klog.V(4).Infof("Temporarily fail to get pod from YurtHub, %v", err) + } + if pod != nil { + hasResult, result := checkPod(pod) + if hasResult { + return result, nil + } + } + } + } +}