Allow concurrent broker restarts from same AZ (broker rack) #62

Merged · 3 commits · Jun 6, 2023
10 changes: 10 additions & 0 deletions api/v1beta1/kafkacluster_types.go
@@ -138,6 +138,16 @@ type RollingUpgradeConfig struct {
// distinct broker replicas with either offline replicas or out of sync replicas and the number of alerts triggered by
// alerts with 'rollingupgrade'
FailureThreshold int `json:"failureThreshold"`

// ConcurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
// it is set to a value greater than 1, the operator will restart up to that many brokers in parallel, provided the
// brokers are within the same rack (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker
// racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than
// 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
// This is a safe way to speed up the rolling upgrade. Note that for the rack distribution explained above, Cruise Control
// requires `com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareDistributionGoal` to be configured.
// +optional
ConcurrentBrokerRestartsAllowed int `json:"concurrentBrokerRestartsAllowed,omitempty"`
}

// DisruptionBudget defines the configuration for PodDisruptionBudget where the workload is managed by the kafka-operator
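Not part of the diff: the following is a minimal, illustrative Go sketch of how the new field could be set on a KafkaCluster object together with per-broker "broker.rack" read-only configs. RollingUpgradeConfig, FailureThreshold, ConcurrentBrokerRestartsAllowed, Brokers, Id, and ReadOnlyConfig come from this PR; the v1beta1.Broker element type name and the example broker IDs and rack names are assumptions.

package main

import (
    "fmt"

    "github.com/banzaicloud/koperator/api/v1beta1"
)

func main() {
    var cluster v1beta1.KafkaCluster

    // Tolerate one failure and allow two brokers of the same rack to restart concurrently.
    cluster.Spec.RollingUpgradeConfig = v1beta1.RollingUpgradeConfig{
        FailureThreshold:                1,
        ConcurrentBrokerRestartsAllowed: 2,
    }

    // The operator derives each broker's rack (AZ) from its read-only config.
    cluster.Spec.Brokers = []v1beta1.Broker{
        {Id: 0, ReadOnlyConfig: "broker.rack=az1"},
        {Id: 1, ReadOnlyConfig: "broker.rack=az1"},
        {Id: 2, ReadOnlyConfig: "broker.rack=az2"},
    }

    fmt.Println("concurrent restarts allowed:", cluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartsAllowed)
}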
8 changes: 8 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
@@ -21177,6 +21177,14 @@ spec:
with either offline replicas or out of sync replicas and the
number of alerts triggered by alerts with 'rollingupgrade'
type: integer
concurrentBrokerRestartsAllowed:
description: ConcurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
it is set to a value greater than 1, the operator will restart up to that many brokers in parallel, provided the
brokers are within the same AZ (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker
racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than
1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
This is a safe way to speed up the rolling upgrade.
type: integer
required:
- failureThreshold
type: object
8 changes: 8 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
@@ -21014,6 +21014,14 @@ spec:
with either offline replicas or out of sync replicas and the
number of alerts triggered by alerts with 'rollingupgrade'
type: integer
concurrentBrokerRestartsAllowed:
description: ConcurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
it is set to a value greater than 1, the operator will restart up to that many brokers in parallel, provided the
brokers are within the same AZ (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker
racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than
1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
This is a safe way to speed up the rolling upgrade.
type: integer
required:
- failureThreshold
type: object
12 changes: 10 additions & 2 deletions config/samples/banzaicloud_v1beta1_kafkacluster.yaml
@@ -55,6 +55,14 @@ spec:
# alerts with 'rollingupgrade'
# failureThreshold: 1

# concurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If
# it is set to a value greater than 1, the operator will restart up to that many brokers in parallel, provided the
# brokers are within the same AZ (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker
# racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than
# 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used.
# This is a safe way to speed up the rolling upgrade.
# concurrentBrokerRestartsAllowed: 1

# brokerConfigGroups specifies multiple broker configs with unique name
brokerConfigGroups:
# Specify desired group name (eg., 'default_group')
@@ -189,11 +197,11 @@ spec:
# which has type per-broker
# priorityClassName can be used to set the broker pod's priority
# priorityClassName: "high-priority"

# When "hostNameOverride" and brokerConfig.nodePortExternalIP are empty and NodePort access method is selected for external listener
# nodePortNodeAdddressType defines the Kafka broker's Kubernetes node's address type that shall be used in the advertised.listeners property
# when nodeport is used for an external listener.
# its values can be Hostname, ExternalIP, InternalIP, InternalDNS,ExternalDNS
# its values can be Hostname, ExternalIP, InternalIP, InternalDNS,ExternalDNS
#nodePortNodeAddressType: "ExternalIP"
config: |
sasl.enabled.mechanisms=PLAIN
2 changes: 1 addition & 1 deletion controllers/tests/common_test.go
@@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster {
CCJMXExporterConfig: "custom_property: custom_value",
},
ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true",
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartsAllowed: 1},
},
}
}
11 changes: 11 additions & 0 deletions pkg/kafkaclient/provider.go
@@ -15,6 +15,7 @@
package kafkaclient

import (
"github.com/stretchr/testify/mock"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/api/v1beta1"
@@ -45,3 +46,13 @@ func NewDefaultProvider() Provider {
func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
return NewFromCluster(client, cluster)
}

// MockedProvider is a Testify mock for providing Kafka clients that can be mocks too
type MockedProvider struct {
mock.Mock
}

func (m *MockedProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) {
args := m.Called(client, cluster)
return args.Get(0).(KafkaClient), args.Get(1).(func()), args.Error(2)
}
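Not part of the diff: a hypothetical test sketch showing how MockedProvider could be used with testify. MockedProvider and its NewFromCluster method come from this file; the mocks.KafkaClient import stands in for an assumed testify mock implementing the KafkaClient interface, and the package path and test name are likewise illustrative.

package kafkaclient_test

import (
    "testing"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"

    "github.com/banzaicloud/koperator/pkg/kafkaclient"
    // Assumed helper package: a testify mock implementing kafkaclient.KafkaClient.
    "github.com/banzaicloud/koperator/pkg/kafkaclient/mocks"
)

func TestMockedProviderReturnsConfiguredClient(t *testing.T) {
    mockKafkaClient := new(mocks.KafkaClient) // assumed mock of the KafkaClient interface

    provider := new(kafkaclient.MockedProvider)
    // Whatever client/cluster pair the caller passes in, return the mocked
    // Kafka client, a no-op close function, and no error.
    provider.On("NewFromCluster", mock.Anything, mock.Anything).
        Return(mockKafkaClient, func() {}, nil)

    kClient, closeFn, err := provider.NewFromCluster(nil, nil)
    require.NoError(t, err)
    require.NotNil(t, kClient)
    closeFn()

    provider.AssertExpectations(t)
}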
129 changes: 116 additions & 13 deletions pkg/resources/kafka/kafka.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
@@ -87,12 +88,23 @@ const (
nonControllerBrokerReconcilePriority
// controllerBrokerReconcilePriority the priority used for controller broker used to define its priority in the reconciliation order
controllerBrokerReconcilePriority

// defaultConcurrentBrokerRestartsAllowed the default number of brokers that can be restarted in parallel
defaultConcurrentBrokerRestartsAllowed = 1
)

var (
// kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs
kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`)
)

// Reconciler implements the Component Reconciler
type Reconciler struct {
resources.Reconciler
// kafkaClientProvider is used to create a new KafkaClient
kafkaClientProvider kafkaclient.Provider
// kafkaBrokerAvailabilityZoneMap is a map of broker id to availability zone used in concurrent broker restarts logic
kafkaBrokerAvailabilityZoneMap map[int32]string
}

// New creates a new reconciler for Kafka
@@ -103,9 +115,39 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk
DirectClient: directClient,
KafkaCluster: cluster,
},
kafkaClientProvider: kafkaClientProvider,
kafkaClientProvider: kafkaClientProvider,
kafkaBrokerAvailabilityZoneMap: getBrokerAzMap(cluster),
}
}

func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string {
brokerAzMap := make(map[int32]string)
for _, broker := range cluster.Spec.Brokers {
brokerRack := getBrokerRack(broker.ReadOnlyConfig)
if brokerRack != "" {
brokerAzMap[broker.Id] = brokerRack
}
}
// if incomplete broker AZ information, consider all brokers as being in different AZs
if len(brokerAzMap) != len(cluster.Spec.Brokers) {
for _, broker := range cluster.Spec.Brokers {
brokerAzMap[broker.Id] = fmt.Sprintf("%d", broker.Id)
}
}
return brokerAzMap
}

func getBrokerRack(readOnlyConfig string) string {
if readOnlyConfig == "" {
return ""
}
match := kafkaConfigBrokerRackRegex.FindStringSubmatch(readOnlyConfig)
if len(match) == 2 {
return match[1]
}
return ""
}

func getCreatedPvcForBroker(
ctx context.Context,
c client.Reader,
@@ -877,18 +919,23 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
if err != nil {
return errors.WrapIf(err, "failed to reconcile resource")
}
for _, pod := range podList.Items {
pod := pod
if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating"), "rolling upgrade in progress")
}
if k8sutil.IsPodContainsPendingContainer(&pod) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still creating"), "rolling upgrade in progress")
}
if len(podList.Items) < len(r.KafkaCluster.Spec.Brokers) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod count differs from brokers spec"), "rolling upgrade in progress")
}

errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount
// Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once
concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed()
terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items)
if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress")
}
currentPodAz := r.getBrokerAz(currentPod)
if concurrentBrokerRestartsAllowed > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating or creating from another AZ"), "rolling upgrade in progress")
}

// Check broker count with out-of-sync and offline replicas against the rolling upgrade failure threshold
errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount
kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster)
if err != nil {
return errorfactory.New(errorfactory.BrokersUnreachable{}, err, "could not connect to kafka brokers")
@@ -908,20 +955,24 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
if len(outOfSyncReplicas) > 0 {
log.Info("out-of-sync replicas", "IDs", outOfSyncReplicas)
}

impactedReplicas := make(map[int32]struct{})
for _, brokerID := range allOfflineReplicas {
impactedReplicas[brokerID] = struct{}{}
}
for _, brokerID := range outOfSyncReplicas {
impactedReplicas[brokerID] = struct{}{}
}

errorCount += len(impactedReplicas)

if errorCount >= r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("cluster is not healthy"), "rolling upgrade in progress")
}

// If multiple concurrent restarts and broker failures allowed, restart only brokers from the same AZ
if concurrentBrokerRestartsAllowed > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 {
if r.existsFailedBrokerFromAnotherRack(currentPodAz, impactedReplicas) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("broker is not healthy from another AZ"), "rolling upgrade in progress")
}
}
}
}

@@ -943,6 +994,37 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
return nil
}

func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}) bool {
if currentPodAz == "" && len(impactedReplicas) > 0 {
return true
}
for brokerWithFailure := range impactedReplicas {
if currentPodAz != r.kafkaBrokerAvailabilityZoneMap[brokerWithFailure] {
return true
}
}
return false
}

func (r *Reconciler) getConcurrentBrokerRestartsAllowed() int {
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartsAllowed > 1 {
return r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartsAllowed
}
return defaultConcurrentBrokerRestartsAllowed
}

func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod) bool {
if currentPodAz == "" && len(terminatingOrPendingPods) > 0 {
return true
}
for _, terminatingOrPendingPod := range terminatingOrPendingPods {
if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod) {
return true
}
}
return false
}

// Checks for match between pod labels and TaintedBrokersSelector
func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool {
selector, err := metav1.LabelSelectorAsSelector(r.KafkaCluster.Spec.TaintedBrokersSelector)
@@ -1382,6 +1464,27 @@ func (r *Reconciler) determineControllerId() (int32, error) {
return controllerID, nil
}

func getPodsInTerminatingOrPendingState(items []corev1.Pod) []corev1.Pod {
var pods []corev1.Pod
for _, pod := range items {
if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) {
pods = append(pods, pod)
}
if k8sutil.IsPodContainsPendingContainer(&pod) {
pods = append(pods, pod)
}
}
return pods
}

func (r *Reconciler) getBrokerAz(pod *corev1.Pod) string {
brokerId, err := strconv.ParseInt(pod.Labels[v1beta1.BrokerIdLabelKey], 10, 32)
if err != nil {
return ""
}
return r.kafkaBrokerAvailabilityZoneMap[int32(brokerId)]
}

func getServiceFromExternalListener(client client.Client, cluster *v1beta1.KafkaCluster,
eListenerName string, ingressConfigName string) (*corev1.Service, error) {
foundLBService := &corev1.Service{}
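Not part of the diff: a sketch of an in-package test (package kafka) exercising the rack-parsing and fallback behavior added in this file. getBrokerAzMap and the broker.rack regex are from the PR; the v1beta1.KafkaClusterSpec and v1beta1.Broker literal shapes, broker IDs, and rack names are assumptions made for illustration.

package kafka

import (
    "testing"

    "github.com/banzaicloud/koperator/api/v1beta1"
)

func TestGetBrokerAzMap(t *testing.T) {
    // Every broker declares broker.rack, so the map mirrors the configured racks.
    complete := &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{Brokers: []v1beta1.Broker{
        {Id: 0, ReadOnlyConfig: "broker.rack=az1"},
        {Id: 1, ReadOnlyConfig: "broker.rack=az2"},
    }}}
    if azMap := getBrokerAzMap(complete); azMap[0] != "az1" || azMap[1] != "az2" {
        t.Errorf("unexpected AZ map: %v", azMap)
    }

    // One broker has no broker.rack, so every broker is treated as its own AZ,
    // which effectively disables concurrent restarts across brokers.
    incomplete := &v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{Brokers: []v1beta1.Broker{
        {Id: 0, ReadOnlyConfig: "broker.rack=az1"},
        {Id: 1, ReadOnlyConfig: ""},
    }}}
    if azMap := getBrokerAzMap(incomplete); azMap[0] != "0" || azMap[1] != "1" {
        t.Errorf("expected per-broker fallback, got: %v", azMap)
    }
}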