Reflect the VM statuses to the KubeVirtCluster status
If not all of the cluster's VMs are ready, the CAPK operator now sets the new
`AllMachinesAreReady` condition in the KubeVirtCluster CR to `False`.

Signed-off-by: Nahshon Unna-Tsameret <nunnatsa@redhat.com>
nunnatsa committed Dec 17, 2023
1 parent 35cc2f1 commit 82bfd09
Showing 6 changed files with 142 additions and 23 deletions.
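
The cluster-side aggregation itself is not among the files shown below. As a rough illustration only, such logic could look like the sketch here; the `AllMachinesAreReady` condition name, the helper function, and the reason string are assumptions based on the commit message, not code from this commit:

```go
package controllers

import (
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/conditions"

	infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
)

// setAllMachinesAreReady marks a hypothetical AllMachinesAreReady condition on
// the KubevirtCluster: False as soon as one KubevirtMachine is not ready, True
// otherwise. Illustrative sketch; it assumes KubevirtCluster implements the
// CAPI conditions.Setter interface.
func setAllMachinesAreReady(cluster *infrav1.KubevirtCluster, machines []infrav1.KubevirtMachine) {
	for i := range machines {
		if !machines[i].Status.Ready {
			conditions.MarkFalse(cluster, "AllMachinesAreReady", "MachineNotReady",
				clusterv1.ConditionSeverityInfo, "KubevirtMachine %s is not ready", machines[i].Name)
			return
		}
	}
	conditions.MarkTrue(cluster, "AllMachinesAreReady")
}
```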
2 changes: 1 addition & 1 deletion config/default/manager_image_patch.yaml
@@ -8,5 +8,5 @@ spec:
spec:
containers:
# Change the value of image field below to your controller image URL
- image: controller
- image: localhost:40797/capk-manager-amd64:dev
name: manager
24 changes: 14 additions & 10 deletions controllers/kubevirtmachine_controller.go
@@ -22,22 +22,13 @@ import (
"regexp"
"time"

"sigs.k8s.io/controller-runtime/pkg/builder"

"github.com/pkg/errors"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/context"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/infracluster"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
kubevirthandler "sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/ssh"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
@@ -46,10 +37,19 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"

infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/context"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/infracluster"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
kubevirthandler "sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/ssh"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
)

// KubevirtMachineReconciler reconciles a KubevirtMachine object.
@@ -66,6 +66,7 @@ type KubevirtMachineReconciler struct {
// +kubebuilder:rbac:groups="",resources=secrets;,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kubevirt.io,resources=virtualmachines;,verbs=get;create;update;patch;delete
// +kubebuilder:rbac:groups=kubevirt.io,resources=virtualmachineinstances;,verbs=get;delete
// +kubebuilder:rbac:groups=cdi.kubevirt.io,resources=datavolumes;,verbs=get;list;watch

// Reconcile handles KubevirtMachine events.
func (r *KubevirtMachineReconciler) Reconcile(goctx gocontext.Context, req ctrl.Request) (_ ctrl.Result, rerr error) {
@@ -277,6 +278,9 @@ func (r *KubevirtMachineReconciler) reconcileNormal(ctx *context.MachineContext)
// Mark VMProvisionedCondition to indicate that the VM has successfully started
conditions.MarkTrue(ctx.KubevirtMachine, infrav1.VMProvisionedCondition)
} else {
reason, message := externalMachine.GetVMUnscheduledReason(ctx)
conditions.MarkFalse(ctx.KubevirtMachine, infrav1.VMProvisionedCondition, reason, clusterv1.ConditionSeverityInfo, message)

// Waiting for VM to boot
ctx.KubevirtMachine.Status.Ready = false
ctx.Logger.Info("KubeVirt VM is not fully provisioned and running...")
@@ -476,7 +480,7 @@ func (r *KubevirtMachineReconciler) reconcileDelete(ctx *context.MachineContext)

// SetupWithManager will add watches for this controller.
func (r *KubevirtMachineReconciler) SetupWithManager(goctx gocontext.Context, mgr ctrl.Manager, options controller.Options) error {
clusterToKubevirtMachines, err := util.ClusterToObjectsMapper(mgr.GetClient(), &infrav1.KubevirtMachineList{}, mgr.GetScheme())
clusterToKubevirtMachines, err := util.ClusterToTypedObjectsMapper(mgr.GetClient(), &infrav1.KubevirtMachineList{}, mgr.GetScheme())
if err != nil {
return err
}
38 changes: 28 additions & 10 deletions main.go
@@ -21,10 +21,6 @@ import (
"flag"
"math/rand"
"os"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/webhookhandler"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"time"

"github.com/spf13/pflag"
@@ -35,23 +31,27 @@ import (
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
kubevirtv1 "kubevirt.io/api/core/v1"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/feature"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"

infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
"sigs.k8s.io/cluster-api-provider-kubevirt/controllers"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/infracluster"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/webhookhandler"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
// +kubebuilder:scaffold:imports
)

var (
myscheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")

//flags.
@@ -67,12 +67,24 @@ var (

func init() {
klog.InitFlags(nil)
}

_ = scheme.AddToScheme(myscheme)
_ = infrav1.AddToScheme(myscheme)
_ = clusterv1.AddToScheme(myscheme)
_ = kubevirtv1.AddToScheme(myscheme)
// +kubebuilder:scaffold:scheme
func registerScheme() (*runtime.Scheme, error) {
myscheme := runtime.NewScheme()

for _, f := range []func(*runtime.Scheme) error{
scheme.AddToScheme,
infrav1.AddToScheme,
clusterv1.AddToScheme,
kubevirtv1.AddToScheme,
cdiv1.AddToScheme,
// +kubebuilder:scaffold:scheme
} {
if err := f(myscheme); err != nil {
return nil, err
}
}
return myscheme, nil
}

func initFlags(fs *pflag.FlagSet) {
@@ -106,6 +118,12 @@ func main() {

ctrl.SetLogger(klogr.New())

myscheme, err := registerScheme()
if err != nil {
setupLog.Error(err, "can't register scheme")
os.Exit(1)
}

var defaultNamespaces map[string]cache.Config
if watchNamespace != "" {
setupLog.Info("Watching cluster-api objects only in namespace for reconciliation", "namespace", watchNamespace)
93 changes: 91 additions & 2 deletions pkg/kubevirt/machine.go
@@ -19,23 +19,25 @@ package kubevirt
import (
gocontext "context"
"fmt"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubedrain "k8s.io/kubectl/pkg/drain"
kubevirtv1 "kubevirt.io/api/core/v1"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"

infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/context"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/ssh"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
)

const (
@@ -49,6 +51,7 @@ type Machine struct {
machineContext *context.MachineContext
vmiInstance *kubevirtv1.VirtualMachineInstance
vmInstance *kubevirtv1.VirtualMachine
dataVolume *cdiv1.DataVolume

sshKeys *ssh.ClusterNodeSshKeys
getCommandExecutor func(string, *ssh.ClusterNodeSshKeys) ssh.VMCommandExecutor
@@ -63,6 +66,7 @@ func NewMachine(ctx *context.MachineContext, client client.Client, namespace str
vmiInstance: nil,
vmInstance: nil,
sshKeys: sshKeys,
dataVolume: nil,
getCommandExecutor: ssh.NewVMCommandExecutor,
}

@@ -90,6 +94,23 @@ func NewMachine(ctx *context.MachineContext, client client.Client, namespace str
machine.vmInstance = vm
}

if vm != nil && vm.Spec.Template != nil {
for _, vol := range vm.Spec.Template.Spec.Volumes {
if vol.DataVolume != nil {
dv := &cdiv1.DataVolume{}
err = client.Get(ctx.Context, types.NamespacedName{Name: vol.DataVolume.Name, Namespace: namespace}, dv)
if err != nil {
if !apierrors.IsNotFound(err) {
return nil, err
}
} else {
machine.dataVolume = dv
}
break
}
}
}

return machine, nil
}

@@ -219,6 +240,74 @@ func (m *Machine) IsReady() bool {
return m.hasReadyCondition()
}

const (
defaultCondReason = "VMNotReady"
defaultCondMessage = "VM is not ready"
)

func (m *Machine) GetVMUnscheduledReason(ctx *context.MachineContext) (string, string) {

if m.vmInstance == nil {
return defaultCondReason, defaultCondMessage
}

cond := m.getVMCondition(kubevirtv1.VirtualMachineConditionType(corev1.PodScheduled))
if cond != nil && cond.Status == corev1.ConditionFalse {
if cond.Reason == "Unschedulable" {
return "Unschedulable", cond.Message
}
}

reason, message, foundDVReason := m.getDVNotProvisionedReason()
if foundDVReason {
return reason, message
}

return defaultCondReason, defaultCondMessage
}

func (m *Machine) getDVNotProvisionedReason() (string, string, bool) {
if m.dataVolume == nil {
return "", "", false
}

switch m.dataVolume.Status.Phase {
case cdiv1.Succeeded: // DV's OK, return default reason & message
return "", "", false
case cdiv1.Pending:
return "DVPending", fmt.Sprintf("DataVolume %s is pending", m.dataVolume.Name), true
case cdiv1.Failed:
return "DVFailed", fmt.Sprintf("DataVolume %s failed", m.dataVolume.Name), true
default:
msg := "DataVolume is not ready"
for _, dvCond := range m.dataVolume.Status.Conditions {
if dvCond.Type == cdiv1.DataVolumeRunning {
if dvCond.Status == corev1.ConditionTrue || dvCond.Reason == "Completed" {
msg = fmt.Sprintf("DataVolume %s is still provisioning", m.dataVolume.Name)
} else if dvCond.Status == corev1.ConditionFalse {
msg = fmt.Sprintf("DataVolume %s is not running: %s", m.dataVolume.Name, dvCond.Message)
}
break
}
}
return "DVNotReady", msg, true
}
}

func (m *Machine) getVMCondition(t kubevirtv1.VirtualMachineConditionType) *kubevirtv1.VirtualMachineCondition {
if m.vmInstance == nil {
return nil
}

for _, cond := range m.vmInstance.Status.Conditions {
if cond.Type == t {
return cond.DeepCopy()
}
}

return nil
}

// SupportsCheckingIsBootstrapped checks if we have a method of checking
// that this bootstrapper has completed.
func (m *Machine) SupportsCheckingIsBootstrapped() bool {
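
To illustrate the phase-to-reason mapping implemented by getDVNotProvisionedReason above, here is a sketch of a table-style test in the same package; the DataVolume name and the chosen phases are illustrative and not part of this commit:

```go
package kubevirt

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
)

// Illustrative test of the DataVolume phase handling: Pending and Failed map
// to dedicated reasons, while other non-Succeeded phases fall back to DVNotReady.
func TestGetDVNotProvisionedReason(t *testing.T) {
	cases := map[cdiv1.DataVolumePhase]string{
		cdiv1.Pending:          "DVPending",
		cdiv1.Failed:           "DVFailed",
		cdiv1.ImportInProgress: "DVNotReady",
	}
	for phase, wantReason := range cases {
		m := &Machine{dataVolume: &cdiv1.DataVolume{
			ObjectMeta: metav1.ObjectMeta{Name: "worker-0-boot-volume"},
			Status:     cdiv1.DataVolumeStatus{Phase: phase},
		}}
		reason, _, found := m.getDVNotProvisionedReason()
		if !found || reason != wantReason {
			t.Errorf("phase %q: got (%q, %v), want %q", phase, reason, found, wantReason)
		}
	}
}
```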
3 changes: 3 additions & 0 deletions pkg/kubevirt/machine_factory.go
@@ -38,6 +38,9 @@ type MachineInterface interface {
IsTerminal() (bool, string, error)

DrainNodeIfNeeded(workloadcluster.WorkloadCluster) (time.Duration, error)

// GetVMUnscheduledReason returns the reason and message to record on the VMProvisionedCondition when the VM is not ready
GetVMUnscheduledReason(ctx *context.MachineContext) (string, string)
}

// MachineFactory allows creating new instances of kubevirt.machine
5 changes: 5 additions & 0 deletions pkg/kubevirt/mock/machine_factory_generated.go

Some generated files are not rendered by default.
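
Because MachineInterface gained a method, the generated mock in pkg/kubevirt/mock is regenerated as well. A consumer test could stub it roughly as below; the gomock module path, the mock package name, and the NewMockMachineInterface constructor are assumptions based on the usual gomock layout and are not verified against this repository:

```go
package kubevirt_test

import (
	"testing"

	"go.uber.org/mock/gomock"

	"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt/mock"
)

// Illustrative only: stub the new interface method so a reconciler test can
// exercise the "VM not ready" branch without a real cluster.
func TestStubbedUnscheduledReason(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	machine := mock.NewMockMachineInterface(mockCtrl)
	machine.EXPECT().
		GetVMUnscheduledReason(gomock.Any()).
		Return("DVPending", "DataVolume worker-0-boot-volume is pending")

	reason, message := machine.GetVMUnscheduledReason(nil)
	if reason != "DVPending" || message == "" {
		t.Fatalf("unexpected reason/message: %q / %q", reason, message)
	}
}
```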
