Skip to content

Commit

Permalink
fix: can't build images at the same time when creating multiple deployments
Browse files Browse the repository at this point in the history
  • Loading branch information
yetone committed Dec 22, 2022
1 parent e2feda6 commit 63839c4
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 116 deletions.
160 changes: 59 additions & 101 deletions controllers/bentodeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"

Expand Down Expand Up @@ -201,6 +200,65 @@ func (r *BentoDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
r.Recorder.Event(bentoDeployment, corev1.EventTypeNormal, "GetYataiVersion", "Successfully fetched yatai version")

bentoTag := fmt.Sprintf("%s:%s", bento.Repository.Name, bento.Version)
imageName := GetBentoImageName(dockerRegistry, &bento.BentoWithRepositorySchema, true)
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Checking image %s exists", imageName)
imageExists, err := checkImageExists(dockerRegistry, imageName)
if err != nil {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeWarning, "CheckImageExists", "Failed to check image %s exists: %v", imageName, err)
err = errors.Wrapf(err, "failed to check image %s exists for bento %s", imageName, bentoTag)
return
}
if !imageExists {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Image %s does not exist", imageName)
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Bento image builder is starting")
var pod *corev1.Pod
pod, err = services.ImageBuilderService.CreateImageBuilderPod(ctx, services.CreateImageBuilderPodOption{
ImageName: imageName,
Bento: &bento.BentoWithRepositorySchema,
YataiClient: yataiClient,
DockerRegistry: dockerRegistry,
RecreateIfFailed: true,
})
if err != nil {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeWarning, "BentoImageBuilder", "Failed to create image builder pod: %v", err)
err = errors.Wrapf(err, "failed to create image builder pod for bento %s", bentoTag)
return
}
if pod.Status.Phase == corev1.PodSucceeded {
result = ctrl.Result{
Requeue: true,
}
return
}
if pod.Status.Phase == corev1.PodFailed {
err = errors.Errorf("image builder pod %s in namespace %s failed: %s", pod.Name, pod.Namespace, pod.Status.Message)
return
}
if pod.Status.Phase == corev1.PodUnknown {
err = errors.Errorf("pod %s in namespace %s is in unknown state: %s", pod.Name, pod.Namespace, pod.Status.Message)
return
}
if pod.Status.Phase == corev1.PodRunning {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Bento image builder pod %s/%s is running", pod.Namespace, pod.Name)
result = ctrl.Result{
RequeueAfter: 10 * time.Second,
}
return
}
if pod.Status.Phase == corev1.PodPending {
r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Bento image builder pod %s/%s is pending", pod.Namespace, pod.Name)
result = ctrl.Result{
RequeueAfter: 10 * time.Second,
}
return
}
err = errors.Errorf("pod %s in namespace %s is in unknown state: %s", pod.Name, pod.Namespace, pod.Status.Message)
return
}

r.Recorder.Eventf(bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Image %s exists", imageName)

modified := false

if bento.Manifest != nil {
Expand Down Expand Up @@ -1657,56 +1715,6 @@ func GetModelImageName(dockerRegistry modelschemas.DockerRegistrySchema, model *
return imageName
}

// waitImageBuilderPodComplete blocks until the image builder pod podName in
// namespace reaches a terminal phase, polling its status every 3 seconds for
// at most 60 minutes.
//
// The returned status starts as ImageBuildStatusPending and tracks the last
// phase observed: ImageBuildStatusBuilding once the pod is seen running,
// ImageBuildStatusSuccess when it succeeded, and ImageBuildStatusFailed when
// it failed or entered the unknown phase. A non-nil error is returned when the
// Kubernetes client cannot be created, when fetching the pod fails (which
// aborts the wait immediately), when the pod ends in the failed/unknown phase,
// or when the poll itself gives up.
func (r *BentoDeploymentReconciler) waitImageBuilderPodComplete(ctx context.Context, namespace, podName string) (modelschemas.ImageBuildStatus, error) {
	const (
		// How often the pod is re-fetched.
		pollInterval = 3 * time.Second
		// Upper bound on the whole wait.
		waitTimeout = 60 * time.Minute
	)

	logger := log.Log.WithValues("func", "waitImageBuilderPodComplete", "namespace", namespace, "pod", podName)

	status := modelschemas.ImageBuildStatusPending

	restConf := config.GetConfigOrDie()
	clientset, clientErr := kubernetes.NewForConfig(restConf)
	if clientErr != nil {
		return status, errors.Wrapf(clientErr, "create kubernetes client for %s", restConf.Host)
	}

	pods := clientset.CoreV1().Pods(namespace)

	// checkPhase is the poll condition: it reports done=true once the pod is
	// in a terminal phase (or cannot be fetched) and records the build status
	// matching the phase it saw.
	checkPhase := func() (bool, error) {
		builderPod, getErr := pods.Get(ctx, podName, metav1.GetOptions{})
		if getErr != nil {
			logger.Error(getErr, "failed to get pod")
			return true, getErr
		}

		switch builderPod.Status.Phase {
		case corev1.PodSucceeded:
			status = modelschemas.ImageBuildStatusSuccess
			return true, nil
		case corev1.PodFailed:
			status = modelschemas.ImageBuildStatusFailed
			return true, errors.Errorf("pod %s in namespace %s failed", builderPod.Name, builderPod.Namespace)
		case corev1.PodUnknown:
			status = modelschemas.ImageBuildStatusFailed
			return true, errors.Errorf("pod %s in namespace %s is in unknown state", builderPod.Name, builderPod.Namespace)
		case corev1.PodRunning:
			// Still in progress: remember that the build started, keep polling.
			status = modelschemas.ImageBuildStatusBuilding
		}
		return false, nil
	}

	if pollErr := wait.PollImmediate(pollInterval, waitTimeout, checkPhase); pollErr != nil {
		return status, errors.Wrapf(pollErr, "failed to wait for pod %s in namespace %s to be ready", podName, namespace)
	}
	return status, nil
}

type generatePodTemplateSpecOption struct {
bentoDeployment *servingv1alpha3.BentoDeployment
bento *schemasv1.BentoFullSchema
Expand Down Expand Up @@ -1846,56 +1854,6 @@ func (r *BentoDeploymentReconciler) generatePodTemplateSpec(ctx context.Context,
volumes := make([]corev1.Volume, 0)
volumeMounts := make([]corev1.VolumeMount, 0)

// prepare images
var eg errsgroup.Group
eg.Go(func() error {
bentoTag := fmt.Sprintf("%s:%s", opt.bento.Repository.Name, opt.bento.Version)
imageName := GetBentoImageName(opt.dockerRegistry, &opt.bento.BentoWithRepositorySchema, true)
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Checking image %s exists", imageName)
imageExists, err := checkImageExists(opt.dockerRegistry, imageName)
if err != nil {
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeWarning, "CheckImageExists", "Failed to check image %s exists: %v", imageName, err)
err = errors.Wrapf(err, "failed to check image %s exists for bento %s", imageName, bentoTag)
return err
}
if imageExists {
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Image %s exists", imageName)
return nil
}
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "CheckImageExists", "Image %s does not exist", imageName)
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Bento image builder is starting")
pod, err := services.ImageBuilderService.CreateImageBuilderPod(ctx, services.CreateImageBuilderPodOption{
ImageName: imageName,
Bento: &opt.bento.BentoWithRepositorySchema,
YataiClient: opt.yataiClient,
DockerRegistry: opt.dockerRegistry,
RecreateIfFailed: true,
})
if err != nil {
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeWarning, "BentoImageBuilder", "Failed to create image builder pod: %v", err)
err = errors.Wrapf(err, "failed to create image builder pod for bento %s", bentoTag)
return err
}
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Building image %s..., the image builder pod is %s in namespace %s", imageName, pod.Name, pod.Namespace)

_, err = r.waitImageBuilderPodComplete(ctx, pod.Namespace, pod.Name)

if err != nil {
r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeWarning, "BentoImageBuilder", "Failed to build image %s, the image builder pod is %s in namespace %s has an error: %s", imageName, pod.Name, pod.Namespace, err.Error())
err = errors.Wrapf(err, "failed to build image %s for bento %s", imageName, bentoTag)
return err
}

r.Recorder.Eventf(opt.bentoDeployment, corev1.EventTypeNormal, "BentoImageBuilder", "Image %s has been built successfully", imageName)

return nil
})

err = eg.Wait()
if err != nil {
return
}

args := make([]string, 0)

isOldVersion := false
Expand Down
41 changes: 26 additions & 15 deletions services/image_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,6 @@ type CreateImageBuilderPodOption struct {
}

func (s *imageBuilderService) CreateImageBuilderPod(ctx context.Context, opt CreateImageBuilderPodOption) (pod *corev1.Pod, err error) {
kubeName := strcase.ToKebab(fmt.Sprintf("yatai-bento-image-builder-%s-%s", opt.Bento.Repository.Name, opt.Bento.Version))
kubeLabels := map[string]string{
consts.KubeLabelYataiBentoRepository: opt.Bento.Repository.Name,
consts.KubeLabelYataiBento: opt.Bento.Version,
}
logrus.Infof("Creating image builder pod %s", kubeName)
restConfig := config.GetConfigOrDie()
kubeCli, err := kubernetes.NewForConfig(restConfig)
if err != nil {
Expand All @@ -186,6 +180,25 @@ func (s *imageBuilderService) CreateImageBuilderPod(ctx context.Context, opt Cre
}

kubeNamespace := consts.KubeNamespaceYataiModelImageBuilder
kubeName := strcase.ToKebab(fmt.Sprintf("yatai-bento-image-builder-%s-%s", opt.Bento.Repository.Name, opt.Bento.Version))

podsCli := kubeCli.CoreV1().Pods(kubeNamespace)

oldPod, err := podsCli.Get(ctx, kubeName, metav1.GetOptions{})
oldPodIsNotFound := apierrors.IsNotFound(err)
if !oldPodIsNotFound && err != nil {
return
}
if !oldPodIsNotFound {
pod = oldPod
return
}

kubeLabels := map[string]string{
consts.KubeLabelYataiBentoRepository: opt.Bento.Repository.Name,
consts.KubeLabelYataiBento: opt.Bento.Version,
}
logrus.Infof("Creating image builder pod %s", kubeName)

err = k8sutils.MakesureNamespaceExists(ctx, kubeCli, kubeNamespace)
if err != nil {
Expand Down Expand Up @@ -256,13 +269,13 @@ func (s *imageBuilderService) CreateImageBuilderPod(ctx context.Context, opt Cre
}

_, err = kubeCli.CoreV1().Secrets(kubeNamespace).Get(ctx, secretName, metav1.GetOptions{})
isNotFound := apierrors.IsNotFound(err)
if err != nil && !isNotFound {
secretIsNotFound := apierrors.IsNotFound(err)
if err != nil && !secretIsNotFound {
err = errors.Wrapf(err, "failed to get secret %s", secretName)
return
}

if isNotFound {
if secretIsNotFound {
_, err = kubeCli.CoreV1().Secrets(kubeNamespace).Create(ctx, yataiSecret, metav1.CreateOptions{})
isExists := apierrors.IsAlreadyExists(err)
if err != nil && !isExists {
Expand Down Expand Up @@ -548,8 +561,6 @@ echo "Done"

builderImage := internalImages.Kaniko

podsCli := kubeCli.CoreV1().Pods(kubeNamespace)

pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: kubeName,
Expand Down Expand Up @@ -608,12 +619,12 @@ echo "Done"
}
}

oldPod, err := podsCli.Get(ctx, kubeName, metav1.GetOptions{})
isNotFound = apierrors.IsNotFound(err)
if !isNotFound && err != nil {
oldPod, err = podsCli.Get(ctx, kubeName, metav1.GetOptions{})
oldPodIsNotFound = apierrors.IsNotFound(err)
if !oldPodIsNotFound && err != nil {
return
}
if isNotFound {
if oldPodIsNotFound {
_, err = podsCli.Create(ctx, pod, metav1.CreateOptions{})
isExists := apierrors.IsAlreadyExists(err)
if err != nil && !isExists {
Expand Down

0 comments on commit 63839c4

Please sign in to comment.