Skip to content

Commit

Permalink
Feature/preload images (#80)
Browse files Browse the repository at this point in the history
* feat: add check if knuu was initialized

Signed-off-by: Smuu <18609909+Smuu@users.noreply.github.com>

* feat: add preloader

Signed-off-by: Smuu <18609909+Smuu@users.noreply.github.com>

---------

Signed-off-by: Smuu <18609909+Smuu@users.noreply.github.com>
  • Loading branch information
smuu authored Jun 16, 2023
1 parent 89c9d90 commit 355fd0f
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 8 deletions.
6 changes: 6 additions & 0 deletions pkg/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8s

import (
"fmt"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"os"
"path/filepath"

Expand Down Expand Up @@ -94,3 +95,8 @@ func getClusterConfig() (*rest.Config, error) {
kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

// isNotFound checks if the error is a NotFound error
func isNotFound(err error) bool {
return apierrs.IsNotFound(err)
}
132 changes: 132 additions & 0 deletions pkg/k8s/k8s_daemonset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package k8s

import (
"context"
"fmt"
"github.com/sirupsen/logrus"
appv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
)

// DaemonSetExists checks if a daemonset exists.
func DaemonSetExists(namespace, name string) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if !IsInitialized() {
return false, fmt.Errorf("knuu is not initialized")
}
_, err := Clientset().AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
if isNotFound(err) {
return false, nil
}
return false, fmt.Errorf("error getting daemonset %s: %w", name, err)
}
return true, nil
}

// GetDaemonSet retrieves a daemonset.
func GetDaemonSet(namespace, name string) (*appv1.DaemonSet, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if !IsInitialized() {
return nil, fmt.Errorf("knuu is not initialized")
}
ds, err := Clientset().AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting daemonset %s: %w", name, err)
}
return ds, nil
}

// CreateDaemonSet creates a new daemonset.
func CreateDaemonSet(namespace, name string, labels map[string]string, initContainers []v1.Container, containers []v1.Container) (*appv1.DaemonSet, error) {

ds, err := prepareDaemonSet(namespace, name, labels, initContainers, containers)
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if !IsInitialized() {
return nil, fmt.Errorf("knuu is not initialized")
}
created, err := Clientset().AppsV1().DaemonSets(namespace).Create(ctx, ds, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("error creating daemonset %s: %w", name, err)
}
logrus.Debugf("DaemonSet %s created in namespace %s", name, namespace)
return created, nil
}

// UpdateDaemonSet updates an existing daemonset.
func UpdateDaemonSet(namespace, name string, labels map[string]string, initContainers []v1.Container, containers []v1.Container) (*appv1.DaemonSet, error) {

ds, err := prepareDaemonSet(namespace, name, labels, initContainers, containers)
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if !IsInitialized() {
return nil, fmt.Errorf("knuu is not initialized")
}
updated, err := Clientset().AppsV1().DaemonSets(namespace).Update(ctx, ds, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("error updating daemonset %s: %w", name, err)
}
logrus.Debugf("DaemonSet %s updated in namespace %s", name, namespace)
return updated, nil
}

// DeleteDaemonSet deletes an existing daemonset.
func DeleteDaemonSet(namespace, name string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if !IsInitialized() {
return fmt.Errorf("knuu is not initialized")
}
if err := Clientset().AppsV1().DaemonSets(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("error deleting daemonset %s: %w", name, err)
}
logrus.Debugf("DaemonSet %s deleted in namespace %s", name, namespace)
return nil
}

// prepareService constructs a new Service object with the specified parameters.
func prepareDaemonSet(namespace, name string, labels map[string]string, initContainers []v1.Container, containers []v1.Container) (*appv1.DaemonSet, error) {

ds := &appv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
},
Spec: appv1.DaemonSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: v1.PodSpec{
InitContainers: initContainers,
Containers: containers,
},
},
},
}

return ds, nil

}
27 changes: 19 additions & 8 deletions pkg/k8s/k8s_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ type PodConfig struct {
CPURequest string // CPU request for the container
}

// ReplacePod replaces a pod in the given namespace and returns the new Pod object.
func ReplacePod(podConfig PodConfig) (*v1.Pod, error) {
// ReplacePodWithGracePeriod replaces a pod in the given namespace and returns the new Pod object with a grace period.
func ReplacePodWithGracePeriod(podConfig PodConfig, gracePeriod *int64) (*v1.Pod, error) {
// Log a debug message to indicate that we are replacing a pod
logrus.Debugf("Replacing pod %s", podConfig.Name)

// Delete the existing pod (if any)
if err := DeletePod(podConfig.Namespace, podConfig.Name); err != nil {
if err := DeletePodWithGracePeriod(podConfig.Namespace, podConfig.Name, gracePeriod); err != nil {
return nil, fmt.Errorf("failed to delete pod: %v", err)
}

Expand All @@ -103,6 +103,11 @@ func ReplacePod(podConfig PodConfig) (*v1.Pod, error) {
return pod, nil
}

// ReplacePod replaces a pod in the given namespace and returns the new Pod object.
func ReplacePod(podConfig PodConfig) (*v1.Pod, error) {
return ReplacePodWithGracePeriod(podConfig, nil)
}

// IsPodRunning returns true if the pod is running.
func IsPodRunning(namespace, name string) (bool, error) {
// Get the pod from Kubernetes API server
Expand Down Expand Up @@ -173,9 +178,8 @@ func RunCommandInPod(namespace, podName, containerName string, cmd []string) (st
return stdout.String(), nil
}

// DeletePod deletes a pod with the given name in the specified namespace.
// Skips the deletion if the pod does not exist.
func DeletePod(namespace, name string) error {
// DeletePodWithGracePeriod deletes a pod with the given name in the specified namespace.
func DeletePodWithGracePeriod(namespace, name string, gracePeriodSeconds *int64) error {
// Get the Pod object from the API server
_, err := getPod(namespace, name)
if err != nil {
Expand All @@ -190,14 +194,21 @@ func DeletePod(namespace, name string) error {
if !IsInitialized() {
return fmt.Errorf("knuu is not initialized")
}
if err := Clientset().CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: gracePeriodSeconds,
}
if err := Clientset().CoreV1().Pods(namespace).Delete(ctx, name, deleteOptions); err != nil {
return fmt.Errorf("failed to delete pod %s: %v", name, err)
}

logrus.Debugf("Pod %s deleted in namespace %s", name, namespace)
return nil
}

// DeletePod deletes a pod with the given name in the specified namespace.
func DeletePod(namespace, name string) error {
return DeletePodWithGracePeriod(namespace, name, nil)
}

// buildEnv builds an environment variable configuration for a Pod based on the given map of key-value pairs.
func buildEnv(envMap map[string]string) []v1.EnvVar {
envVars := make([]v1.EnvVar, 0, len(envMap))
Expand Down
34 changes: 34 additions & 0 deletions pkg/knuu/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,40 @@ func (i *Instance) SetImage(image string) error {
return nil
}

// SetImageInstant sets the image of the instance without a grace period.
// Instant means that the pod is replaced without a grace period of 1 second.
// It is only allowed in the 'Running' state.
func (i *Instance) SetImageInstant(image string) error {
// Check if setting the image is allowed in the current state
if !i.IsInState(Started) {
return fmt.Errorf("setting image is only allowed in state 'Started'. Current state is '%s'", i.state.String())
}

// Generate the pod configuration
podConfig := k8s.PodConfig{
Namespace: k8s.Namespace(),
Name: i.k8sName,
Labels: i.kubernetesPod.Labels,
Image: image,
Command: i.command,
Args: i.args,
Env: i.env,
Volumes: i.volumes,
MemoryRequest: i.memoryRequest,
MemoryLimit: i.memoryLimit,
CPURequest: i.cpuRequest,
}
// Replace the pod with a new one, using the given image
gracePeriod := int64(1)
_, err := k8s.ReplacePodWithGracePeriod(podConfig, &gracePeriod)
if err != nil {
return fmt.Errorf("error replacing pod: %s", err.Error())
}
i.WaitInstanceIsRunning()

return nil
}

// SetCommand sets the command to run in the instance
// This function can only be called when the instance is in state 'Preparing' or 'Committed'
func (i *Instance) SetCommand(command ...string) error {
Expand Down
107 changes: 107 additions & 0 deletions pkg/knuu/preloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package knuu

import (
"fmt"
"github.com/celestiaorg/knuu/pkg/k8s"
v1 "k8s.io/api/core/v1"
)

// Preloader is a struct that contains the list of preloaded images.
// A preloader makes sure that the images are preloaded before the test suite starts.
// Hint: If you use a Preloader per test suite, you can save resources
type Preloader struct {
k8sName string `json:"k8sName"`
images []string `json:"images"`
}

// NewPreloader creates a new preloader
func NewPreloader() (*Preloader, error) {
k8sName, err := generateK8sName("knuu-preloader")
if err != nil {
return nil, fmt.Errorf("error generating k8s name for preloader: %w", err)
}
return &Preloader{
k8sName: k8sName,
images: []string{},
}, nil
}

// Images returns the list of preloaded images
func (p *Preloader) Images() []string {
return p.images
}

// AddImage adds an image to the list of preloaded images
func (p *Preloader) AddImage(image string) error {
// dont add duplicates
for _, v := range p.images {
if v == image {
return nil
}
}
p.images = append(p.images, image)
return p.preloadImages()
}

// RemoveImage removes an image from the list of preloaded images
func (p *Preloader) RemoveImage(image string) error {
for i, v := range p.images {
if v == image {
p.images = append(p.images[:i], p.images[i+1:]...)
}
}
return p.preloadImages()
}

// EmptyImages empties the list of preloaded images
func (p *Preloader) EmptyImages() error {
p.images = []string{}
return p.preloadImages()
}

// preloadImages preloads all images in the list of preloaded images
func (p *Preloader) preloadImages() error {
// delete the daemonset if no images are preloaded
if len(p.images) == 0 {
return k8s.DeleteDaemonSet(k8s.Namespace(), p.k8sName)
}
var initContainers []v1.Container

for i, image := range p.images {
initContainers = append(initContainers, v1.Container{
Name: fmt.Sprintf("image%d-preloader", i),
Image: image,
Command: []string{
"/bin/sh",
"-c",
"exit 0",
},
})
}

var containers []v1.Container

containers = append(containers, v1.Container{
Name: "pause-container",
Image: "k8s.gcr.io/pause",
})

labels := map[string]string{
"app": p.k8sName,
}

exists, err := k8s.DaemonSetExists(k8s.Namespace(), p.k8sName)
if err != nil {
return err
}

// update the daemonset if it already exists
if exists {
_, err = k8s.UpdateDaemonSet(k8s.Namespace(), p.k8sName, labels, initContainers, containers)
return err
}

// create the daemonset if it doesn't exist
_, err = k8s.CreateDaemonSet(k8s.Namespace(), p.k8sName, labels, initContainers, containers)
return err
}

0 comments on commit 355fd0f

Please sign in to comment.