Skip to content

Commit

Permalink
Logstash: Support volume expansion for Logstash pods
Browse files Browse the repository at this point in the history
This commit adds support for expanding Logstash volumes by editing the storage requests
in spec.VolumeClaimTemplate, based on the existing implenentation in Elasticsearch
implemented in elastic#3752.

The same constraints hold -

* Volume size can only be increased, not decreased
* Storage class must specify allowVolumeExpansion: true
* Filesystem resize without pod recreation is only possible if the storage driver allows it

This works in the same way as the Elasticsearch implementation

An update of the storage request in the volumeClaimTemplate will
* Update the storage requests spec of all existing PVCs: they are immediately resized by the storage driver, if inline expansion is supported. Otherwise Pods need to be recreated.
* Delete the StatefulSet, but not the pod that it owns, storing recreation details in an annotation on the owning Logstash resource
* Recreate the StatefulSet with the new volumeClaimTemplate spec
* Remove the recreation annotation from the Logstash resource

This PR moves some of the PVC-expansion code from Elasticsearch into a common area to avoid code duplication
  • Loading branch information
robbavey committed Jan 23, 2024
1 parent bfae9f6 commit 47817ea
Show file tree
Hide file tree
Showing 16 changed files with 1,468 additions and 22 deletions.
9 changes: 9 additions & 0 deletions pkg/apis/logstash/v1alpha1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ func (logstash *Logstash) GetIdentityLabels() map[string]string {
"logstash.k8s.elastic.co/name": logstash.Name,
}
}

// GetPodIdentityLabels will return the common Elastic assigned labels for a Logstash Pod
func (logstash *Logstash) GetPodIdentityLabels() map[string]string {
return map[string]string{
commonv1.TypeLabelName: "logstash",
"logstash.k8s.elastic.co/name": logstash.Name,
"logstash.k8s.elastic.co/statefulset-name": Name(logstash.Name),
}
}
44 changes: 43 additions & 1 deletion pkg/controller/logstash/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package logstash

import (
"context"
"fmt"
"hash/fnv"
"strings"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -15,16 +17,21 @@ import (
logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/certificates"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/events"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/keystore"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/configs"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/sset"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/stackmon"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/volume"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"

ulog "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"
)

// Params are a set of parameters used during internal reconciliation of Logstash.
Expand All @@ -41,6 +48,10 @@ type Params struct {
OperatorParams operator.Parameters
KeystoreResources *keystore.Resources
APIServerConfig configs.APIServer // resolved API server config

// Expectations control some expectations set on resources in the cache, in order to
// avoid doing certain operations if the cache hasn't seen an up-to-date resource yet.
Expectations *expectations.Expectations
}

// K8sClient returns the Kubernetes client.
Expand Down Expand Up @@ -91,7 +102,7 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log
Owner: &params.Logstash,
TLSOptions: apiSvcTLS,
Namer: logstashv1alpha1.Namer,
Labels: NewLabels(params.Logstash),
Labels: labels.NewLabels(params.Logstash),
Services: []corev1.Service{apiSvc},
GlobalCA: params.OperatorParams.GlobalCA,
CACertRotation: params.OperatorParams.CACertRotation,
Expand Down Expand Up @@ -137,3 +148,34 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log
}
return reconcileStatefulSet(params, podTemplate)
}

// expectationsSatisfied checks that resources in our local cache match what we expect.
// If not, it's safer to not move on with StatefulSets and Pods reconciliation.
// Continuing with the reconciliation at this point may lead to:
// - calling ES orchestration settings (zen1/zen2/allocation excludes) with wrong assumptions
// (eg. incorrect number of nodes or master-eligible nodes topology)
// - create or delete more than one master node at once
func (p *Params)expectationsSatisfied(ctx context.Context) (bool, string, error) {

log := ulog.FromContext(ctx)
// make sure the cache is up-to-date
expectationsOK, reason, err := p.Expectations.Satisfied()
if err != nil {
return false, "Cache is not up to date", err
}
if !expectationsOK {
log.V(1).Info("Cache expectations are not satisfied yet, re-queueing", "namespace", p.Logstash.Namespace, "ls_name", p.Logstash.Name, "reason", reason)
return false, reason, nil
}
actualStatefulSets, err := sset.RetrieveActualStatefulSets(p.Client, k8s.ExtractNamespacedName(&p.Logstash))
if err != nil {
return false, "Cannot retrieve actual stateful sets", err
}
// make sure StatefulSet statuses have been reconciled by the StatefulSet controller
pendingStatefulSetReconciliation := actualStatefulSets.PendingReconciliation()
if len(pendingStatefulSetReconciliation) > 0 {
log.V(1).Info("StatefulSets observedGeneration is not reconciled yet, re-queueing", "namespace", p.Logstash.Namespace, "ls_name", p.Logstash.Name)
return false, fmt.Sprintf("observedGeneration is not reconciled yet for StatefulSets %s", strings.Join(pendingStatefulSetReconciliation.Names().AsSlice(), ",")), nil
}
return actualStatefulSets.PodReconciliationDone(ctx, p.Client)
}
3 changes: 2 additions & 1 deletion pkg/controller/logstash/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/keystore"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/pod"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/volume"
)
Expand Down Expand Up @@ -46,7 +47,7 @@ func reconcileKeystore(params Params, configHash hash.Hash) (*keystore.Resources
params,
&params.Logstash,
logstashv1alpha1.Namer,
NewLabels(params.Logstash),
labels.NewLabels(params.Logstash),
initContainersParameters,
); err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package logstash
package labels

import (
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -20,6 +20,10 @@ const (

// NamespaceLabelName used to represent a Logstash in k8s resources
NamespaceLabelName = "logstash.k8s.elastic.co/namespace"

// StatefulSetNameLabelName used to store the name of the statefulset.
StatefulSetNameLabelName = "logstash.k8s.elastic.co/statefulset-name"

)

// NewLabels returns the set of common labels for an Elastic Logstash.
Expand All @@ -30,7 +34,20 @@ func NewLabels(logstash logstashv1alpha1.Logstash) map[string]string {
}
}

// NewLabelSelectorForLogstash returns a labels.Selector that matches the labels as constructed by NewLabels
func NewPodLabels(logstash logstashv1alpha1.Logstash, ssetName string) map[string]string {
return map[string]string{
commonv1.TypeLabelName: TypeLabelValue,
NameLabelName: logstash.Name,
StatefulSetNameLabelName: ssetName,
}
}

// NewLabelSelectorForElasticsearch returns a labels.Selector that matches the labels as constructed by NewLabels
func NewLabelSelectorForLogstash(ls logstashv1alpha1.Logstash) client.MatchingLabels {
return client.MatchingLabels(map[string]string{commonv1.TypeLabelName: TypeLabelValue, NameLabelName: ls.Name})
return NewLabelSelectorForLogstashName(ls.Name)
}

// NewLabelSelectorForLogstash returns a labels.Selector that matches the labels as constructed by NewLabels
func NewLabelSelectorForLogstashName(lsName string) client.MatchingLabels {
return client.MatchingLabels(map[string]string{commonv1.TypeLabelName: TypeLabelValue, NameLabelName: lsName})
}
7 changes: 6 additions & 1 deletion pkg/controller/logstash/logstash_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/association"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/events"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/keystore"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"
Expand Down Expand Up @@ -56,6 +58,7 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileLo
recorder: mgr.GetEventRecorderFor(controllerName),
dynamicWatches: watches.NewDynamicWatches(),
Parameters: params,
expectations: expectations.NewExpectations(client),
}
}

Expand All @@ -77,7 +80,7 @@ func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileLogsta

// Watch Pods, to ensure `status.version` is correctly reconciled on any change.
// Watching StatefulSets only may lead to missing some events.
if err := watches.WatchPods(mgr, c, NameLabelName); err != nil {
if err := watches.WatchPods(mgr, c, labels.NameLabelName); err != nil {
return err
}

Expand Down Expand Up @@ -114,6 +117,7 @@ type ReconcileLogstash struct {
operator.Parameters
// iteration is the number of times this controller has run its Reconcile method
iteration uint64
expectations *expectations.Expectations
}

// Reconcile reads that state of the cluster for a Logstash object and makes changes based on the state read
Expand Down Expand Up @@ -200,6 +204,7 @@ func (r *ReconcileLogstash) doReconcile(ctx context.Context, logstash logstashv1
Logstash: logstash,
Status: status,
OperatorParams: r.Parameters,
Expectations: r.expectations,
})
}

Expand Down
75 changes: 71 additions & 4 deletions pkg/controller/logstash/logstash_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,33 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/utils/pointer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/apimachinery/pkg/api/resource"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1"
logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/comparison"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/hash"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
)

func newReconcileLogstash(objs ...client.Object) *ReconcileLogstash {
client := k8s.NewFakeClient(objs...)
r := &ReconcileLogstash{
Client: k8s.NewFakeClient(objs...),
Client: client,
recorder: record.NewFakeRecorder(100),
dynamicWatches: watches.NewDynamicWatches(),
expectations: expectations.NewExpectations(client),
}
return r
}
Expand Down Expand Up @@ -157,13 +164,32 @@ func TestReconcileLogstash_Reconcile(t *testing.T) {
Replicas: 1,
ReadyReplicas: 1,
},
Spec: appsv1.StatefulSetSpec{
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "logstash-data",
Namespace: "test",
},
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: pointer.String(sampleStorageClass.Name),
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
},

},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testLogstash-ls",
Namespace: "test",
Generation: 2,
Labels: map[string]string{NameLabelName: "testLogstash", VersionLabelName: "8.12.0"},
Labels: map[string]string{labels.NameLabelName: "testLogstash", VersionLabelName: "8.12.0"},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Expand Down Expand Up @@ -250,13 +276,31 @@ func TestReconcileLogstash_Reconcile(t *testing.T) {
Replicas: 1,
ReadyReplicas: 1,
},
Spec: appsv1.StatefulSetSpec{
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "logstash-data",
Namespace: "test",
},
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: pointer.String(sampleStorageClass.Name),
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testLogstash-ls",
Namespace: "test",
Generation: 2,
Labels: map[string]string{NameLabelName: "testLogstash", VersionLabelName: "8.12.0"},
Labels: map[string]string{labels.NameLabelName: "testLogstash", VersionLabelName: "8.12.0"},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Expand Down Expand Up @@ -344,6 +388,11 @@ func TestReconcileLogstash_Reconcile(t *testing.T) {
ObservedGeneration: 1,
},
},
&storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "default-sc",
},
},
&appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "testLogstash-ls",
Expand All @@ -355,13 +404,31 @@ func TestReconcileLogstash_Reconcile(t *testing.T) {
Replicas: 1,
ReadyReplicas: 1,
},
Spec: appsv1.StatefulSetSpec{
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "logstash-data",
Namespace: "test",
},
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: pointer.String(sampleStorageClass.Name),
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
},
},
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testLogstash-ls",
Namespace: "test",
Generation: 2,
Labels: map[string]string{NameLabelName: "testLogstash", VersionLabelName: "8.12.0"},
Labels: map[string]string{labels.NameLabelName: "testLogstash", VersionLabelName: "8.12.0"},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/logstash/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
lslabels "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines"
)

Expand All @@ -34,7 +35,7 @@ func reconcilePipeline(params Params) error {
ObjectMeta: metav1.ObjectMeta{
Namespace: params.Logstash.Namespace,
Name: logstashv1alpha1.PipelineSecretName(params.Logstash.Name),
Labels: labels.AddCredentialsLabel(NewLabels(params.Logstash)),
Labels: labels.AddCredentialsLabel(lslabels.NewLabels(params.Logstash)),
},
Data: map[string][]byte{
PipelineFileName: cfgBytes,
Expand All @@ -45,7 +46,7 @@ func reconcilePipeline(params Params) error {
reconciler.WithPostUpdate(func() {
annotation.MarkPodsAsUpdated(params.Context, params.Client,
client.InNamespace(params.Logstash.Namespace),
NewLabelSelectorForLogstash(params.Logstash),
lslabels.NewLabelSelectorForLogstash(params.Logstash),
)
}),
); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/logstash/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func buildPodTemplate(params Params, configHash hash.Hash32) (corev1.PodTemplate
return corev1.PodTemplateSpec{}, err
}

labels := maps.Merge(params.Logstash.GetIdentityLabels(), map[string]string{
labels := maps.Merge(params.Logstash.GetPodIdentityLabels(), map[string]string{
VersionLabelName: spec.Version})

annotations := map[string]string{
Expand Down
Loading

0 comments on commit 47817ea

Please sign in to comment.