Skip to content

Commit

Permalink
[controller] Add a BlockDevice labels watcher controller (#94)
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Kramarenko <viktor.kramarenko@flant.com>
  • Loading branch information
ViktorKram authored Sep 27, 2024
1 parent 6815fd9 commit b7bd55f
Show file tree
Hide file tree
Showing 12 changed files with 700 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go_modules_check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
echo "Processing $go_mod_file"
while IFS= read -r line; do
if [[ "line" =~ ^replace ]]; then
if [[ "$line" =~ ^replace ]]; then
continue
fi
Expand Down
2 changes: 1 addition & 1 deletion images/agent/src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module agent
go 1.22.2

require (
github.com/deckhouse/sds-node-configurator/api v0.0.0-20240805103635-969dc811217b
github.com/deckhouse/sds-node-configurator/api v0.0.0-20240926063625-6815fd9556ea
github.com/go-logr/logr v1.4.2
github.com/google/go-cmp v0.6.0
github.com/onsi/ginkgo/v2 v2.19.0
Expand Down
3 changes: 3 additions & 0 deletions images/agent/src/internal/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package internal
import "k8s.io/apimachinery/pkg/api/resource"

const (
// LVGUpdateTriggerLabel if you change this value, you must change its value in sds-health-watcher-controller/src/pkg/block_device_labels_watcher.go as well
LVGUpdateTriggerLabel = "storage.deckhouse.io/update-trigger"

resizeDelta = "32Mi"
PartType = "part"
MultiPathType = "mpath"
Expand Down
10 changes: 5 additions & 5 deletions images/agent/src/pkg/controller/lvm_volume_group_discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func LVMVolumeGroupDiscoverReconcile(ctx context.Context, cl client.Client, metr

log.Info(fmt.Sprintf(`[RunLVMVolumeGroupDiscoverController] updated LVMVolumeGroup, name: "%s"`, lvg.Name))
} else {
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] the LVMVolumeGroup %s is not yet created. Create it", lvg.Name))
lvm, err := CreateLVMVolumeGroupByCandidate(ctx, log, metrics, cl, candidate, cfg.NodeName)
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] the LVMVolumeGroup %s is not yet created. Create it", candidate.LVMVGName))
createdLvg, err := CreateLVMVolumeGroupByCandidate(ctx, log, metrics, cl, candidate, cfg.NodeName)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to CreateLVMVolumeGroupByCandidate %s. Requeue the request in %s", candidate.LVMVGName, cfg.VolumeGroupScanIntervalSec.String()))
shouldRequeue = true
Expand All @@ -173,19 +173,19 @@ func LVMVolumeGroupDiscoverReconcile(ctx context.Context, cl client.Client, metr

err = updateLVGConditionIfNeeded(ctx, cl, log, &lvg, metav1.ConditionTrue, internal.TypeVGConfigurationApplied, "Success", "all configuration has been applied")
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGConfigurationApplied, lvg.Name))
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGConfigurationApplied, createdLvg.Name))
shouldRequeue = true
continue
}

err = updateLVGConditionIfNeeded(ctx, cl, log, &lvg, metav1.ConditionTrue, internal.TypeVGReady, internal.ReasonUpdated, "ready to create LV")
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGReady, lvg.Name))
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGReady, createdLvg.Name))
shouldRequeue = true
continue
}

log.Info(fmt.Sprintf(`[RunLVMVolumeGroupDiscoverController] created new APILVMVolumeGroup, name: "%s"`, lvm.Name))
log.Info(fmt.Sprintf(`[RunLVMVolumeGroupDiscoverController] created new APILVMVolumeGroup, name: "%s"`, createdLvg.Name))
}
}

Expand Down
27 changes: 12 additions & 15 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"

"github.com/deckhouse/sds-node-configurator/api/v1alpha1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -63,7 +63,7 @@ func RunLVMVolumeGroupWatcherController(
lvg := &v1alpha1.LVMVolumeGroup{}
err := cl.Get(ctx, request.NamespacedName, lvg)
if err != nil {
if errors2.IsNotFound(err) {
if errors.IsNotFound(err) {
log.Warning(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] seems like the LVMVolumeGroup was deleted as unable to get it, err: %s. Stop to reconcile", err.Error()))
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -108,6 +108,16 @@ func RunLVMVolumeGroupWatcherController(
return reconcile.Result{}, nil
}

if _, exist := lvg.Labels[internal.LVGUpdateTriggerLabel]; exist {
delete(lvg.Labels, internal.LVGUpdateTriggerLabel)
err = cl.Update(ctx, lvg)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupWatcherController] unable to update the LVMVolumeGroup %s", lvg.Name))
return reconcile.Result{}, err
}
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] successfully removed the label %s from the LVMVolumeGroup %s", internal.LVGUpdateTriggerLabel, lvg.Name))
}

log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] tries to get block device resources for the LVMVolumeGroup %s by the selector %v", lvg.Name, lvg.Spec.BlockDeviceSelector.MatchLabels))
blockDevices, err := GetAPIBlockDevices(ctx, cl, metrics, lvg.Spec.BlockDeviceSelector)
if err != nil {
Expand Down Expand Up @@ -147,19 +157,6 @@ func RunLVMVolumeGroupWatcherController(
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] no need to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
}

log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] tries to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, cfg.NodeName))
added, err = addLVGLabelIfNeeded(ctx, cl, log, lvg, LVGMetadateNameLabelKey, lvg.Name)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupWatcherController] unable to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
return reconcile.Result{}, err
}

if added {
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] successfully added label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
} else {
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] no need to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
}

// We do this after BlockDevices validation and node belonging check to prevent multiple updates by all agents pods
bds, _ := sdsCache.GetDevices()
if len(bds) == 0 {
Expand Down
23 changes: 22 additions & 1 deletion images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ func shouldLVGWatcherReconcileUpdateEvent(log logger.Logger, oldLVG, newLVG *v1a
return true
}

if _, exist := newLVG.Labels[internal.LVGUpdateTriggerLabel]; exist {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup %s has the label %s", newLVG.Name, internal.LVGUpdateTriggerLabel))
return true
}

if shouldUpdateLVGLabels(log, newLVG, LVGMetadateNameLabelKey, newLVG.Name) {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup's %s labels have been changed", newLVG.Name))
return true
Expand Down Expand Up @@ -241,7 +246,7 @@ func syncThinPoolsAllocationLimit(ctx context.Context, cl client.Client, log log
if updated {
fmt.Printf("%+v", lvg.Status.ThinPools)
log.Debug(fmt.Sprintf("[syncThinPoolsAllocationLimit] tries to update the LVMVolumeGroup %s", lvg.Name))
err := cl.Status().Update(ctx, lvg)
err = cl.Status().Update(ctx, lvg)
if err != nil {
return err
}
Expand All @@ -258,6 +263,22 @@ func validateSpecBlockDevices(lvg *v1alpha1.LVMVolumeGroup, blockDevices map[str
return false, "none of specified BlockDevices were found"
}

if len(lvg.Status.Nodes) > 0 {
lostBdNames := make([]string, 0, len(lvg.Status.Nodes[0].Devices))
for _, n := range lvg.Status.Nodes {
for _, d := range n.Devices {
if _, found := blockDevices[d.BlockDevice]; !found {
lostBdNames = append(lostBdNames, d.BlockDevice)
}
}
}

// that means some of the used BlockDevices no longer match the blockDeviceSelector
if len(lostBdNames) > 0 {
return false, fmt.Sprintf("these BlockDevices no longer match the blockDeviceSelector: %s", strings.Join(lostBdNames, ","))
}
}

for _, me := range lvg.Spec.BlockDeviceSelector.MatchExpressions {
if me.Key == internal.MetadataNameLabelKey {
if len(me.Values) != len(blockDevices) {
Expand Down
38 changes: 36 additions & 2 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,38 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}
})

t.Run("validation_fails_due_to_bd_left_the_selector", func(t *testing.T) {
lvg := &v1alpha1.LVMVolumeGroup{
Status: v1alpha1.LVMVolumeGroupStatus{
Nodes: []v1alpha1.LVMVolumeGroupNode{
{
Devices: []v1alpha1.LVMVolumeGroupDevice{
{
BlockDevice: "first",
},
{
BlockDevice: "second",
},
},
Name: "some-node",
},
},
},
}

bds := map[string]v1alpha1.BlockDevice{
"second": {
ObjectMeta: v1.ObjectMeta{
Name: "second",
},
},
}

valid, reason := validateSpecBlockDevices(lvg, bds)
assert.False(t, valid)
assert.Equal(t, "these BlockDevices no longer match the blockDeviceSelector: first", reason)
})

t.Run("validation_fails_due_to_bd_has_dif_node", func(t *testing.T) {
const (
nodeName = "nodeName"
Expand Down Expand Up @@ -762,8 +794,9 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
},
}

valid, _ := validateSpecBlockDevices(lvg, bds)
valid, reason := validateSpecBlockDevices(lvg, bds)
assert.False(t, valid)
assert.Equal(t, "block devices second have different node names from LVMVolumeGroup Local.NodeName", reason)
})

t.Run("validation_fails_due_to_no_block_devices_were_found", func(t *testing.T) {
Expand All @@ -787,8 +820,9 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
},
}

valid, _ := validateSpecBlockDevices(lvg, nil)
valid, reason := validateSpecBlockDevices(lvg, nil)
assert.False(t, valid)
assert.Equal(t, "none of specified BlockDevices were found", reason)
})

t.Run("validation_fails_due_to_some_blockdevice_were_not_found", func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions images/sds-health-watcher-controller/src/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func main() {
os.Exit(1)
}

err = controller.RunBlockDeviceLabelsWatcher(mgr, *log, *cfgParams)
if err != nil {
log.Error(err, "[main] unable to run BlockDeviceWatcher controller")
os.Exit(1)
}

if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
log.Error(err, "[main] unable to mgr.AddHealthzCheck")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion images/sds-health-watcher-controller/src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.3

require (
github.com/cloudflare/cfssl v1.5.0
github.com/deckhouse/sds-node-configurator/api v0.0.0-20240925090458-249de2896583
github.com/deckhouse/sds-node-configurator/api v0.0.0-20240926063625-6815fd9556ea
github.com/go-logr/logr v1.4.2
github.com/prometheus/client_golang v1.19.1
github.com/stretchr/testify v1.9.0
Expand Down
Loading

0 comments on commit b7bd55f

Please sign in to comment.