Skip to content

Commit

Permalink
Enable setting PodAutoscaler configuration via YAML labels (#409)
Browse files Browse the repository at this point in the history
* enable config PA via metadata.labels

* fix linter
  • Loading branch information
kr11 authored Nov 21, 2024
1 parent 703c4f6 commit 8d25fb7
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 8 deletions.
4 changes: 4 additions & 0 deletions config/samples/autoscaling_v1alpha1_mock_llama.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ metadata:
labels:
app.kubernetes.io/name: aibrix
app.kubernetes.io/managed-by: kustomize
autoscaling.aibrix.ai/max-scale-up-rate: "2"
autoscaling.aibrix.ai/max-scale-down-rate: "2"
kpa.autoscaling.aibrix.ai/stable-window: "60s"
kpa.autoscaling.aibrix.ai/scale-down-delay: "60s"
namespace: aibrix-system
spec:
scaleTargetRef:
Expand Down
46 changes: 46 additions & 0 deletions pkg/controller/podautoscaler/common/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ limitations under the License.

package common

import (
"strconv"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
"k8s.io/klog/v2"
)

// AutoscalingLabelPrefix is the common prefix of every PodAutoscaler label
// that tunes autoscaling behaviour. Strategy-specific prefixes are built on
// top of it.
const AutoscalingLabelPrefix = "autoscaling.aibrix.ai/"

// Label keys read by BaseScalingContext.UpdateByPaTypes; they apply to every
// scaling strategy.
const (
	// maxScaleUpRateLabel overrides BaseScalingContext.MaxScaleUpRate.
	maxScaleUpRateLabel = AutoscalingLabelPrefix + "max-scale-up-rate"
	// maxScaleDownRateLabel overrides BaseScalingContext.MaxScaleDownRate.
	maxScaleDownRateLabel = AutoscalingLabelPrefix + "max-scale-down-rate"
)

// ScalingContext defines the generalized context that holds all necessary data for scaling calculations.
type ScalingContext interface {
GetTargetValue() float64
Expand All @@ -24,6 +37,7 @@ type ScalingContext interface {
GetMaxScaleUpRate() float64
GetMaxScaleDownRate() float64
GetCurrentUsePerPod() float64
UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error
}

// BaseScalingContext provides a base implementation of the ScalingContext interface.
Expand All @@ -42,6 +56,8 @@ type BaseScalingContext struct {
currentUsePerPod float64
}

// Compile-time assertion that *BaseScalingContext satisfies ScalingContext.
var _ ScalingContext = (*BaseScalingContext)(nil)

// NewBaseScalingContext creates a new instance of BaseScalingContext with default values.
func NewBaseScalingContext() *BaseScalingContext {
return &BaseScalingContext{
Expand All @@ -53,6 +69,36 @@ func NewBaseScalingContext() *BaseScalingContext {
}
}

// UpdateByPaTypes refreshes the strategy-independent settings of the context
// from a PodAutoscaler object. It should be invoked by any scaling context
// that embeds BaseScalingContext.
//
// The scaling metric and target value are taken from pa.Spec. The optional
// labels "autoscaling.aibrix.ai/max-scale-up-rate" and
// "autoscaling.aibrix.ai/max-scale-down-rate" override MaxScaleUpRate and
// MaxScaleDownRate; when a label is absent the current value is kept.
//
// All inputs are parsed before any field is written, so a non-nil error
// leaves the context exactly as it was. Labels are looked up in a fixed
// order rather than by ranging over the map, so the reported error does not
// depend on map iteration order.
func (b *BaseScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error {
	targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64)
	if err != nil {
		klog.ErrorS(err, "Failed to parse target value", "targetValue", pa.Spec.TargetValue)
		return err
	}

	// Start from the current values so that a missing label is a no-op.
	maxScaleUpRate := b.MaxScaleUpRate
	maxScaleDownRate := b.MaxScaleDownRate

	if raw, ok := pa.Labels[maxScaleUpRateLabel]; ok {
		v, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", maxScaleUpRateLabel, "value", raw)
			return err
		}
		maxScaleUpRate = v
	}
	if raw, ok := pa.Labels[maxScaleDownRateLabel]; ok {
		v, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", maxScaleDownRateLabel, "value", raw)
			return err
		}
		maxScaleDownRate = v
	}

	// Everything parsed: commit in one go.
	b.ScalingMetric = pa.Spec.TargetMetric
	b.TargetValue = targetValue
	b.MaxScaleUpRate = maxScaleUpRate
	b.MaxScaleDownRate = maxScaleDownRate
	return nil
}

// SetCurrentUsePerPod stores value as the context's current per-pod usage.
func (b *BaseScalingContext) SetCurrentUsePerPod(value float64) {
	b.currentUsePerPod = value
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/podautoscaler/podautoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context,

err = r.updateScalerSpec(ctx, pa)
if err != nil {
klog.ErrorS(err, "Failed to update scaler spec from pa_types")
return 0, "", currentTimestamp, fmt.Errorf("error update scaler spec: %w", err)
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/controller/podautoscaler/scaler/apa.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ import (
"k8s.io/klog/v2"
)

// APALabelPrefix is the prefix of PodAutoscaler labels that only apply to the
// APA scaling strategy.
const APALabelPrefix = "apa." + scalingcontext.AutoscalingLabelPrefix

// Label keys read by ApaScalingContext.UpdateByPaTypes.
const (
	// upFluctuationToleranceLabel overrides ApaScalingContext.UpFluctuationTolerance.
	upFluctuationToleranceLabel = APALabelPrefix + "up-fluctuation-tolerance"
	// downFluctuationToleranceLabel overrides ApaScalingContext.DownFluctuationTolerance.
	downFluctuationToleranceLabel = APALabelPrefix + "down-fluctuation-tolerance"
)

// ApaScalingContext defines parameters for scaling decisions.
type ApaScalingContext struct {
scalingcontext.BaseScalingContext
Expand Down Expand Up @@ -85,6 +91,30 @@ func NewApaAutoscaler(readyPodsCount int, spec *ApaScalingContext) (*ApaAutoscal
}, nil
}

// UpdateByPaTypes refreshes the APA scaling context from a PodAutoscaler
// object.
//
// It first delegates to BaseScalingContext.UpdateByPaTypes and returns its
// error unchanged. It then applies the optional labels
// APALabelPrefix+"up-fluctuation-tolerance" and
// APALabelPrefix+"down-fluctuation-tolerance"; when a label is absent the
// current tolerance is kept.
//
// Both labels are parsed before either tolerance is written, so a label
// parse error leaves the APA-specific fields untouched (the base fields have
// already been updated at that point). Labels are looked up in a fixed order
// rather than by ranging over the map, so the reported error does not depend
// on map iteration order.
func (a *ApaScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error {
	if err := a.BaseScalingContext.UpdateByPaTypes(pa); err != nil {
		return err
	}

	// Start from the current values so that a missing label is a no-op.
	upTolerance := a.UpFluctuationTolerance
	downTolerance := a.DownFluctuationTolerance

	if raw, ok := pa.Labels[upFluctuationToleranceLabel]; ok {
		v, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", upFluctuationToleranceLabel, "value", raw)
			return err
		}
		upTolerance = v
	}
	if raw, ok := pa.Labels[downFluctuationToleranceLabel]; ok {
		v, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", downFluctuationToleranceLabel, "value", raw)
			return err
		}
		downTolerance = v
	}

	a.UpFluctuationTolerance = upTolerance
	a.DownFluctuationTolerance = downTolerance
	return nil
}

func (a *ApaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.NamespaceNameMetric, now time.Time) ScaleResult {
spec, ok := a.GetScalingContext().(*ApaScalingContext)
if !ok {
Expand Down
53 changes: 53 additions & 0 deletions pkg/controller/podautoscaler/scaler/apa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"testing"
"time"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
)

Expand Down Expand Up @@ -67,3 +71,52 @@ func TestAPAScale(t *testing.T) {
t.Errorf("result should remain previous replica = %d, but got %d", readyPodCount, result.DesiredPodCount)
}
}

// TestApaUpdateContext verifies that ApaScalingContext.UpdateByPaTypes picks
// up both the shared and the APA-specific settings from PodAutoscaler labels.
func TestApaUpdateContext(t *testing.T) {
	pa := &autoscalingv1alpha1.PodAutoscaler{
		Spec: autoscalingv1alpha1.PodAutoscalerSpec{
			ScaleTargetRef: corev1.ObjectReference{
				Kind: "Deployment",
				Name: "example-deployment",
			},
			MinReplicas:  nil, // expecting nil as default since it's a pointer and no value is assigned
			MaxReplicas:  5,
			TargetValue:  "1",
			TargetMetric: "test.metrics",
			MetricsSources: []autoscalingv1alpha1.MetricSource{
				{
					Endpoint: "service1.example.com",
					Path:     "/api/metrics/cpu",
				},
			},
			ScalingStrategy: "APA",
		},
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{
				"autoscaling.aibrix.ai/max-scale-up-rate":              "32.1",
				"autoscaling.aibrix.ai/max-scale-down-rate":            "12.3",
				"apa.autoscaling.aibrix.ai/up-fluctuation-tolerance":   "1.2",
				"apa.autoscaling.aibrix.ai/down-fluctuation-tolerance": "0.9",
			},
		},
	}

	apaSpec := NewApaScalingContext()
	// Without a successful update the field checks below are meaningless, so
	// stop immediately on error.
	if err := apaSpec.UpdateByPaTypes(pa); err != nil {
		t.Fatalf("Failed to update ApaScalingContext: %v", err)
	}

	if apaSpec.MaxScaleUpRate != 32.1 {
		t.Errorf("expected MaxScaleUpRate = 32.1, got %f", apaSpec.MaxScaleUpRate)
	}
	if apaSpec.MaxScaleDownRate != 12.3 {
		t.Errorf("expected MaxScaleDownRate = 12.3, got %f", apaSpec.MaxScaleDownRate)
	}
	if apaSpec.UpFluctuationTolerance != 1.2 {
		t.Errorf("expected UpFluctuationTolerance = 1.2, got %f", apaSpec.UpFluctuationTolerance)
	}
	if apaSpec.DownFluctuationTolerance != 0.9 {
		t.Errorf("expected DownFluctuationTolerance = 0.9, got %f", apaSpec.DownFluctuationTolerance)
	}
}
77 changes: 69 additions & 8 deletions pkg/controller/podautoscaler/scaler/kpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ If the metric no longer exceeds the panic threshold, exit the panic mode.
*/

// KPALabelPrefix is the prefix of PodAutoscaler labels that only apply to the
// KPA scaling strategy.
const KPALabelPrefix = "kpa." + scalingcontext.AutoscalingLabelPrefix

// Label keys read by KpaScalingContext.UpdateByPaTypes.
const (
	// targetBurstCapacityLabel overrides KpaScalingContext.TargetBurstCapacity.
	targetBurstCapacityLabel = KPALabelPrefix + "target-burst-capacity"
	// activationScaleLabel overrides KpaScalingContext.ActivationScale.
	activationScaleLabel = KPALabelPrefix + "activation-scale"
	// panicThresholdLabel overrides KpaScalingContext.PanicThreshold.
	panicThresholdLabel = KPALabelPrefix + "panic-threshold"
	// stableWindowLabel overrides KpaScalingContext.StableWindow.
	stableWindowLabel = KPALabelPrefix + "stable-window"
	// scaleDownDelayLabel sets KpaScalingContext.ScaleDownDelay.
	scaleDownDelayLabel = KPALabelPrefix + "scale-down-delay"
)

// KpaScalingContext defines parameters for scaling decisions.
type KpaScalingContext struct {
scalingcontext.BaseScalingContext
Expand Down Expand Up @@ -100,6 +109,57 @@ func NewKpaScalingContext() *KpaScalingContext {
}
}

// UpdateByPaTypes refreshes the KPA scaling context from a PodAutoscaler
// object.
//
// It first delegates to BaseScalingContext.UpdateByPaTypes and returns its
// error unchanged. It then applies the optional KPA labels (KPALabelPrefix +
// "target-burst-capacity", "activation-scale", "panic-threshold",
// "stable-window", "scale-down-delay"). Float labels are parsed with
// strconv.ParseFloat, activation-scale as a base-10 32-bit integer, and the
// two window labels with time.ParseDuration (e.g. "60s").
//
// A missing label keeps the current value, except scale-down-delay, which is
// reset to zero when the label is absent.
//
// All labels are parsed before any KPA field is written, so a label parse
// error leaves the KPA-specific fields untouched (the base fields have
// already been updated at that point). Labels are looked up in a fixed order
// rather than by ranging over the map, so the reported error does not depend
// on map iteration order.
func (k *KpaScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error {
	if err := k.BaseScalingContext.UpdateByPaTypes(pa); err != nil {
		return err
	}

	// Start from the current values so that a missing label is a no-op.
	targetBurstCapacity := k.TargetBurstCapacity
	activationScale := k.ActivationScale
	panicThreshold := k.PanicThreshold
	stableWindow := k.StableWindow

	// unset some attribute if there are no configuration: ScaleDownDelay
	// falls back to zero when its label is absent.
	// TODO N.B. three parts of KPAScaler are stateful : panic_window, stable_window and delay windows.
	// reconcile() updates KpaScalingContext periodically, but doesn't reset these three parts.
	// These three parts are only initialized when controller starts.
	// Therefore, apply kpa.yaml cannot modify the panic_duration, stable_duration and delay_window duration
	var scaleDownDelay time.Duration

	if raw, ok := pa.Labels[targetBurstCapacityLabel]; ok {
		v, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", targetBurstCapacityLabel, "value", raw)
			return err
		}
		targetBurstCapacity = v
	}
	if raw, ok := pa.Labels[activationScaleLabel]; ok {
		v, err := strconv.ParseInt(raw, 10, 32)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", activationScaleLabel, "value", raw)
			return err
		}
		// bitSize 32 above guarantees the conversion cannot overflow.
		activationScale = int32(v)
	}
	if raw, ok := pa.Labels[panicThresholdLabel]; ok {
		v, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", panicThresholdLabel, "value", raw)
			return err
		}
		panicThreshold = v
	}
	if raw, ok := pa.Labels[stableWindowLabel]; ok {
		v, err := time.ParseDuration(raw)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", stableWindowLabel, "value", raw)
			return err
		}
		stableWindow = v
	}
	if raw, ok := pa.Labels[scaleDownDelayLabel]; ok {
		v, err := time.ParseDuration(raw)
		if err != nil {
			klog.ErrorS(err, "Failed to parse label value", "label", scaleDownDelayLabel, "value", raw)
			return err
		}
		scaleDownDelay = v
	}

	// Everything parsed: commit in one go.
	k.TargetBurstCapacity = targetBurstCapacity
	k.ActivationScale = activationScale
	k.PanicThreshold = panicThreshold
	k.StableWindow = stableWindow
	k.ScaleDownDelay = scaleDownDelay
	return nil
}

type KpaAutoscaler struct {
specMux sync.RWMutex
metricClient metrics.MetricClient
Expand All @@ -122,10 +182,14 @@ func NewKpaAutoscaler(readyPodsCount int, spec *KpaScalingContext) (*KpaAutoscal
}

// Create a new delay window based on the ScaleDownDelay specified in the spec
if spec.ScaleDownDelay <= 0 {
if spec.ScaleDownDelay < 0 {
return nil, errors.New("ScaleDownDelay must be positive")
}
delayWindow := aggregation.NewTimeWindow(spec.ScaleDownDelay, 1*time.Second)
var delayWindow *aggregation.TimeWindow
// If specify ScaleDownDelay, KpaAutoscaler.delayWindow will be initialized
if spec.ScaleDownDelay > 0 {
delayWindow = aggregation.NewTimeWindow(spec.ScaleDownDelay, 1*time.Second)
}

// As KNative stated:
// We always start in the panic mode, if the deployment is scaled up over 1 pod.
Expand All @@ -144,6 +208,7 @@ func NewKpaAutoscaler(readyPodsCount int, spec *KpaScalingContext) (*KpaAutoscal
// TODO missing MetricClient
metricsFetcher := &metrics.RestMetricsFetcher{}
metricsClient := metrics.NewKPAMetricsClient(metricsFetcher)

scalingAlgorithm := algorithm.KpaScalingAlgorithm{}

return &KpaAutoscaler{
Expand Down Expand Up @@ -253,6 +318,7 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name
// in that case).
klog.V(4).InfoS("DelayWindow details", "delayWindow", k.delayWindow.String())
if k.delayWindow != nil {
// the actual desiredPodCount will be recorded, but return the max replicas during passed delayWindow
k.delayWindow.Record(now, float64(desiredPodCount))
delayedPodCount, err := k.delayWindow.Max()
if err != nil {
Expand Down Expand Up @@ -319,15 +385,10 @@ func (k *KpaAutoscaler) UpdateSourceMetrics(ctx context.Context, metricKey metri
func (k *KpaAutoscaler) UpdateScalingContext(pa autoscalingv1alpha1.PodAutoscaler) error {
k.specMux.Lock()
defer k.specMux.Unlock()

targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64)
err := k.scalingContext.UpdateByPaTypes(&pa)
if err != nil {
klog.ErrorS(err, "Failed to parse target value", "targetValue", pa.Spec.TargetValue)
return err
}
k.scalingContext.TargetValue = targetValue
k.scalingContext.ScalingMetric = pa.Spec.TargetMetric

return nil
}

Expand Down
63 changes: 63 additions & 0 deletions pkg/controller/podautoscaler/scaler/kpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"testing"
"time"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aibrix/aibrix/pkg/controller/podautoscaler/common"

"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
Expand Down Expand Up @@ -70,3 +74,62 @@ func TestKpaScale(t *testing.T) {
t.Errorf("result.DesiredPodCount = 10, got %d", result.DesiredPodCount)
}
}

// TestKpaUpdateContext verifies that KpaScalingContext.UpdateByPaTypes picks
// up both the shared and the KPA-specific settings from PodAutoscaler labels.
func TestKpaUpdateContext(t *testing.T) {
	pa := &autoscalingv1alpha1.PodAutoscaler{
		Spec: autoscalingv1alpha1.PodAutoscalerSpec{
			ScaleTargetRef: corev1.ObjectReference{
				Kind: "Deployment",
				Name: "example-deployment",
			},
			MinReplicas:  nil, // expecting nil as default since it's a pointer and no value is assigned
			MaxReplicas:  5,
			TargetValue:  "1",
			TargetMetric: "test.metrics",
			MetricsSources: []autoscalingv1alpha1.MetricSource{
				{
					Endpoint: "service1.example.com",
					Path:     "/api/metrics/cpu",
				},
			},
			ScalingStrategy: "KPA",
		},
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{
				"autoscaling.aibrix.ai/max-scale-up-rate":         "32.1",
				"autoscaling.aibrix.ai/max-scale-down-rate":       "12.3",
				"kpa.autoscaling.aibrix.ai/target-burst-capacity": "45.6",
				"kpa.autoscaling.aibrix.ai/activation-scale":      "3",
				"kpa.autoscaling.aibrix.ai/panic-threshold":       "2.5",
				"kpa.autoscaling.aibrix.ai/stable-window":         "60s",
				"kpa.autoscaling.aibrix.ai/scale-down-delay":      "30s",
			},
		},
	}

	kpaSpec := NewKpaScalingContext()
	// Without a successful update the field checks below are meaningless, so
	// stop immediately on error.
	if err := kpaSpec.UpdateByPaTypes(pa); err != nil {
		t.Fatalf("Failed to update KpaScalingContext: %v", err)
	}

	if kpaSpec.MaxScaleUpRate != 32.1 {
		t.Errorf("expected MaxScaleUpRate = 32.1, got %f", kpaSpec.MaxScaleUpRate)
	}
	if kpaSpec.MaxScaleDownRate != 12.3 {
		t.Errorf("expected MaxScaleDownRate = 12.3, got %f", kpaSpec.MaxScaleDownRate)
	}
	if kpaSpec.TargetBurstCapacity != 45.6 {
		t.Errorf("expected TargetBurstCapacity = 45.6, got %f", kpaSpec.TargetBurstCapacity)
	}
	if kpaSpec.ActivationScale != 3 {
		t.Errorf("expected ActivationScale = 3, got %d", kpaSpec.ActivationScale)
	}
	if kpaSpec.PanicThreshold != 2.5 {
		t.Errorf("expected PanicThreshold = 2.5, got %f", kpaSpec.PanicThreshold)
	}
	if kpaSpec.StableWindow != 60*time.Second {
		t.Errorf("expected StableWindow = 60s, got %v", kpaSpec.StableWindow)
	}
	if kpaSpec.ScaleDownDelay != 30*time.Second {
		t.Errorf("expected ScaleDownDelay = 30s, got %v", kpaSpec.ScaleDownDelay)
	}
}

0 comments on commit 8d25fb7

Please sign in to comment.