diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index dfbed8d..6ab87be 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -3,6 +3,7 @@ package k8s import ( "fmt" + apierrs "k8s.io/apimachinery/pkg/api/errors" "os" "path/filepath" @@ -94,3 +95,8 @@ func getClusterConfig() (*rest.Config, error) { kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config") return clientcmd.BuildConfigFromFlags("", kubeconfig) } + +// isNotFound checks if the error is a NotFound error +func isNotFound(err error) bool { + return apierrs.IsNotFound(err) +} diff --git a/pkg/k8s/k8s_daemonset.go b/pkg/k8s/k8s_daemonset.go new file mode 100644 index 0000000..e672a14 --- /dev/null +++ b/pkg/k8s/k8s_daemonset.go @@ -0,0 +1,132 @@ +package k8s + +import ( + "context" + "fmt" + "github.com/sirupsen/logrus" + appv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "time" +) + +// DaemonSetExists checks if a daemonset exists. +func DaemonSetExists(namespace, name string) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if !IsInitialized() { + return false, fmt.Errorf("knuu is not initialized") + } + _, err := Clientset().AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if isNotFound(err) { + return false, nil + } + return false, fmt.Errorf("error getting daemonset %s: %w", name, err) + } + return true, nil +} + +// GetDaemonSet retrieves a daemonset. +func GetDaemonSet(namespace, name string) (*appv1.DaemonSet, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if !IsInitialized() { + return nil, fmt.Errorf("knuu is not initialized") + } + ds, err := Clientset().AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("error getting daemonset %s: %w", name, err) + } + return ds, nil +} + +// CreateDaemonSet creates a new daemonset. +func CreateDaemonSet(namespace, name string, labels map[string]string, initContainers []v1.Container, containers []v1.Container) (*appv1.DaemonSet, error) { + + ds, err := prepareDaemonSet(namespace, name, labels, initContainers, containers) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if !IsInitialized() { + return nil, fmt.Errorf("knuu is not initialized") + } + created, err := Clientset().AppsV1().DaemonSets(namespace).Create(ctx, ds, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("error creating daemonset %s: %w", name, err) + } + logrus.Debugf("DaemonSet %s created in namespace %s", name, namespace) + return created, nil +} + +// UpdateDaemonSet updates an existing daemonset. +func UpdateDaemonSet(namespace, name string, labels map[string]string, initContainers []v1.Container, containers []v1.Container) (*appv1.DaemonSet, error) { + + ds, err := prepareDaemonSet(namespace, name, labels, initContainers, containers) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if !IsInitialized() { + return nil, fmt.Errorf("knuu is not initialized") + } + updated, err := Clientset().AppsV1().DaemonSets(namespace).Update(ctx, ds, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("error updating daemonset %s: %w", name, err) + } + logrus.Debugf("DaemonSet %s updated in namespace %s", name, namespace) + return updated, nil +} + +// DeleteDaemonSet deletes an existing daemonset. +func DeleteDaemonSet(namespace, name string) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if !IsInitialized() { + return fmt.Errorf("knuu is not initialized") + } + if err := Clientset().AppsV1().DaemonSets(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("error deleting daemonset %s: %w", name, err) + } + logrus.Debugf("DaemonSet %s deleted in namespace %s", name, namespace) + return nil +} + +// prepareService constructs a new Service object with the specified parameters. +func prepareDaemonSet(namespace, name string, labels map[string]string, initContainers []v1.Container, containers []v1.Container) (*appv1.DaemonSet, error) { + + ds := &appv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: labels, + }, + Spec: appv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: v1.PodSpec{ + InitContainers: initContainers, + Containers: containers, + }, + }, + }, + } + + return ds, nil + +} diff --git a/pkg/k8s/k8s_pod.go b/pkg/k8s/k8s_pod.go index aaf6690..3efe864 100644 --- a/pkg/k8s/k8s_pod.go +++ b/pkg/k8s/k8s_pod.go @@ -74,13 +74,13 @@ type PodConfig struct { CPURequest string // CPU request for the container } -// ReplacePod replaces a pod in the given namespace and returns the new Pod object. -func ReplacePod(podConfig PodConfig) (*v1.Pod, error) { +// ReplacePodWithGracePeriod replaces a pod in the given namespace and returns the new Pod object with a grace period. +func ReplacePodWithGracePeriod(podConfig PodConfig, gracePeriod *int64) (*v1.Pod, error) { // Log a debug message to indicate that we are replacing a pod logrus.Debugf("Replacing pod %s", podConfig.Name) // Delete the existing pod (if any) - if err := DeletePod(podConfig.Namespace, podConfig.Name); err != nil { + if err := DeletePodWithGracePeriod(podConfig.Namespace, podConfig.Name, gracePeriod); err != nil { return nil, fmt.Errorf("failed to delete pod: %v", err) } @@ -103,6 +103,11 @@ func ReplacePod(podConfig PodConfig) (*v1.Pod, error) { return pod, nil } +// ReplacePod replaces a pod in the given namespace and returns the new Pod object. +func ReplacePod(podConfig PodConfig) (*v1.Pod, error) { + return ReplacePodWithGracePeriod(podConfig, nil) +} + // IsPodRunning returns true if the pod is running. func IsPodRunning(namespace, name string) (bool, error) { // Get the pod from Kubernetes API server @@ -173,9 +178,8 @@ func RunCommandInPod(namespace, podName, containerName string, cmd []string) (st return stdout.String(), nil } -// DeletePod deletes a pod with the given name in the specified namespace. -// Skips the deletion if the pod does not exist. -func DeletePod(namespace, name string) error { +// DeletePodWithGracePeriod deletes a pod with the given name in the specified namespace. +func DeletePodWithGracePeriod(namespace, name string, gracePeriodSeconds *int64) error { // Get the Pod object from the API server _, err := getPod(namespace, name) if err != nil { @@ -190,14 +194,21 @@ func DeletePod(namespace, name string) error { if !IsInitialized() { return fmt.Errorf("knuu is not initialized") } - if err := Clientset().CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + deleteOptions := metav1.DeleteOptions{ + GracePeriodSeconds: gracePeriodSeconds, + } + if err := Clientset().CoreV1().Pods(namespace).Delete(ctx, name, deleteOptions); err != nil { return fmt.Errorf("failed to delete pod %s: %v", name, err) } - logrus.Debugf("Pod %s deleted in namespace %s", name, namespace) return nil } +// DeletePod deletes a pod with the given name in the specified namespace. +func DeletePod(namespace, name string) error { + return DeletePodWithGracePeriod(namespace, name, nil) +} + // buildEnv builds an environment variable configuration for a Pod based on the given map of key-value pairs. func buildEnv(envMap map[string]string) []v1.EnvVar { envVars := make([]v1.EnvVar, 0, len(envMap)) diff --git a/pkg/knuu/instance.go b/pkg/knuu/instance.go index c061fa0..08afdf2 100644 --- a/pkg/knuu/instance.go +++ b/pkg/knuu/instance.go @@ -110,6 +110,40 @@ func (i *Instance) SetImage(image string) error { return nil } +// SetImageInstant sets the image of the instance without a grace period. +// Instant means that the pod is replaced without a grace period of 1 second. +// It is only allowed in the 'Running' state. +func (i *Instance) SetImageInstant(image string) error { + // Check if setting the image is allowed in the current state + if !i.IsInState(Started) { + return fmt.Errorf("setting image is only allowed in state 'Started'. Current state is '%s'", i.state.String()) + } + + // Generate the pod configuration + podConfig := k8s.PodConfig{ + Namespace: k8s.Namespace(), + Name: i.k8sName, + Labels: i.kubernetesPod.Labels, + Image: image, + Command: i.command, + Args: i.args, + Env: i.env, + Volumes: i.volumes, + MemoryRequest: i.memoryRequest, + MemoryLimit: i.memoryLimit, + CPURequest: i.cpuRequest, + } + // Replace the pod with a new one, using the given image + gracePeriod := int64(1) + _, err := k8s.ReplacePodWithGracePeriod(podConfig, &gracePeriod) + if err != nil { + return fmt.Errorf("error replacing pod: %s", err.Error()) + } + i.WaitInstanceIsRunning() + + return nil +} + // SetCommand sets the command to run in the instance // This function can only be called when the instance is in state 'Preparing' or 'Committed' func (i *Instance) SetCommand(command ...string) error { diff --git a/pkg/knuu/preloader.go b/pkg/knuu/preloader.go new file mode 100644 index 0000000..62d6443 --- /dev/null +++ b/pkg/knuu/preloader.go @@ -0,0 +1,107 @@ +package knuu + +import ( + "fmt" + "github.com/celestiaorg/knuu/pkg/k8s" + v1 "k8s.io/api/core/v1" +) + +// Preloader is a struct that contains the list of preloaded images. +// A preloader makes sure that the images are preloaded before the test suite starts. +// Hint: If you use a Preloader per test suite, you can save resources +type Preloader struct { + k8sName string `json:"k8sName"` + images []string `json:"images"` +} + +// NewPreloader creates a new preloader +func NewPreloader() (*Preloader, error) { + k8sName, err := generateK8sName("knuu-preloader") + if err != nil { + return nil, fmt.Errorf("error generating k8s name for preloader: %w", err) + } + return &Preloader{ + k8sName: k8sName, + images: []string{}, + }, nil +} + +// Images returns the list of preloaded images +func (p *Preloader) Images() []string { + return p.images +} + +// AddImage adds an image to the list of preloaded images +func (p *Preloader) AddImage(image string) error { + // dont add duplicates + for _, v := range p.images { + if v == image { + return nil + } + } + p.images = append(p.images, image) + return p.preloadImages() +} + +// RemoveImage removes an image from the list of preloaded images +func (p *Preloader) RemoveImage(image string) error { + for i, v := range p.images { + if v == image { + p.images = append(p.images[:i], p.images[i+1:]...) + } + } + return p.preloadImages() +} + +// EmptyImages empties the list of preloaded images +func (p *Preloader) EmptyImages() error { + p.images = []string{} + return p.preloadImages() +} + +// preloadImages preloads all images in the list of preloaded images +func (p *Preloader) preloadImages() error { + // delete the daemonset if no images are preloaded + if len(p.images) == 0 { + return k8s.DeleteDaemonSet(k8s.Namespace(), p.k8sName) + } + var initContainers []v1.Container + + for i, image := range p.images { + initContainers = append(initContainers, v1.Container{ + Name: fmt.Sprintf("image%d-preloader", i), + Image: image, + Command: []string{ + "/bin/sh", + "-c", + "exit 0", + }, + }) + } + + var containers []v1.Container + + containers = append(containers, v1.Container{ + Name: "pause-container", + Image: "k8s.gcr.io/pause", + }) + + labels := map[string]string{ + "app": p.k8sName, + } + + exists, err := k8s.DaemonSetExists(k8s.Namespace(), p.k8sName) + if err != nil { + return err + } + + // update the daemonset if it already exists + if exists { + _, err = k8s.UpdateDaemonSet(k8s.Namespace(), p.k8sName, labels, initContainers, containers) + return err + } + + // create the daemonset if it doesn't exist + _, err = k8s.CreateDaemonSet(k8s.Namespace(), p.k8sName, labels, initContainers, containers) + return err +}