Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: can't build images at the same time when creating multiple deployments #78

Merged
merged 1 commit into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 59 additions & 101 deletions controllers/bentodeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"

Expand Down Expand Up @@ -201,6 +200,65 @@ func (r *BentoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
r.Recorder.Event(bentoDeployment, corev1.EventTypeNormal, "GetYataiVersion", "Successfully fetched yatai version")

bentoTag := fmt.Sprintf("%s:%s", bento.Repository.Name, bento.Version)
imageName := GetBentoImageName(dockerRegistry, &bento.BentoWithRepositorySchema, true)
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Checking image %s exists", imageName)
imageExists, err := checkImageExists(dockerRegistry, imageName)
if err != nil {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeWarning, "CheckImageExists", "Failed to check image %s exists: %v", imageName, err)
err = errors.Wrapf(err, "failed to check image %s exists for bento %s", imageName, bentoTag)
return
}
if !imageExists {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Image %s does not exist", imageName)
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Bento image builder is starting")
var pod *corev1.Pod
pod, err = services.ImageBuilderService.CreateImageBuilderPod(ctx, services.CreateImageBuilderPodOption{
ImageName: imageName,
Bento: &bento.BentoWithRepositorySchema,
YataiClient: yataiClient,
DockerRegistry: dockerRegistry,
RecreateIfFailed: true,
})
if err != nil {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeWarning, "BentoImageBuilder", "Failed to create image builder pod: %v", err)
err = errors.Wrapf(err, "failed to create image builder pod for bento %s", bentoTag)
return
}
if pod.Status.Phase == corev1.PodSucceeded {
result = ctrl.Result{
Requeue: true,
}
return
}
if pod.Status.Phase == corev1.PodFailed {
err = errors.Errorf("image builder pod %s in namespace %s failed: %s", pod.Name, pod.Namespace, pod.Status.Message)
return
}
if pod.Status.Phase == corev1.PodUnknown {
err = errors.Errorf("pod %s in namespace %s is in unknown state: %s", pod.Name, pod.Namespace, pod.Status.Message)
return
}
if pod.Status.Phase == corev1.PodRunning {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Bento image builder pod %s/%s is running", pod.Namespace, pod.Name)
result = ctrl.Result{
RequeueAfter: 10 * time.Second,
}
return
}
if pod.Status.Phase == corev1.PodPending {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Bento image builder pod %s/%s is pending", pod.Namespace, pod.Name)
result = ctrl.Result{
RequeueAfter: 10 * time.Second,
}
return
}
err = errors.Errorf("pod %s in namespace %s is in unknown phase: %s", pod.Name, pod.Namespace, pod.Status.Phase)
return
}

r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Image %s exists", imageName)

modified := false

if bento.Manifest != nil {
Expand Down Expand Up @@ -1657,56 +1715,6 @@ func GetModelImageName(dockerRegistry modelschemas.DockerRegistrySchema, model *
return imageName
}

// wait image builder pod complete
func (r *BentoDeploymentReconciler) waitImageBuilderPodComplete(ctx context.Context, namespace, podName string) (modelschemas.ImageBuildStatus, error) {
logs := log.Log.WithValues("func", "waitImageBuilderPodComplete", "namespace", namespace, "pod", podName)

// Interval to poll for objects.
pollInterval := 3 * time.Second
// How long to wait for objects.
waitTimeout := 60 * time.Minute

imageBuildStatus := modelschemas.ImageBuildStatusPending

restConf := config.GetConfigOrDie()
cliset, err := kubernetes.NewForConfig(restConf)
if err != nil {
err = errors.Wrapf(err, "create kubernetes client for %s", restConf.Host)
return imageBuildStatus, err
}

podCli := cliset.CoreV1().Pods(namespace)

// Wait for the image builder pod to be Complete.
if err := wait.PollImmediate(pollInterval, waitTimeout, func() (done bool, err error) {
pod, err_ := podCli.Get(ctx, podName, metav1.GetOptions{})
if err_ != nil {
logs.Error(err_, "failed to get pod")
return true, err_
}
if pod.Status.Phase == corev1.PodSucceeded {
imageBuildStatus = modelschemas.ImageBuildStatusSuccess
return true, nil
}
if pod.Status.Phase == corev1.PodFailed {
imageBuildStatus = modelschemas.ImageBuildStatusFailed
return true, errors.Errorf("pod %s in namespace %s failed", pod.Name, pod.Namespace)
}
if pod.Status.Phase == corev1.PodUnknown {
imageBuildStatus = modelschemas.ImageBuildStatusFailed
return true, errors.Errorf("pod %s in namespace %s is in unknown state", pod.Name, pod.Namespace)
}
if pod.Status.Phase == corev1.PodRunning {
imageBuildStatus = modelschemas.ImageBuildStatusBuilding
}
return false, nil
}); err != nil {
err = errors.Wrapf(err, "failed to wait for pod %s in namespace %s to be ready", podName, namespace)
return imageBuildStatus, err
}
return imageBuildStatus, nil
}

type generatePodTemplateSpecOption struct {
bentoDeployment *servingv1alpha3.BentoDeployment
bento *schemasv1.BentoFullSchema
Expand Down Expand Up @@ -1846,56 +1854,6 @@ func (r *BentoDeploymentReconciler) generatePodTemplateSpec(ctx context.Context,
volumes := make([]corev1.Volume, 0)
volumeMounts := make([]corev1.VolumeMount, 0)

// prepare images
var eg errsgroup.Group
eg.Go(func() error {
bentoTag := fmt.Sprintf("%s:%s", opt.bento.Repository.Name, opt.bento.Version)
imageName := GetBentoImageName(opt.dockerRegistry, &opt.bento.BentoWithRepositorySchema, true)
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Checking image %s exists", imageName)
imageExists, err := checkImageExists(opt.dockerRegistry, imageName)
if err != nil {
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeWarning, "CheckImageExists", "Failed to check image %s exists: %v", imageName, err)
err = errors.Wrapf(err, "failed to check image %s exists for bento %s", imageName, bentoTag)
return err
}
if imageExists {
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Image %s exists", imageName)
return nil
}
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Image %s does not exist", imageName)
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Bento image builder is starting")
pod, err := services.ImageBuilderService.CreateImageBuilderPod(ctx, services.CreateImageBuilderPodOption{
ImageName: imageName,
Bento: &opt.bento.BentoWithRepositorySchema,
YataiClient: opt.yataiClient,
DockerRegistry: opt.dockerRegistry,
RecreateIfFailed: true,
})
if err != nil {
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeWarning, "BentoImageBuilder", "Failed to create image builder pod: %v", err)
err = errors.Wrapf(err, "failed to create image builder pod for bento %s", bentoTag)
return err
}
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Building image %s..., the image builder pod is %s in namespace %s", imageName, pod.Name, pod.Namespace)

_, err = r.waitImageBuilderPodComplete(ctx, pod.Namespace, pod.Name)

if err != nil {
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeWarning, "BentoImageBuilder", "Failed to build image %s, the image builder pod is %s in namespace %s has an error: %s", imageName, pod.Name, pod.Namespace, err.Error())
err = errors.Wrapf(err, "failed to build image %s for bento %s", imageName, bentoTag)
return err
}

r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Image %s has been built successfully", imageName)

return nil
})

err = eg.Wait()
if err != nil {
return
}

args := make([]string, 0)

isOldVersion := false
Expand Down
43 changes: 27 additions & 16 deletions services/image_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,6 @@ type CreateImageBuilderPodOption struct {
}

func (s *imageBuilderService) CreateImageBuilderPod(ctx context.Context, opt CreateImageBuilderPodOption) (pod *corev1.Pod, err error) {
kubeName := strcase.ToKebab(fmt.Sprintf("yatai-bento-image-builder-%s-%s", opt.Bento.Repository.Name, opt.Bento.Version))
kubeLabels := map[string]string{
consts.KubeLabelYataiBentoRepository: opt.Bento.Repository.Name,
consts.KubeLabelYataiBento: opt.Bento.Version,
}
logrus.Infof("Creating image builder pod %s", kubeName)
restConfig := config.GetConfigOrDie()
kubeCli, err := kubernetes.NewForConfig(restConfig)
if err != nil {
Expand All @@ -186,6 +180,25 @@ func (s *imageBuilderService) CreateImageBuilderPod(ctx context.Context, opt Cre
}

kubeNamespace := consts.KubeNamespaceYataiModelImageBuilder
kubeName := strcase.ToKebab(fmt.Sprintf("yatai-bento-image-builder-%s-%s", opt.Bento.Repository.Name, opt.Bento.Version))

podsCli := kubeCli.CoreV1().Pods(kubeNamespace)

oldPod, err := podsCli.Get(ctx, kubeName, metav1.GetOptions{})
oldPodIsNotFound := apierrors.IsNotFound(err)
if !oldPodIsNotFound && err != nil {
return
}
if !oldPodIsNotFound {
pod = oldPod
return
}

kubeLabels := map[string]string{
consts.KubeLabelYataiBentoRepository: opt.Bento.Repository.Name,
consts.KubeLabelYataiBento: opt.Bento.Version,
}
logrus.Infof("Creating image builder pod %s", kubeName)

err = k8sutils.MakesureNamespaceExists(ctx, kubeCli, kubeNamespace)
if err != nil {
Expand Down Expand Up @@ -256,13 +269,13 @@ func (s *imageBuilderService) CreateImageBuilderPod(ctx context.Context, opt Cre
}

_, err = kubeCli.CoreV1().Secrets(kubeNamespace).Get(ctx, secretName, metav1.GetOptions{})
isNotFound := apierrors.IsNotFound(err)
if err != nil && !isNotFound {
secretIsNotFound := apierrors.IsNotFound(err)
if err != nil && !secretIsNotFound {
err = errors.Wrapf(err, "failed to get secret %s", secretName)
return
}

if isNotFound {
if secretIsNotFound {
_, err = kubeCli.CoreV1().Secrets(kubeNamespace).Create(ctx, yataiSecret, metav1.CreateOptions{})
isExists := apierrors.IsAlreadyExists(err)
if err != nil && !isExists {
Expand Down Expand Up @@ -548,8 +561,6 @@ echo "Done"

builderImage := internalImages.Kaniko

podsCli := kubeCli.CoreV1().Pods(kubeNamespace)

pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: kubeName,
Expand Down Expand Up @@ -608,13 +619,13 @@ echo "Done"
}
}

oldPod, err := podsCli.Get(ctx, kubeName, metav1.GetOptions{})
isNotFound = apierrors.IsNotFound(err)
if !isNotFound && err != nil {
oldPod, err = podsCli.Get(ctx, kubeName, metav1.GetOptions{})
oldPodIsNotFound = apierrors.IsNotFound(err)
if !oldPodIsNotFound && err != nil {
return
}
if isNotFound {
_, err = podsCli.Create(ctx, pod, metav1.CreateOptions{})
if oldPodIsNotFound {
pod, err = podsCli.Create(ctx, pod, metav1.CreateOptions{})
isExists := apierrors.IsAlreadyExists(err)
if err != nil && !isExists {
err = errors.Wrapf(err, "failed to create pod %s", kubeName)
Expand Down