Skip to content

Commit

Permalink
Refactor Scaler: Resolve Issues with Metric Parameter Updates in Mult…
Browse files Browse the repository at this point in the history
…iple KPAs (#437)

refactor scaler init and update
  • Loading branch information
kr11 authored Nov 30, 2024
1 parent e15991b commit 25eb982
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 174 deletions.
6 changes: 5 additions & 1 deletion pkg/controller/podautoscaler/aggregation/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type entry struct {
index int
}

func (e entry) String() string {
return fmt.Sprintf("{%d, %.2f}", e.index, e.value)
}

// window is a sliding window that keeps track of recent {size} values.
type window struct {
valueList []entry
Expand Down Expand Up @@ -125,7 +129,7 @@ func (w *window) String() string {

for i := 0; i < w.length; i++ {
idx := (w.first + i) % len(w.valueList)
sb.WriteString(fmt.Sprintf("%v", w.valueList[idx]))
sb.WriteString(fmt.Sprintf("%v", w.valueList[idx].String()))
if i < w.length-1 {
sb.WriteString(", ")
}
Expand Down
97 changes: 27 additions & 70 deletions pkg/controller/podautoscaler/metrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package metrics

import (
"context"
"fmt"
"sync"

corev1 "k8s.io/api/core/v1"
Expand All @@ -33,6 +32,7 @@ import (

const (
metricServerDefaultMetricWindow = time.Minute
paGranularity = time.Second
)

type KPAMetricsClient struct {
Expand All @@ -54,42 +54,28 @@ type KPAMetricsClient struct {
// are collected and processed within the sliding window.
granularity time.Duration
// the difference between stable and panic metrics is the time window range
panicWindowDict map[NamespaceNameMetric]*aggregation.TimeWindow
stableWindowDict map[NamespaceNameMetric]*aggregation.TimeWindow
panicWindow *aggregation.TimeWindow
stableWindow *aggregation.TimeWindow
}

var _ MetricClient = (*KPAMetricsClient)(nil)

// NewKPAMetricsClient initializes and returns a KPAMetricsClient with specified durations.
func NewKPAMetricsClient(fetcher MetricFetcher) *KPAMetricsClient {
func NewKPAMetricsClient(fetcher MetricFetcher, stableDuration time.Duration, panicDuration time.Duration) *KPAMetricsClient {
client := &KPAMetricsClient{
fetcher: fetcher,
stableDuration: 60 * time.Second,
panicDuration: 10 * time.Second,
granularity: time.Second,
panicWindowDict: make(map[NamespaceNameMetric]*aggregation.TimeWindow),
stableWindowDict: make(map[NamespaceNameMetric]*aggregation.TimeWindow),
fetcher: fetcher,
stableDuration: stableDuration,
panicDuration: panicDuration,
granularity: paGranularity,
panicWindow: aggregation.NewTimeWindow(panicDuration, paGranularity),
stableWindow: aggregation.NewTimeWindow(stableDuration, paGranularity),
}
return client
}

func (c *KPAMetricsClient) UpdateMetricIntoWindow(metricKey NamespaceNameMetric, now time.Time, metricValue float64) error {
// Add to panic and stable windows; create a new window if not present in the map
// Ensure that panicWindowDict and stableWindowDict maps are checked and updated
updateWindow := func(windowDict map[NamespaceNameMetric]*aggregation.TimeWindow, duration time.Duration) {
window, exists := windowDict[metricKey]
if !exists {
// Create a new TimeWindow if it does not exist
windowDict[metricKey] = aggregation.NewTimeWindow(duration, c.granularity)
window = windowDict[metricKey]
}
// Record the maximum metric value in the TimeWindow
window.Record(now, metricValue)
}

// Update panic and stable windows
updateWindow(c.panicWindowDict, c.panicDuration)
updateWindow(c.stableWindowDict, c.stableDuration)
func (c *KPAMetricsClient) UpdateMetricIntoWindow(now time.Time, metricValue float64) error {
c.panicWindow.Record(now, metricValue)
c.stableWindow.Record(now, metricValue)
return nil
}

Expand All @@ -112,7 +98,7 @@ func (c *KPAMetricsClient) UpdateMetrics(now time.Time, metricKey NamespaceNameM
defer c.collectionsMutex.Unlock()

// Update metrics into the window for tracking
err := c.UpdateMetricIntoWindow(metricKey, now, sumMetricValue)
err := c.UpdateMetricIntoWindow(now, sumMetricValue)
if err != nil {
return err
}
Expand All @@ -125,29 +111,19 @@ func (c *KPAMetricsClient) StableAndPanicMetrics(
c.collectionsMutex.RLock()
defer c.collectionsMutex.RUnlock()

panicWindow, exists := c.panicWindowDict[metricKey]
if !exists {
return -1, -1, fmt.Errorf("panic metrics %s not found", metricKey)
}

panicValue, err := panicWindow.Avg()
panicValue, err := c.panicWindow.Avg()
if err != nil {
return -1, -1, err
}

klog.InfoS("Get panicWindow", "metricKey", metricKey, "panicValue", panicValue, "panicWindow", panicWindow)
klog.InfoS("Get panicWindow", "metricKey", metricKey, "panicValue", panicValue, "panicWindow", c.panicWindow)

stableWindow, exists := c.stableWindowDict[metricKey]
if !exists {
return -1, -1, fmt.Errorf("stable metrics %s not found", metricKey)
}
stableValue, err := stableWindow.Avg()
stableValue, err := c.stableWindow.Avg()
if err != nil {
return -1, -1, err
}

klog.InfoS("Get stableWindow", "metricKey", metricKey, "stableValue", stableValue, "stableWindow", stableWindow)

klog.Infof("Get stableWindow: metricKey=%s, stableValue=%.2f, stableWindow=%v", metricKey, stableValue, c.stableWindow)
return stableValue, panicValue, nil
}

Expand Down Expand Up @@ -180,38 +156,24 @@ type APAMetricsClient struct {
// are collected and processed within the sliding window.
granularity time.Duration
// stable time window
windowDict map[NamespaceNameMetric]*aggregation.TimeWindow
window *aggregation.TimeWindow
}

var _ MetricClient = (*APAMetricsClient)(nil)

// NewAPAMetricsClient initializes and returns a KPAMetricsClient with specified durations.
func NewAPAMetricsClient(fetcher MetricFetcher) *APAMetricsClient {
func NewAPAMetricsClient(fetcher MetricFetcher, duration time.Duration) *APAMetricsClient {
client := &APAMetricsClient{
fetcher: fetcher,
duration: 60 * time.Second,
granularity: time.Second,
windowDict: make(map[NamespaceNameMetric]*aggregation.TimeWindow),
duration: duration,
granularity: paGranularity,
window: aggregation.NewTimeWindow(duration, paGranularity),
}
return client
}

func (c *APAMetricsClient) UpdateMetricIntoWindow(metricKey NamespaceNameMetric, now time.Time, metricValue float64) error {
// Add to metric window; create a new window if not present in the map
// Ensure that windowDict maps are checked and updated
updateWindow := func(windowDict map[NamespaceNameMetric]*aggregation.TimeWindow, duration time.Duration) {
window, exists := windowDict[metricKey]
if !exists {
// Create a new TimeWindow if it does not exist
windowDict[metricKey] = aggregation.NewTimeWindow(duration, c.granularity)
window = windowDict[metricKey]
}
// Record the maximum metric value in the TimeWindow
window.Record(now, metricValue)
}

// Update metrics windows
updateWindow(c.windowDict, c.duration)
func (c *APAMetricsClient) UpdateMetricIntoWindow(now time.Time, metricValue float64) error {
c.window.Record(now, metricValue)
return nil
}

Expand All @@ -230,7 +192,7 @@ func (c *APAMetricsClient) UpdateMetrics(now time.Time, metricKey NamespaceNameM
defer c.collectionsMutex.Unlock()

// Update metrics into the window for tracking
err := c.UpdateMetricIntoWindow(metricKey, now, sumMetricValue)
err := c.UpdateMetricIntoWindow(now, sumMetricValue)
if err != nil {
return err
}
Expand All @@ -243,12 +205,7 @@ func (c *APAMetricsClient) GetMetricValue(
c.collectionsMutex.RLock()
defer c.collectionsMutex.RUnlock()

window, exists := c.windowDict[metricKey]
if !exists {
return -1, fmt.Errorf("metrics %s not found", metricKey)
}

metricValue, err := window.Avg()
metricValue, err := c.window.Avg()
if err != nil {
return -1, err
}
Expand Down
75 changes: 37 additions & 38 deletions pkg/controller/podautoscaler/podautoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,9 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
Mapper: mgr.GetRESTMapper(),
resyncInterval: 10 * time.Second, // TODO: this should be override by an environment variable
eventCh: make(chan event.GenericEvent),
AutoscalerMap: make(map[metrics.NamespaceNameMetric]scaler.Scaler),
}

// initialize all kinds of autoscalers, such as KPA and APA.
kpaScaler, err := scaler.NewKpaAutoscaler(
0,
// TODO: The following parameters are specific to KPA.
// We use default values based on KNative settings to quickly establish a fully functional workflow.
// refer to https://github.com/knative/serving/blob/b6e6baa6dc6697d0e7ddb3a12925f329a1f5064c/config/core/configmaps/autoscaler.yaml#L27
scaler.NewKpaScalingContext(),
)
if err != nil {
return nil, err
}
klog.InfoS("Initialized CustomPA: KPA autoscaler successfully")

apaScaler, err := scaler.NewApaAutoscaler(
0,
// TODO: The following parameters are specific to APA.
// Adjust these parameters based on your requirements for APA.
scaler.NewApaScalingContext(),
)
if err != nil {
return nil, err
}
klog.InfoS("Initialized CustomPA: APA autoscaler successfully")

reconciler.AutoscalerMap = map[autoscalingv1alpha1.ScalingStrategyType]scaler.Scaler{
autoscalingv1alpha1.KPA: kpaScaler,
autoscalingv1alpha1.APA: apaScaler,
}
return reconciler, nil
}

Expand Down Expand Up @@ -146,7 +119,7 @@ type PodAutoscalerReconciler struct {
Scheme *runtime.Scheme
EventRecorder record.EventRecorder
Mapper apimeta.RESTMapper
AutoscalerMap map[autoscalingv1alpha1.ScalingStrategyType]scaler.Scaler
AutoscalerMap map[metrics.NamespaceNameMetric]scaler.Scaler // AutoscalerMap maps each NamespaceNameMetric to its corresponding scaler instance.
resyncInterval time.Duration
eventCh chan event.GenericEvent
}
Expand Down Expand Up @@ -600,6 +573,7 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context,
return 0, "", currentTimestamp, fmt.Errorf("error getting ready pods count: %w", err)
}

// TODO UpdateScalingContext (in updateScalerSpec) is duplicate invoked in computeReplicasForMetrics and updateMetricsForScale
err = r.updateScalerSpec(ctx, pa)
if err != nil {
klog.ErrorS(err, "Failed to update scaler spec from pa_types")
Expand All @@ -615,7 +589,7 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context,
}

// Calculate the desired number of pods using the autoscaler logic.
autoScaler, ok := r.AutoscalerMap[pa.Spec.ScalingStrategy]
autoScaler, ok := r.AutoscalerMap[metricKey]
if !ok {
return 0, "", currentTimestamp, fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy)
}
Expand All @@ -630,10 +604,10 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context,

// refer to knative-serving.
// In pkg/reconciler/autoscaling/kpa/kpa.go:198, kpa maintains a list of deciders into multi-scaler, each of them corresponds to a pa (PodAutoscaler).
// kpa create or update deciders in reconcile function.
// for now, we update the kpascaler.spec when reconciling before calling the Scale function, to make the pa information pass into the Scale algorithm.
// We create or update the scaler instance according to the pa passed in
func (r *PodAutoscalerReconciler) updateScalerSpec(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler) error {
autoScaler, ok := r.AutoscalerMap[pa.Spec.ScalingStrategy]
metricKey := NewNamespaceNameMetricByPa(pa)
autoScaler, ok := r.AutoscalerMap[metricKey]
if !ok {
return fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy)
}
Expand All @@ -642,12 +616,38 @@ func (r *PodAutoscalerReconciler) updateScalerSpec(ctx context.Context, pa autos

func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured) (err error) {
currentTimestamp := time.Now()

autoScaler, ok := r.AutoscalerMap[pa.Spec.ScalingStrategy]
if !ok {
return fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy)
metricKey := NewNamespaceNameMetricByPa(pa)
var autoScaler scaler.Scaler
autoScaler, exists := r.AutoscalerMap[metricKey]
if !exists {
klog.InfoS("Scaler not found, creating new scaler", "metricKey", metricKey, "type", pa.Spec.ScalingStrategy)

switch pa.Spec.ScalingStrategy {
case autoscalingv1alpha1.HPA:
// initialize all kinds of autoscalers, such as KPA and APA.
// TODO Currently, we initialize kpa with default config and allocate window with default length.
// We then reallocate window according to pa until UpdateScalingContext.
// it's not wrong, but we allocate window twice, to be optimized.
autoScaler, err = scaler.NewKpaAutoscaler(0, &pa)
case autoscalingv1alpha1.APA:
autoScaler, err = scaler.NewApaAutoscaler(0, &pa)
default:
return fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy)
}
if err != nil {
return err
}
r.AutoscalerMap[metricKey] = autoScaler
klog.InfoS("New scaler added to AutoscalerMap", "metricKey", metricKey, "type", pa.Spec.ScalingStrategy, "spec", pa.Spec)
} else {
err := autoScaler.UpdateScalingContext(pa)
if err != nil {
klog.ErrorS(err, "update existed pa failed", "metricKey", metricKey, "type", pa.Spec.ScalingStrategy, "spec", pa.Spec)
return err
}
}

// update metrics
for _, source := range pa.Spec.MetricsSources {
metricKey := metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, source.Name)
return autoScaler.UpdateSourceMetrics(ctx, metricKey, source, currentTimestamp)
Expand All @@ -669,7 +669,6 @@ func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa

// TODO: do we need to indicate the metrics source.
// Technically, the metrics could come from Kubernetes metrics API (resource or custom), pod prometheus endpoint or ai runtime
metricKey := metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, pa.Spec.TargetMetric)

// Update targets
if err := autoScaler.UpdateScaleTargetMetrics(ctx, metricKey, podList.Items, currentTimestamp); err != nil {
Expand Down
Loading

0 comments on commit 25eb982

Please sign in to comment.