diff --git a/pkg/autoscaler/autoscaler/calculate/calculate.go b/pkg/autoscaler/autoscaler/calculate/calculate.go
new file mode 100644
index 00000000000..0d1f9e84139
--- /dev/null
+++ b/pkg/autoscaler/autoscaler/calculate/calculate.go
@@ -0,0 +1,132 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package calculate
+
+import (
+	"encoding/json"
+	"fmt"
+	"math"
+	"net/http"
+	"strconv"
+
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+	promClient "github.com/prometheus/client_golang/api"
+	autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
+	"k8s.io/apimachinery/pkg/util/sets"
+)
+
+const (
+	// TikvSumCpuMetricsPattern is the PromQL template summing the increase of tikv_thread_cpu_seconds_total per instance; its verbs take the cluster name and the range duration.
+	TikvSumCpuMetricsPattern = `sum(increase(tikv_thread_cpu_seconds_total{cluster="%s"}[%s])) by (instance)`
+	// TidbSumCpuMetricsPattern is the PromQL template summing the increase of process_cpu_seconds_total of the tidb job per instance; its verbs take the cluster name and the range duration.
+	TidbSumCpuMetricsPattern = `sum(increase(process_cpu_seconds_total{cluster="%s",job="tidb"}[%s])) by (instance)`
+	// InvalidTacMetricConfigureMsg is the error format (namespace, name) reported for an unusable metric configuration.
+	InvalidTacMetricConfigureMsg = "tac[%s/%s] metric configuration invalid"
+	// queryPath is the query endpoint appended to the configured metrics URL.
+	queryPath = "/api/v1/query"
+
+	// float64EqualityThreshold is the largest difference at which almostEqual treats two floats as equal.
+	float64EqualityThreshold = 1e-9
+)
+
+// SingleQuery describes one Prometheus query together with the instances and metric it is evaluated for.
+type SingleQuery struct {
+	// Timestamp is sent as the "time" parameter of the query.
+	Timestamp int64
+	// Quary is the PromQL expression sent as the "query" parameter; the misspelled name is kept because callers set this field.
+	Quary string
+	// Instances lists the instance names whose samples are summed.
+	Instances []string
+	// Metric is the autoscaling metric spec the query result is compared against.
+	Metric autoscalingv2beta2.MetricSpec
+}
+
+// queryMetricsFromPrometheus sends sq to the query endpoint under tac.Spec.MetricsUrl and decodes the
+// JSON reply into resp. It returns an error when the metrics URL is unset, the request cannot be built
+// or sent, the HTTP status is not 200, the body is not valid JSON, or the decoded status is not "success".
+func queryMetricsFromPrometheus(tac *v1alpha1.TidbClusterAutoScaler, client promClient.Client, sq *SingleQuery, resp *Response) error {
+	if tac.Spec.MetricsUrl == nil {
+		return fmt.Errorf("tac[%s/%s] metrics url is not configured", tac.Namespace, tac.Name)
+	}
+	req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", *tac.Spec.MetricsUrl, queryPath), nil)
+	if err != nil {
+		return err
+	}
+	q := req.URL.Query()
+	q.Add("query", sq.Quary)
+	q.Add("time", strconv.FormatInt(sq.Timestamp, 10))
+	req.URL.RawQuery = q.Encode()
+	r, body, err := client.Do(req.Context(), req)
+	if err != nil {
+		return err
+	}
+	if r.StatusCode != http.StatusOK {
+		return fmt.Errorf("tac[%s/%s] query error, status code:%d", tac.Namespace, tac.Name, r.StatusCode)
+	}
+	if err := json.Unmarshal(body, resp); err != nil {
+		return err
+	}
+	if resp.Status != statusSuccess {
+		return fmt.Errorf("tac[%s/%s] query error, response status: %v", tac.Namespace, tac.Name, resp.Status)
+	}
+	return nil
+}
+
+// sumForEachInstance adds up the sample values in resp that belong to the given instances; results of
+// other instances are ignored. It returns an error when resp is nil, holds no results, or a matching
+// result carries a value that is not a two-element list whose second element is a numeric string.
+func sumForEachInstance(instances []string, resp *Response) (float64, error) {
+	if resp == nil {
+		return 0, fmt.Errorf("metrics response from Prometheus can't be empty")
+	}
+	if len(resp.Data.Result) < 1 {
+		return 0, fmt.Errorf("metrics Response return zero info")
+	}
+	s := sets.NewString(instances...)
+	sum := 0.0
+	for _, r := range resp.Data.Result {
+		if !s.Has(r.Metric.Instance) {
+			continue
+		}
+		// The sample is expected as [<timestamp>, "<value>"]; validate it instead of panicking on a malformed reply.
+		if len(r.Value) < 2 {
+			return 0, fmt.Errorf("metrics response value of instance %s is malformed: %v", r.Metric.Instance, r.Value)
+		}
+		raw, ok := r.Value[1].(string)
+		if !ok {
+			return 0, fmt.Errorf("metrics response value of instance %s is not a string: %v", r.Metric.Instance, r.Value[1])
+		}
+		v, err := strconv.ParseFloat(raw, 64)
+		if err != nil {
+			return 0, err
+		}
+		sum += v
+	}
+	return sum, nil
+}
+
+// calculate returns the recommended replicas, ceil(currentValue / targetValue * currentReplicas).
+// It returns -1 and an error when targetValue is within float64EqualityThreshold of zero.
+func calculate(currentValue float64, targetValue float64, currentReplicas int32) (int32, error) {
+	if almostEqual(targetValue, 0.0) {
+		return -1, fmt.Errorf("targetValue in calculate func can't be zero")
+	}
+	usageRatio := currentValue / targetValue
+	return int32(math.Ceil(usageRatio * float64(currentReplicas))), nil
+}
+
+// almostEqual reports whether a and b differ by at most float64EqualityThreshold.
+func almostEqual(a, b float64) bool {
+	return math.Abs(a-b) <= float64EqualityThreshold
+}
diff --git a/pkg/autoscaler/autoscaler/calculate/cpu.go b/pkg/autoscaler/autoscaler/calculate/cpu.go
new file mode 100644
index 00000000000..dcad599ded8
--- /dev/null
+++ b/pkg/autoscaler/autoscaler/calculate/cpu.go
@@ -0,0 +1,76 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package calculate
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+	promClient "github.com/prometheus/client_golang/api"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+)
+
+// CpuSumMetricsErrorMsg is the error format (namespace, name, duration) reported when the summed CPU seconds are negative.
+const CpuSumMetricsErrorMsg = "tac[%s/%s] cpu sum metrics error, can't calculate the past %s cpu metrics, may caused by prometheus restart while data persistence not enabled"
+
+// CalculateRecomendedReplicasByCpuCosts recommends a replica count by comparing the CPU seconds Prometheus
+// reports for sq.Instances with the CPU seconds those instances would use over duration at the target
+// AverageUtilization of the memberType container's CPU request. It returns -1 together with any error.
+// TODO: create issue to explain how auto-scaling algorithm based on cpu metrics work
+func CalculateRecomendedReplicasByCpuCosts(tac *v1alpha1.TidbClusterAutoScaler, sq *SingleQuery, sts *appsv1.StatefulSet,
+	client promClient.Client, memberType v1alpha1.MemberType, duration time.Duration) (int32, error) {
+	metric := sq.Metric
+	instances := sq.Instances
+	if metric.Resource == nil || metric.Resource.Target.AverageUtilization == nil {
+		return -1, fmt.Errorf(InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
+	}
+	currentReplicas := len(instances)
+	if currentReplicas == 0 {
+		return -1, fmt.Errorf("tac[%s/%s] has no instances to calculate cpu metrics for", tac.Namespace, tac.Name)
+	}
+	c, err := filterContainer(tac, sts, memberType.String())
+	if err != nil {
+		return -1, err
+	}
+	cpuRequestsRatio, err := extractCpuRequestsRatio(c)
+	if err != nil {
+		return -1, err
+	}
+	r := &Response{}
+	if err := queryMetricsFromPrometheus(tac, client, sq, r); err != nil {
+		return -1, err
+	}
+	sum, err := sumForEachInstance(instances, r)
+	if err != nil {
+		return -1, err
+	}
+	if sum < 0 {
+		return -1, fmt.Errorf(CpuSumMetricsErrorMsg, tac.Namespace, tac.Name, duration.String())
+	}
+	// CPU seconds the instances would use over the window when running exactly at the target utilization.
+	utilizationRatio := float64(*metric.Resource.Target.AverageUtilization) / 100.0
+	expectedCpuSecsTotal := cpuRequestsRatio * duration.Seconds() * float64(currentReplicas) * utilizationRatio
+	// calculate reports its own failure as (-1, err), so its result is returned unchanged.
+	return calculate(sum, expectedCpuSecsTotal, int32(currentReplicas))
+}
+
+// extractCpuRequestsRatio returns the container's CPU request in cores; a request below one millicore is an error.
+func extractCpuRequestsRatio(c *corev1.Container) (float64, error) {
+	if c.Resources.Requests.Cpu() == nil || c.Resources.Requests.Cpu().MilliValue() < 1 {
+		return 0, fmt.Errorf("container[%s] cpu requests is empty", c.Name)
+	}
+	return float64(c.Resources.Requests.Cpu().MilliValue()) / 1000.0, nil
+}
diff --git a/pkg/autoscaler/autoscaler/calculate/util.go b/pkg/autoscaler/autoscaler/calculate/util.go
new file mode 100644
index 00000000000..97c3cb5bb14
--- /dev/null
+++ b/pkg/autoscaler/autoscaler/calculate/util.go
@@ -0,0 +1,101 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package calculate
+
+import (
+	"fmt"
+
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+	appsv1 "k8s.io/api/apps/v1"
+	autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
+	corev1 "k8s.io/api/core/v1"
+)
+
+// MetricType describes the metric types currently supported for calculating the recommended replicas.
+type MetricType string
+
+const (
+	// MetricTypeCPU selects the CPU-based calculation.
+	MetricTypeCPU MetricType = "cpu"
+	//metricTypeQPS MetricType = "qps"
+)
+
+// FilterMetrics picks the single metric to compute, since only one metric is evaluated currently.
+// The first Resource-typed metric with a non-nil Resource is preferred; otherwise the first metric
+// is returned. An empty slice yields the zero MetricSpec, which GenMetricType rejects.
+func FilterMetrics(metrics []autoscalingv2beta2.MetricSpec) autoscalingv2beta2.MetricSpec {
+	for _, m := range metrics {
+		if m.Type == autoscalingv2beta2.ResourceMetricSourceType && m.Resource != nil {
+			return m
+		}
+	}
+	if len(metrics) == 0 {
+		// Nothing configured: avoid indexing an empty slice and let the caller's validation fail.
+		return autoscalingv2beta2.MetricSpec{}
+	}
+	return metrics[0]
+}
+
+// GenMetricType maps a kubernetes auto-scaling metric spec to the MetricType supported by the operator.
+// Only a Resource metric on corev1.ResourceCPU is supported; anything else is a configuration error.
+func GenMetricType(tac *v1alpha1.TidbClusterAutoScaler, metric autoscalingv2beta2.MetricSpec) (MetricType, error) {
+	if metric.Type == autoscalingv2beta2.ResourceMetricSourceType && metric.Resource != nil && metric.Resource.Name == corev1.ResourceCPU {
+		return MetricTypeCPU, nil
+	}
+	return "", fmt.Errorf(InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
+}
+
+// filterContainer returns a copy of the container named containerName from the given statefulset(tidb/tikv),
+// or an error when the pod template has no such container.
+func filterContainer(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, containerName string) (*corev1.Container, error) {
+	containers := sts.Spec.Template.Spec.Containers
+	for i := range containers {
+		if containers[i].Name == containerName {
+			c := containers[i]
+			return &c, nil
+		}
+	}
+	return nil, fmt.Errorf("tac[%s/%s]'s Target have not %s container", tac.Namespace, tac.Name, containerName)
+}
+
+// statusSuccess is the response status that marks a query as successful.
+const statusSuccess = "success"
+
+// Response is used to unmarshal the data queried from Prometheus
+type Response struct {
+	Status string `json:"status"`
+	Data   Data   `json:"data"`
+}
+
+// Data is the "data" object of a query response.
+type Data struct {
+	ResultType string   `json:"resultType"`
+	Result     []Result `json:"result"`
+}
+
+// Result is one entry of the result list: the series labels plus its sample value.
+type Result struct {
+	Metric Metric        `json:"metric"`
+	Value  []interface{} `json:"value"`
+}
+
+// Metric holds the series labels decoded from a result entry.
+type Metric struct {
+	Cluster             string `json:"cluster,omitempty"`
+	Instance            string `json:"instance"`
+	Job                 string `json:"job,omitempty"`
+	KubernetesNamespace string `json:"kubernetes_namespace,omitempty"`
+	KubernetesNode      string `json:"kubernetes_node,omitempty"`
+	KubernetesPodIp     string `json:"kubernetes_pod_ip,omitempty"`
+}
diff --git
a/pkg/autoscaler/autoscaler/tidb_autoscaler.go b/pkg/autoscaler/autoscaler/tidb_autoscaler.go
index 7eb4aabdf99..4be703e0fff 100644
--- a/pkg/autoscaler/autoscaler/tidb_autoscaler.go
+++ b/pkg/autoscaler/autoscaler/tidb_autoscaler.go
@@ -14,12 +14,15 @@
 package autoscaler
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+	"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler/calculate"
 	"github.com/pingcap/tidb-operator/pkg/label"
 	operatorUtils "github.com/pingcap/tidb-operator/pkg/util"
 	promClient "github.com/prometheus/client_golang/api"
+	appsv1 "k8s.io/api/apps/v1"
 )
 
 func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, client promClient.Client) error {
@@ -36,7 +39,11 @@ func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
 		return nil
 	}
 	currentReplicas := tc.Spec.TiDB.Replicas
-	targetReplicas := calculateRecommendedReplicas(tac, v1alpha1.TiDBMemberType, client)
+	instances := filterTidbInstances(tc)
+	targetReplicas, err := calculateTidbMetrics(tac, sts, client, instances)
+	if err != nil {
+		return err
+	}
 	targetReplicas = limitTargetReplicas(targetReplicas, tac, v1alpha1.TiDBMemberType)
 	if targetReplicas == tc.Spec.TiDB.Replicas {
 		emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType)
@@ -80,3 +87,50 @@ func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
 	tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = time.Now().String()
 	emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType)
 }
+
+// calculateTidbMetrics fills TidbSumCpuMetricsPattern with the cluster name and the configured
+// MetricsTimeDuration, then returns the replicas recommended for the given instances. It returns -1
+// with an error when the metric configuration is missing, unsupported, or has an unparsable duration.
+func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
+	// Reject an empty metric list or an unset duration up front: the duration pointer is
+	// dereferenced below and an empty list cannot name a metric to compute.
+	if len(tac.Spec.TiDB.Metrics) == 0 || tac.Spec.TiDB.MetricsTimeDuration == nil {
+		return -1, fmt.Errorf(calculate.InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
+	}
+	metric := calculate.FilterMetrics(tac.Spec.TiDB.Metrics)
+	mType, err := calculate.GenMetricType(tac, metric)
+	if err != nil {
+		return -1, err
+	}
+	duration, err := time.ParseDuration(*tac.Spec.TiDB.MetricsTimeDuration)
+	if err != nil {
+		return -1, err
+	}
+	sq := &calculate.SingleQuery{
+		Timestamp: time.Now().Unix(),
+		Instances: instances,
+		Metric:    metric,
+		Quary:     fmt.Sprintf(calculate.TidbSumCpuMetricsPattern, tac.Spec.Cluster.Name, *tac.Spec.TiDB.MetricsTimeDuration),
+	}
+
+	switch mType {
+	case calculate.MetricTypeCPU:
+		return calculate.CalculateRecomendedReplicasByCpuCosts(tac, sq, sts, client, v1alpha1.TiDBMemberType, duration)
+	default:
+		return -1, fmt.Errorf(calculate.InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
+	}
+}
+
+// filterTidbInstances lists the pod names of the TiDB statefulset ordinals that are not recorded in
+// tc.Status.TiDB.FailureMembers.
+// NOTE(review): tc.Status.TiDB.StatefulSet is read without a guard here; confirm callers guarantee it is populated.
+func filterTidbInstances(tc *v1alpha1.TidbCluster) []string {
+	var instances []string
+	for i := int32(0); i < tc.Status.TiDB.StatefulSet.Replicas; i++ {
+		podName := operatorUtils.GetPodName(tc, v1alpha1.TiDBMemberType, i)
+		if _, existed := tc.Status.TiDB.FailureMembers[podName]; !existed {
+			instances = append(instances, podName)
+		}
+	}
+	return instances
+}
diff --git a/pkg/autoscaler/autoscaler/tidb_autoscaler_test.go b/pkg/autoscaler/autoscaler/tidb_autoscaler_test.go
index 3f7e1287251..d29d370f0cb 100644
--- a/pkg/autoscaler/autoscaler/tidb_autoscaler_test.go
+++ b/pkg/autoscaler/autoscaler/tidb_autoscaler_test.go
@@ -42,6 +42,7 @@ func TestSyncTiDBAfterCalculated(t *testing.T) {
 		tc.Spec.TiDB.Replicas = test.currentReplicas
 		tac.Annotations[label.AnnTiDBConsecutiveScaleInCount] = fmt.Sprintf("%d", test.currentScaleInCount)
 		tac.Annotations[label.AnnTiDBConsecutiveScaleOutCount] = fmt.Sprintf("%d", test.currentScaleOutCount)
+		tac.Spec.TiKV = nil
 
 		err := syncTiDBAfterCalculated(tc, tac, test.currentReplicas, test.recommendedReplicas)
 		g.Expect(err).ShouldNot(HaveOccurred())
diff --git a/pkg/autoscaler/autoscaler/tikv_autoscaler.go b/pkg/autoscaler/autoscaler/tikv_autoscaler.go
index 7f7b8177773..d8d5a268fef 100644
--- a/pkg/autoscaler/autoscaler/tikv_autoscaler.go
+++ b/pkg/autoscaler/autoscaler/tikv_autoscaler.go
@@ -14,12 +14,15 @@
 package autoscaler
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
+	"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler/calculate"
 	"github.com/pingcap/tidb-operator/pkg/label"
 	operatorUtils "github.com/pingcap/tidb-operator/pkg/util"
 	promClient "github.com/prometheus/client_golang/api"
+	appsv1 "k8s.io/api/apps/v1"
 )
 
 func (am *autoScalerManager) syncTiKV(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, client promClient.Client) error {
@@ -35,8 +38,12 @@ func (am *autoScalerManager) syncTiKV(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
 		emptyAutoScalingCountAnn(tac, v1alpha1.TiKVMemberType)
 		return nil
 	}
-	currentReplicas := getStateUpReplicas(tc)
-	targetReplicas := calculateRecommendedReplicas(tac, v1alpha1.TiKVMemberType, client)
+	instances := filterTiKVInstances(tc)
+	currentReplicas := int32(len(instances))
+	targetReplicas, err := calculateTikvMetrics(tac, sts, client, instances)
+	if err != nil {
+		return err
+	}
 	targetReplicas = limitTargetReplicas(targetReplicas, tac, v1alpha1.TiKVMemberType)
 	if targetReplicas == tc.Spec.TiKV.Replicas {
 		emptyAutoScalingCountAnn(tac, v1alpha1.TiKVMemberType)
@@ -79,17 +86,53 @@ func syncTiKVAfterCalculated(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbCluster
 	return nil
 }
 
-func getStateUpReplicas(tc *v1alpha1.TidbCluster) int32 {
-	count := 0
+// filterTiKVInstances lists the pod names of the stores in tc.Status.TiKV.Stores whose state is TiKVStateUp.
+// TODO: fetch tikv instances info from pdapi in future
+func filterTiKVInstances(tc *v1alpha1.TidbCluster) []string {
+	var instances []string
 	for _, store := range tc.Status.TiKV.Stores {
 		if store.State == v1alpha1.TiKVStateUp {
-			count = count + 1
+			instances = append(instances, store.PodName)
 		}
 	}
-	return int32(count)
+	return instances
 }
 
 func updateTcTiKVAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
 	tac.Annotations[label.AnnTiKVLastAutoScalingTimestamp] = time.Now().String()
 	emptyAutoScalingCountAnn(tac, v1alpha1.TiKVMemberType)
 }
+
+// calculateTikvMetrics fills TikvSumCpuMetricsPattern with the cluster name and the configured
+// MetricsTimeDuration, then returns the replicas recommended for the given instances. It returns -1
+// with an error when the metric configuration is missing, unsupported, or has an unparsable duration.
+func calculateTikvMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
+	// Reject an empty metric list or an unset duration up front: the duration pointer is
+	// dereferenced below and an empty list cannot name a metric to compute.
+	if len(tac.Spec.TiKV.Metrics) == 0 || tac.Spec.TiKV.MetricsTimeDuration == nil {
+		return -1, fmt.Errorf(calculate.InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
+	}
+	metric := calculate.FilterMetrics(tac.Spec.TiKV.Metrics)
+	mType, err := calculate.GenMetricType(tac, metric)
+	if err != nil {
+		return -1, err
+	}
+
+	duration, err := time.ParseDuration(*tac.Spec.TiKV.MetricsTimeDuration)
+	if err != nil {
+		return -1, err
+	}
+	sq := &calculate.SingleQuery{
+		Timestamp: time.Now().Unix(),
+		Instances: instances,
+		Metric:    metric,
+		Quary:     fmt.Sprintf(calculate.TikvSumCpuMetricsPattern, tac.Spec.Cluster.Name, *tac.Spec.TiKV.MetricsTimeDuration),
+	}
+
+	switch mType {
+	case calculate.MetricTypeCPU:
+		return calculate.CalculateRecomendedReplicasByCpuCosts(tac, sq, sts, client, v1alpha1.TiKVMemberType, duration)
+	default:
+		return -1, fmt.Errorf(calculate.InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
+	}
+}
diff --git a/pkg/autoscaler/autoscaler/tikv_autoscaler_test.go b/pkg/autoscaler/autoscaler/tikv_autoscaler_test.go
index 6a2647c3579..f7eb3c22f53 100644
--- a/pkg/autoscaler/autoscaler/tikv_autoscaler_test.go
+++ b/pkg/autoscaler/autoscaler/tikv_autoscaler_test.go
@@ -42,6 +42,7 @@ func TestSyncTiKVAfterCalculated(t *testing.T) {
 		tc.Spec.TiKV.Replicas = test.currentReplicas
 		tac.Annotations[label.AnnTiKVConsecutiveScaleInCount] = fmt.Sprintf("%d", test.currentScaleInCount)
 		tac.Annotations[label.AnnTiKVConsecutiveScaleOutCount] = fmt.Sprintf("%d", test.currentScaleOutCount)
+		tac.Spec.TiDB = nil
 
 		err := syncTiKVAfterCalculated(tc, tac, test.currentReplicas, test.recommendedReplicas)
 		g.Expect(err).ShouldNot(HaveOccurred())
diff --git a/pkg/autoscaler/autoscaler/util.go b/pkg/autoscaler/autoscaler/util.go
index 4632c6c8c46..d59a9bf4567 100644
--- a/pkg/autoscaler/autoscaler/util.go
+++ b/pkg/autoscaler/autoscaler/util.go
@@ -21,7 +21,6 @@ import (
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
 	"github.com/pingcap/tidb-operator/pkg/label"
 	operatorUtils "github.com/pingcap/tidb-operator/pkg/util"
-	promClient "github.com/prometheus/client_golang/api"
 	appsv1 "k8s.io/api/apps/v1"
 	autoscalingv2beta2
"k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" @@ -265,7 +264,7 @@ func checkConsecutiveCount(tac *v1alpha1.TidbClusterAutoScaler, } } else { // scale-in, no-scaling would be return nil at first - if int32(currentScaleInCount) < *tac.Spec.TiDB.ScaleInThreshold { + if int32(currentScaleInCount) < *tac.Spec.TiKV.ScaleInThreshold { return false, nil } } @@ -285,12 +284,6 @@ func emptyAutoScalingCountAnn(tac *v1alpha1.TidbClusterAutoScaler, memberType v1 tac.Annotations[targetScaleInAnn] = "0" } -//TODO: calculate the recommended replicas from Prometheus -func calculateRecommendedReplicas(tac *v1alpha1.TidbClusterAutoScaler, memberType v1alpha1.MemberType, - client promClient.Client) int32 { - return 0 -} - func resetAutoScalingAnn(tac *v1alpha1.TidbClusterAutoScaler) { emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType) emptyAutoScalingCountAnn(tac, v1alpha1.TiKVMemberType) diff --git a/pkg/autoscaler/autoscaler/util_test.go b/pkg/autoscaler/autoscaler/util_test.go index df0f542344c..70378008414 100644 --- a/pkg/autoscaler/autoscaler/util_test.go +++ b/pkg/autoscaler/autoscaler/util_test.go @@ -46,6 +46,11 @@ func TestUpdateConsecutiveCount(t *testing.T) { g.Expect(err).ShouldNot(HaveOccurred()) updatedScaleOutCountAnnValue := tac.Annotations[fmt.Sprintf("%s.%s", test.memberType, annScaleOutSuffix)] updatedScaleInCountAnnValue := tac.Annotations[fmt.Sprintf("%s.%s", test.memberType, annScaleInSuffix)] + if test.memberType == v1alpha1.TiKVMemberType { + tac.Spec.TiDB = nil + } else if test.memberType == v1alpha1.TiDBMemberType { + tac.Spec.TiKV = nil + } g.Expect(updatedScaleOutCountAnnValue).Should(Equal(test.expectedScaleOutAnnValue)) g.Expect(updatedScaleInCountAnnValue).Should(Equal(test.expectedScaleInAnnValue)) } @@ -126,6 +131,11 @@ func TestCheckConsecutiveCount(t *testing.T) { tac := newTidbClusterAutoScaler() tac.Annotations[fmt.Sprintf("%s.%s", test.memberType, annScaleOutSuffix)] = fmt.Sprintf("%d", test.scaleOutCount) 
tac.Annotations[fmt.Sprintf("%s.%s", test.memberType, annScaleInSuffix)] = fmt.Sprintf("%d", test.scaleInCount) + if test.memberType == v1alpha1.TiKVMemberType { + tac.Spec.TiDB = nil + } else if test.memberType == v1alpha1.TiDBMemberType { + tac.Spec.TiKV = nil + } ableScale, err := checkConsecutiveCount(tac, test.memberType, test.currentReplicas, test.recommendedReplicas) g.Expect(err).ShouldNot(HaveOccurred())