diff --git a/pkg/controller/podautoscaler/aggregation/window.go b/pkg/controller/podautoscaler/aggregation/window.go index 228ac2a7..eba635bb 100644 --- a/pkg/controller/podautoscaler/aggregation/window.go +++ b/pkg/controller/podautoscaler/aggregation/window.go @@ -34,6 +34,10 @@ type entry struct { index int } +func (e entry) String() string { + return fmt.Sprintf("{%d, %.2f}", e.index, e.value) +} + // window is a sliding window that keeps track of recent {size} values. type window struct { valueList []entry @@ -125,7 +129,7 @@ func (w *window) String() string { for i := 0; i < w.length; i++ { idx := (w.first + i) % len(w.valueList) - sb.WriteString(fmt.Sprintf("%v", w.valueList[idx])) + sb.WriteString(fmt.Sprintf("%v", w.valueList[idx].String())) if i < w.length-1 { sb.WriteString(", ") } diff --git a/pkg/controller/podautoscaler/metrics/client.go b/pkg/controller/podautoscaler/metrics/client.go index 160038d4..6e198710 100644 --- a/pkg/controller/podautoscaler/metrics/client.go +++ b/pkg/controller/podautoscaler/metrics/client.go @@ -18,7 +18,6 @@ package metrics import ( "context" - "fmt" "sync" corev1 "k8s.io/api/core/v1" @@ -33,6 +32,7 @@ import ( const ( metricServerDefaultMetricWindow = time.Minute + paGranularity = time.Second ) type KPAMetricsClient struct { @@ -54,42 +54,28 @@ type KPAMetricsClient struct { // are collected and processed within the sliding window. granularity time.Duration // the difference between stable and panic metrics is the time window range - panicWindowDict map[NamespaceNameMetric]*aggregation.TimeWindow - stableWindowDict map[NamespaceNameMetric]*aggregation.TimeWindow + panicWindow *aggregation.TimeWindow + stableWindow *aggregation.TimeWindow } var _ MetricClient = (*KPAMetricsClient)(nil) // NewKPAMetricsClient initializes and returns a KPAMetricsClient with specified durations. 
-func NewKPAMetricsClient(fetcher MetricFetcher) *KPAMetricsClient { +func NewKPAMetricsClient(fetcher MetricFetcher, stableDuration time.Duration, panicDuration time.Duration) *KPAMetricsClient { client := &KPAMetricsClient{ - fetcher: fetcher, - stableDuration: 60 * time.Second, - panicDuration: 10 * time.Second, - granularity: time.Second, - panicWindowDict: make(map[NamespaceNameMetric]*aggregation.TimeWindow), - stableWindowDict: make(map[NamespaceNameMetric]*aggregation.TimeWindow), + fetcher: fetcher, + stableDuration: stableDuration, + panicDuration: panicDuration, + granularity: paGranularity, + panicWindow: aggregation.NewTimeWindow(panicDuration, paGranularity), + stableWindow: aggregation.NewTimeWindow(stableDuration, paGranularity), } return client } -func (c *KPAMetricsClient) UpdateMetricIntoWindow(metricKey NamespaceNameMetric, now time.Time, metricValue float64) error { - // Add to panic and stable windows; create a new window if not present in the map - // Ensure that panicWindowDict and stableWindowDict maps are checked and updated - updateWindow := func(windowDict map[NamespaceNameMetric]*aggregation.TimeWindow, duration time.Duration) { - window, exists := windowDict[metricKey] - if !exists { - // Create a new TimeWindow if it does not exist - windowDict[metricKey] = aggregation.NewTimeWindow(duration, c.granularity) - window = windowDict[metricKey] - } - // Record the maximum metric value in the TimeWindow - window.Record(now, metricValue) - } - - // Update panic and stable windows - updateWindow(c.panicWindowDict, c.panicDuration) - updateWindow(c.stableWindowDict, c.stableDuration) +func (c *KPAMetricsClient) UpdateMetricIntoWindow(now time.Time, metricValue float64) error { + c.panicWindow.Record(now, metricValue) + c.stableWindow.Record(now, metricValue) return nil } @@ -112,7 +98,7 @@ func (c *KPAMetricsClient) UpdateMetrics(now time.Time, metricKey NamespaceNameM defer c.collectionsMutex.Unlock() // Update metrics into the window for 
tracking - err := c.UpdateMetricIntoWindow(metricKey, now, sumMetricValue) + err := c.UpdateMetricIntoWindow(now, sumMetricValue) if err != nil { return err } @@ -125,29 +111,19 @@ func (c *KPAMetricsClient) StableAndPanicMetrics( c.collectionsMutex.RLock() defer c.collectionsMutex.RUnlock() - panicWindow, exists := c.panicWindowDict[metricKey] - if !exists { - return -1, -1, fmt.Errorf("panic metrics %s not found", metricKey) - } - - panicValue, err := panicWindow.Avg() + panicValue, err := c.panicWindow.Avg() if err != nil { return -1, -1, err } - klog.InfoS("Get panicWindow", "metricKey", metricKey, "panicValue", panicValue, "panicWindow", panicWindow) + klog.InfoS("Get panicWindow", "metricKey", metricKey, "panicValue", panicValue, "panicWindow", c.panicWindow) - stableWindow, exists := c.stableWindowDict[metricKey] - if !exists { - return -1, -1, fmt.Errorf("stable metrics %s not found", metricKey) - } - stableValue, err := stableWindow.Avg() + stableValue, err := c.stableWindow.Avg() if err != nil { return -1, -1, err } - klog.InfoS("Get stableWindow", "metricKey", metricKey, "stableValue", stableValue, "stableWindow", stableWindow) - + klog.Infof("Get stableWindow: metricKey=%s, stableValue=%.2f, stableWindow=%v", metricKey, stableValue, c.stableWindow) return stableValue, panicValue, nil } @@ -180,38 +156,24 @@ type APAMetricsClient struct { // are collected and processed within the sliding window. granularity time.Duration // stable time window - windowDict map[NamespaceNameMetric]*aggregation.TimeWindow + window *aggregation.TimeWindow } var _ MetricClient = (*APAMetricsClient)(nil) // NewAPAMetricsClient initializes and returns a KPAMetricsClient with specified durations. 
-func NewAPAMetricsClient(fetcher MetricFetcher) *APAMetricsClient { +func NewAPAMetricsClient(fetcher MetricFetcher, duration time.Duration) *APAMetricsClient { client := &APAMetricsClient{ fetcher: fetcher, - duration: 60 * time.Second, - granularity: time.Second, - windowDict: make(map[NamespaceNameMetric]*aggregation.TimeWindow), + duration: duration, + granularity: paGranularity, + window: aggregation.NewTimeWindow(duration, paGranularity), } return client } -func (c *APAMetricsClient) UpdateMetricIntoWindow(metricKey NamespaceNameMetric, now time.Time, metricValue float64) error { - // Add to metric window; create a new window if not present in the map - // Ensure that windowDict maps are checked and updated - updateWindow := func(windowDict map[NamespaceNameMetric]*aggregation.TimeWindow, duration time.Duration) { - window, exists := windowDict[metricKey] - if !exists { - // Create a new TimeWindow if it does not exist - windowDict[metricKey] = aggregation.NewTimeWindow(duration, c.granularity) - window = windowDict[metricKey] - } - // Record the maximum metric value in the TimeWindow - window.Record(now, metricValue) - } - - // Update metrics windows - updateWindow(c.windowDict, c.duration) +func (c *APAMetricsClient) UpdateMetricIntoWindow(now time.Time, metricValue float64) error { + c.window.Record(now, metricValue) return nil } @@ -230,7 +192,7 @@ func (c *APAMetricsClient) UpdateMetrics(now time.Time, metricKey NamespaceNameM defer c.collectionsMutex.Unlock() // Update metrics into the window for tracking - err := c.UpdateMetricIntoWindow(metricKey, now, sumMetricValue) + err := c.UpdateMetricIntoWindow(now, sumMetricValue) if err != nil { return err } @@ -243,12 +205,7 @@ func (c *APAMetricsClient) GetMetricValue( c.collectionsMutex.RLock() defer c.collectionsMutex.RUnlock() - window, exists := c.windowDict[metricKey] - if !exists { - return -1, fmt.Errorf("metrics %s not found", metricKey) - } - - metricValue, err := window.Avg() + metricValue, err 
:= c.window.Avg() if err != nil { return -1, err } diff --git a/pkg/controller/podautoscaler/podautoscaler_controller.go b/pkg/controller/podautoscaler/podautoscaler_controller.go index 87dc35de..9a2bf520 100644 --- a/pkg/controller/podautoscaler/podautoscaler_controller.go +++ b/pkg/controller/podautoscaler/podautoscaler_controller.go @@ -74,36 +74,9 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { Mapper: mgr.GetRESTMapper(), resyncInterval: 10 * time.Second, // TODO: this should be override by an environment variable eventCh: make(chan event.GenericEvent), + AutoscalerMap: make(map[metrics.NamespaceNameMetric]scaler.Scaler), } - // initialize all kinds of autoscalers, such as KPA and APA. - kpaScaler, err := scaler.NewKpaAutoscaler( - 0, - // TODO: The following parameters are specific to KPA. - // We use default values based on KNative settings to quickly establish a fully functional workflow. - // refer to https://github.com/knative/serving/blob/b6e6baa6dc6697d0e7ddb3a12925f329a1f5064c/config/core/configmaps/autoscaler.yaml#L27 - scaler.NewKpaScalingContext(), - ) - if err != nil { - return nil, err - } - klog.InfoS("Initialized CustomPA: KPA autoscaler successfully") - - apaScaler, err := scaler.NewApaAutoscaler( - 0, - // TODO: The following parameters are specific to APA. - // Adjust these parameters based on your requirements for APA. 
- scaler.NewApaScalingContext(), - ) - if err != nil { - return nil, err - } - klog.InfoS("Initialized CustomPA: APA autoscaler successfully") - - reconciler.AutoscalerMap = map[autoscalingv1alpha1.ScalingStrategyType]scaler.Scaler{ - autoscalingv1alpha1.KPA: kpaScaler, - autoscalingv1alpha1.APA: apaScaler, - } return reconciler, nil } @@ -146,7 +119,7 @@ type PodAutoscalerReconciler struct { Scheme *runtime.Scheme EventRecorder record.EventRecorder Mapper apimeta.RESTMapper - AutoscalerMap map[autoscalingv1alpha1.ScalingStrategyType]scaler.Scaler + AutoscalerMap map[metrics.NamespaceNameMetric]scaler.Scaler // AutoscalerMap maps each NamespaceNameMetric to its corresponding scaler instance. resyncInterval time.Duration eventCh chan event.GenericEvent } @@ -600,6 +573,7 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context, return 0, "", currentTimestamp, fmt.Errorf("error getting ready pods count: %w", err) } + // TODO UpdateScalingContext (in updateScalerSpec) is duplicate invoked in computeReplicasForMetrics and updateMetricsForScale err = r.updateScalerSpec(ctx, pa) if err != nil { klog.ErrorS(err, "Failed to update scaler spec from pa_types") @@ -615,7 +589,7 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context, } // Calculate the desired number of pods using the autoscaler logic. - autoScaler, ok := r.AutoscalerMap[pa.Spec.ScalingStrategy] + autoScaler, ok := r.AutoscalerMap[metricKey] if !ok { return 0, "", currentTimestamp, fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy) } @@ -630,10 +604,10 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context, // refer to knative-serving. // In pkg/reconciler/autoscaling/kpa/kpa.go:198, kpa maintains a list of deciders into multi-scaler, each of them corresponds to a pa (PodAutoscaler). -// kpa create or update deciders in reconcile function. 
-// for now, we update the kpascaler.spec when reconciling before calling the Scale function, to make the pa information pass into the Scale algorithm. +// We create or update the scaler instance according to the pa passed in func (r *PodAutoscalerReconciler) updateScalerSpec(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler) error { - autoScaler, ok := r.AutoscalerMap[pa.Spec.ScalingStrategy] + metricKey := NewNamespaceNameMetricByPa(pa) + autoScaler, ok := r.AutoscalerMap[metricKey] if !ok { return fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy) } @@ -642,12 +616,38 @@ func (r *PodAutoscalerReconciler) updateScalerSpec(ctx context.Context, pa autos func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured) (err error) { currentTimestamp := time.Now() - - autoScaler, ok := r.AutoscalerMap[pa.Spec.ScalingStrategy] - if !ok { - return fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy) + metricKey := NewNamespaceNameMetricByPa(pa) + var autoScaler scaler.Scaler + autoScaler, exists := r.AutoscalerMap[metricKey] + if !exists { + klog.InfoS("Scaler not found, creating new scaler", "metricKey", metricKey, "type", pa.Spec.ScalingStrategy) + + switch pa.Spec.ScalingStrategy { + case autoscalingv1alpha1.HPA: + // Create the KPA scaler for this pa. NOTE(review): this case label is HPA, so a pa whose strategy is KPA reaches the default branch and is rejected; confirm whether KPA was intended. + // TODO Currently, we initialize kpa with default config and allocate window with default length. + // We then reallocate window according to pa until UpdateScalingContext. + // it's not wrong, but we allocate window twice, to be optimized. 
+ autoScaler, err = scaler.NewKpaAutoscaler(0, &pa) + case autoscalingv1alpha1.APA: + autoScaler, err = scaler.NewApaAutoscaler(0, &pa) + default: + return fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy) + } + if err != nil { + return err + } + r.AutoscalerMap[metricKey] = autoScaler + klog.InfoS("New scaler added to AutoscalerMap", "metricKey", metricKey, "type", pa.Spec.ScalingStrategy, "spec", pa.Spec) + } else { + err := autoScaler.UpdateScalingContext(pa) + if err != nil { + klog.ErrorS(err, "update existed pa failed", "metricKey", metricKey, "type", pa.Spec.ScalingStrategy, "spec", pa.Spec) + return err + } } + // update metrics for _, source := range pa.Spec.MetricsSources { metricKey := metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, source.Name) return autoScaler.UpdateSourceMetrics(ctx, metricKey, source, currentTimestamp) @@ -669,7 +669,6 @@ func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa // TODO: do we need to indicate the metrics source. 
// Technically, the metrics could come from Kubernetes metrics API (resource or custom), pod prometheus endpoint or ai runtime - metricKey := metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, pa.Spec.TargetMetric) // Update targets if err := autoScaler.UpdateScaleTargetMetrics(ctx, metricKey, podList.Items, currentTimestamp); err != nil { diff --git a/pkg/controller/podautoscaler/scaler/apa.go b/pkg/controller/podautoscaler/scaler/apa.go index 9064ff0b..484a1565 100644 --- a/pkg/controller/podautoscaler/scaler/apa.go +++ b/pkg/controller/podautoscaler/scaler/apa.go @@ -51,6 +51,8 @@ type ApaScalingContext struct { // UpFluctuationTolerance represents the threshold before scaling down, // which means no scaling down will occur unless the currentMetricValue is less than the TargetValue by more than UpFluctuationTolerance DownFluctuationTolerance float64 + // metric window length + Window time.Duration } // NewApaScalingContext references KPA and sets up a default configuration. 
@@ -59,9 +61,20 @@ func NewApaScalingContext() *ApaScalingContext { BaseScalingContext: *scalingcontext.NewBaseScalingContext(), UpFluctuationTolerance: 0.1, // Tolerance for scaling up, set at 10% DownFluctuationTolerance: 0.2, // Tolerance for scaling up, set at 10% + Window: time.Second * 60, } } +// NewApaScalingContextByPa initializes ApaScalingContext by passed-in PodAutoscaler description +func NewApaScalingContextByPa(pa *autoscalingv1alpha1.PodAutoscaler) (*ApaScalingContext, error) { + res := NewApaScalingContext() + err := res.UpdateByPaTypes(pa) + if err != nil { + return nil, err + } + return res, nil +} + var _ common.ScalingContext = (*ApaScalingContext)(nil) type ApaAutoscaler struct { @@ -79,9 +92,14 @@ type ApaAutoscaler struct { var _ Scaler = (*ApaAutoscaler)(nil) // NewApaAutoscaler Initialize ApaAutoscaler -func NewApaAutoscaler(readyPodsCount int, spec *ApaScalingContext) (*ApaAutoscaler, error) { +func NewApaAutoscaler(readyPodsCount int, pa *autoscalingv1alpha1.PodAutoscaler) (*ApaAutoscaler, error) { + spec, err := NewApaScalingContextByPa(pa) + if err != nil { + return nil, err + } + metricsFetcher := &metrics.RestMetricsFetcher{} - metricsClient := metrics.NewAPAMetricsClient(metricsFetcher) + metricsClient := metrics.NewAPAMetricsClient(metricsFetcher, spec.Window) scalingAlgorithm := algorithm.ApaScalingAlgorithm{} return &ApaAutoscaler{ @@ -170,14 +188,19 @@ func (a *ApaAutoscaler) UpdateScalingContext(pa autoscalingv1alpha1.PodAutoscale a.specMux.Lock() defer a.specMux.Unlock() - targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64) + // update context and check configuration restraint. + // N.B. for now, we forbid update the config related to the stateful attribute, like window length. 
+ updatedSpec, err := NewApaScalingContextByPa(&pa) if err != nil { - klog.ErrorS(err, "Failed to parse target value", "targetValue", pa.Spec.TargetValue) return err } - a.scalingContext.TargetValue = targetValue - a.scalingContext.ScalingMetric = pa.Spec.TargetMetric - + // check apa spec: the metric window length is stateful, so the original value is kept + rawSpec := a.scalingContext + if updatedSpec.Window != rawSpec.Window { + klog.Warningf("For APA, updating the Window (%v) is not allowed. Keep the original value (%v)", updatedSpec.Window, rawSpec.Window) + updatedSpec.Window = rawSpec.Window + } + a.scalingContext = updatedSpec return nil } diff --git a/pkg/controller/podautoscaler/scaler/apa_test.go b/pkg/controller/podautoscaler/scaler/apa_test.go index 8a728c52..c48342fa 100644 --- a/pkg/controller/podautoscaler/scaler/apa_test.go +++ b/pkg/controller/podautoscaler/scaler/apa_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "github.com/aibrix/aibrix/pkg/controller/podautoscaler/algorithm" + autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,24 +35,24 @@ func TestAPAScale(t *testing.T) { t.Skip("Skipping this test") readyPodCount := 5 + spec := NewApaScalingContext() metricsFetcher := &metrics.RestMetricsFetcher{} - kpaMetricsClient := metrics.NewKPAMetricsClient(metricsFetcher) + apaMetricsClient := metrics.NewAPAMetricsClient(metricsFetcher, spec.Window) now := time.Now() metricKey := metrics.NewNamespaceNameMetric("test_ns", "llama-70b", "ttot") - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-60*time.Second), 10.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-50*time.Second), 11.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-40*time.Second), 12.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-30*time.Second), 13.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, 
now.Add(-20*time.Second), 14.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-10*time.Second), 100.0) - - apaScaler, err := NewApaAutoscaler(readyPodCount, - &ApaScalingContext{}, - ) - apaScaler.metricClient = kpaMetricsClient - if err != nil { - t.Errorf("Failed to create KpaAutoscaler: %v", err) + _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-60*time.Second), 10.0) + _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-50*time.Second), 11.0) + _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-40*time.Second), 12.0) + _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-30*time.Second), 13.0) + _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-20*time.Second), 14.0) + _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-10*time.Second), 100.0) + + apaScaler := ApaAutoscaler{ + metricClient: apaMetricsClient, + algorithm: &algorithm.ApaScalingAlgorithm{}, + scalingContext: spec, } + apaScaler.metricClient = apaMetricsClient ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() diff --git a/pkg/controller/podautoscaler/scaler/kpa.go b/pkg/controller/podautoscaler/scaler/kpa.go index c0453c95..29c83091 100644 --- a/pkg/controller/podautoscaler/scaler/kpa.go +++ b/pkg/controller/podautoscaler/scaler/kpa.go @@ -90,6 +90,8 @@ type KpaScalingContext struct { PanicThreshold float64 // StableWindow is needed to determine when to exit panic mode. StableWindow time.Duration + // PanicWindow is needed to determine when to exit panic mode. + PanicWindow time.Duration // ScaleDownDelay is the time that must pass at reduced concurrency before a // scale-down decision is applied. 
ScaleDownDelay time.Duration @@ -105,10 +107,21 @@ func NewKpaScalingContext() *KpaScalingContext { ActivationScale: 1, // Initial scaling factor upon activation PanicThreshold: 2.0, // Panic threshold set at 200% to trigger rapid scaling StableWindow: 60 * time.Second, // Time window to stabilize before altering scale + PanicWindow: 10 * time.Second, // Time window over which panic-mode metrics are averaged ScaleDownDelay: 30 * time.Minute, // Delay before scaling down to avoid flapping } } +// NewKpaScalingContextByPa builds a KpaScalingContext from the defaults and then applies the passed-in PodAutoscaler via UpdateByPaTypes. +func NewKpaScalingContextByPa(pa *autoscalingv1alpha1.PodAutoscaler) (*KpaScalingContext, error) { + res := NewKpaScalingContext() + err := res.UpdateByPaTypes(pa) + if err != nil { + return nil, err + } + return res, nil +} + func (k *KpaScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error { err := k.BaseScalingContext.UpdateByPaTypes(pa) if err != nil { @@ -148,14 +161,6 @@ func (k *KpaScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscale k.ScaleDownDelay = v } } - // unset some attribute if there are no configuration - if _, exists := pa.Labels[scaleDownDelayLabel]; !exists { - // TODO N.B. three parts of KPAScaler are stateful : panic_window, stable_window and delay windows. - // reconcile() updates KpaScalingContext periodically, but doesn't reset these three parts. - // These three parts are only initialized when controller starts. 
- // Therefore, apply kpa.yaml cannot modify the panic_duration, stable_duration and delay_window duration - k.ScaleDownDelay = 0 - } return nil } @@ -176,9 +181,10 @@ type KpaAutoscaler struct { var _ Scaler = (*KpaAutoscaler)(nil) // NewKpaAutoscaler Initialize KpaAutoscaler: Referenced from `knative/pkg/autoscaler/scaling/autoscaler.go newAutoscaler` -func NewKpaAutoscaler(readyPodsCount int, spec *KpaScalingContext) (*KpaAutoscaler, error) { - if spec == nil { - return nil, errors.New("spec cannot be nil") +func NewKpaAutoscaler(readyPodsCount int, pa *autoscalingv1alpha1.PodAutoscaler) (*KpaAutoscaler, error) { + spec, err := NewKpaScalingContextByPa(pa) + if err != nil { + return nil, err } // Create a new delay window based on the ScaleDownDelay specified in the spec @@ -207,7 +213,7 @@ func NewKpaAutoscaler(readyPodsCount int, spec *KpaScalingContext) (*KpaAutoscal // TODO missing MetricClient metricsFetcher := &metrics.RestMetricsFetcher{} - metricsClient := metrics.NewKPAMetricsClient(metricsFetcher) + metricsClient := metrics.NewKPAMetricsClient(metricsFetcher, spec.StableWindow, spec.PanicWindow) scalingAlgorithm := algorithm.KpaScalingAlgorithm{} @@ -270,7 +276,7 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name klog.V(4).InfoS("--- KPA Details", "readyPodsCount", readyPodsCount, "MaxScaleUpRate", spec.MaxScaleUpRate, "MaxScaleDownRate", spec.MaxScaleDownRate, "TargetValue", spec.TargetValue, "PanicThreshold", spec.PanicThreshold, - "StableWindow", spec.StableWindow, "ScaleDownDelay", spec.ScaleDownDelay, + "StableWindow", spec.StableWindow, "PanicWindow", spec.PanicWindow, "ScaleDownDelay", spec.ScaleDownDelay, "dppc", dppc, "dspc", dspc, "desiredStablePodCount", desiredStablePodCount, "PanicThreshold", spec.PanicThreshold, "isOverPanicThreshold", isOverPanicThreshold, ) @@ -385,10 +391,27 @@ func (k *KpaAutoscaler) UpdateSourceMetrics(ctx context.Context, metricKey metri func (k *KpaAutoscaler) 
UpdateScalingContext(pa autoscalingv1alpha1.PodAutoscaler) error { k.specMux.Lock() defer k.specMux.Unlock() - err := k.scalingContext.UpdateByPaTypes(&pa) + // update context and check configuration restraint. + // N.B. for now, we forbid update the config related to the stateful attribute, like window length. + updatedSpec, err := NewKpaScalingContextByPa(&pa) if err != nil { return err } + // check kpa spec: panic window, stable window and delaywindow + rawSpec := k.scalingContext + if updatedSpec.PanicWindow != rawSpec.PanicWindow { + klog.Warningf("For KPA, updating the PanicWindow (%v) is not allowed. Keep the original value (%v)", updatedSpec.PanicWindow, rawSpec.PanicWindow) + updatedSpec.PanicWindow = rawSpec.PanicWindow + } + if updatedSpec.StableWindow != rawSpec.StableWindow { + klog.Warningf("For KPA, updating the StableWindow (%v) is not allowed. Keep the original value (%v)", updatedSpec.StableWindow, rawSpec.StableWindow) + updatedSpec.StableWindow = rawSpec.StableWindow + } + if updatedSpec.ScaleDownDelay != rawSpec.ScaleDownDelay { + klog.Warningf("For KPA, updating the ScaleDownDelay (%v) is not allowed. 
Keep the original value (%v)", updatedSpec.ScaleDownDelay, rawSpec.ScaleDownDelay) + updatedSpec.ScaleDownDelay = rawSpec.ScaleDownDelay + } + k.scalingContext = updatedSpec return nil } diff --git a/pkg/controller/podautoscaler/scaler/kpa_test.go b/pkg/controller/podautoscaler/scaler/kpa_test.go index 75838c1c..5e5635d8 100644 --- a/pkg/controller/podautoscaler/scaler/kpa_test.go +++ b/pkg/controller/podautoscaler/scaler/kpa_test.go @@ -20,12 +20,14 @@ import ( "testing" "time" + "github.com/aibrix/aibrix/pkg/controller/podautoscaler/aggregation" + "github.com/aibrix/aibrix/pkg/controller/podautoscaler/algorithm" + scalingcontext "github.com/aibrix/aibrix/pkg/controller/podautoscaler/common" + autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/aibrix/aibrix/pkg/controller/podautoscaler/common" - "github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics" ) @@ -35,36 +37,42 @@ import ( // and surpassing the PanicThreshold, the system should enter panic mode and scale up to 10 replicas. 
func TestKpaScale(t *testing.T) { readyPodCount := 5 + spec := KpaScalingContext{ + BaseScalingContext: scalingcontext.BaseScalingContext{ + MaxScaleUpRate: 2, + MaxScaleDownRate: 2, + ScalingMetric: "ttot", + TargetValue: 10, + TotalValue: 500, + }, + TargetBurstCapacity: 2.0, + ActivationScale: 2, + PanicThreshold: 2.0, + StableWindow: 60 * time.Second, + PanicWindow: 10 * time.Second, + ScaleDownDelay: 30 * time.Minute, + } metricsFetcher := &metrics.RestMetricsFetcher{} - kpaMetricsClient := metrics.NewKPAMetricsClient(metricsFetcher) + kpaMetricsClient := metrics.NewKPAMetricsClient(metricsFetcher, spec.StableWindow, spec.PanicWindow) now := time.Now() - metricKey := metrics.NewNamespaceNameMetric("test_ns", "llama-70b", "ttot") - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-60*time.Second), 10.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-50*time.Second), 11.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-40*time.Second), 12.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-30*time.Second), 13.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-20*time.Second), 14.0) - _ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-10*time.Second), 100.0) + metricKey := metrics.NewNamespaceNameMetric("test_ns", "llama-70b", spec.ScalingMetric) + _ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-60*time.Second), 10.0) + _ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-50*time.Second), 11.0) + _ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-40*time.Second), 12.0) + _ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-30*time.Second), 13.0) + _ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-20*time.Second), 14.0) + _ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-10*time.Second), 100.0) - kpaScaler, err := NewKpaAutoscaler(readyPodCount, - &KpaScalingContext{ - BaseScalingContext: common.BaseScalingContext{ - MaxScaleUpRate: 2, - 
MaxScaleDownRate: 2, - ScalingMetric: metricKey.MetricName, - TargetValue: 10, - TotalValue: 500, - }, - PanicThreshold: 2.0, - StableWindow: 60 * time.Second, - ScaleDownDelay: 10 * time.Second, - ActivationScale: 2, - }, - ) - kpaScaler.metricClient = kpaMetricsClient - if err != nil { - t.Errorf("Failed to create KpaAutoscaler: %v", err) + kpaScaler := KpaAutoscaler{ + metricClient: kpaMetricsClient, + panicTime: now, + maxPanicPods: int32(readyPodCount), + delayWindow: aggregation.NewTimeWindow(spec.ScaleDownDelay, time.Second), + algorithm: &algorithm.KpaScalingAlgorithm{}, + scalingContext: &spec, } + + kpaScaler.metricClient = kpaMetricsClient ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() diff --git a/pkg/controller/podautoscaler/scaler/scaler_factory.go b/pkg/controller/podautoscaler/scaler/scaler_factory.go index 9d7d1047..7106f3d5 100644 --- a/pkg/controller/podautoscaler/scaler/scaler_factory.go +++ b/pkg/controller/podautoscaler/scaler/scaler_factory.go @@ -24,15 +24,17 @@ import ( // NewAutoscalerFactory creates an Autoscaler based on the given ScalingStrategy func NewAutoscalerFactory(strategy autoscalingv1alpha1.ScalingStrategyType) (Scaler, error) { + // after update, the XpaAutoscaler must be associated with an instantiated PA, rather than an empty scaler that awaits filling. 
+ // But NewAutoscalerFactory is not used yet, so we temporarily pass nil. NOTE(review): NewKpaAutoscaler/NewApaAutoscaler forward this nil pa to UpdateByPaTypes; confirm a nil pa is tolerated before this factory is called. switch strategy { case autoscalingv1alpha1.KPA: - autoscaler, err := NewKpaAutoscaler(0, NewKpaScalingContext()) + autoscaler, err := NewKpaAutoscaler(0, nil) if err != nil { return nil, err } return autoscaler, nil case autoscalingv1alpha1.APA: - autoscaler, err := NewApaAutoscaler(0, NewApaScalingContext()) + autoscaler, err := NewApaAutoscaler(0, nil) if err != nil { return nil, err } diff --git a/pkg/controller/podautoscaler/utils.go b/pkg/controller/podautoscaler/utils.go index 7385943f..4275b404 100644 --- a/pkg/controller/podautoscaler/utils.go +++ b/pkg/controller/podautoscaler/utils.go @@ -19,6 +19,8 @@ package podautoscaler import ( "fmt" + autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1" + "github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -50,3 +52,7 @@ func extractLabelSelector(scale *unstructured.Unstructured) (labels.Selector, er return labelsSelector, nil } + +func NewNamespaceNameMetricByPa(pa autoscalingv1alpha1.PodAutoscaler) metrics.NamespaceNameMetric { + return metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, pa.Spec.TargetMetric) +}