diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD index 71b9b33a8d6cf..94ed3b962d489 100644 --- a/test/e2e/apimachinery/BUILD +++ b/test/e2e/apimachinery/BUILD @@ -75,6 +75,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library", "//staging/src/k8s.io/client-go/discovery:go_default_library", diff --git a/test/e2e/apimachinery/flowcontrol.go b/test/e2e/apimachinery/flowcontrol.go index 0fae59a418d05..3b3f71a058025 100644 --- a/test/e2e/apimachinery/flowcontrol.go +++ b/test/e2e/apimachinery/flowcontrol.go @@ -19,6 +19,7 @@ package apimachinery import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -32,14 +33,21 @@ import ( flowcontrol "k8s.io/api/flowcontrol/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/apihelpers" + clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" clientsideflowcontrol "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/test/e2e/framework" ) const ( - requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit" - requestConcurrencyLimitMetricLabelName = "priority_level" + requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit" + priorityLevelLabelName = "priority_level" +) + +var ( + errPriorityLevelNotFound = errors.New("cannot find a metric sample with a matching priority level name label") ) var _ = SIGDescribe("API priority and fairness", func() { @@ -59,6 +67,9 @@ var _ = SIGDescribe("API priority and fairness", func() { createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername}) defer cleanup() + ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state") + waitForSteadyState(f, testingFlowSchemaName, testingPriorityLevelName) + var response *http.Response ginkgo.By("response headers should contain the UID of the appropriate FlowSchema and PriorityLevelConfiguration for a matching user") response = makeRequest(f, matchingUsername) @@ -126,11 +137,15 @@ var _ = SIGDescribe("API priority and fairness", func() { framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName) _, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username}) defer cleanup() + + ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state") + waitForSteadyState(f, clients[i].flowSchemaName, clients[i].priorityLevelName) } ginkgo.By("getting request concurrency from metrics") for i := range clients { - realConcurrency := getPriorityLevelConcurrency(f, clients[i].priorityLevelName) + realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, clients[i].priorityLevelName) + framework.ExpectNoError(err) clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier) if clients[i].concurrency < 1 { clients[i].concurrency = 1 @@ -185,6 +200,9 @@ var _ = SIGDescribe("API priority and fairness", func() { _, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName}) defer cleanup() + ginkgo.By("waiting for testing flow schema and priority level to reach steady state") + waitForSteadyState(f, flowSchemaName, priorityLevelName) + type client struct { username string qps float64 @@ -199,7 +217,8 @@ var _ = SIGDescribe("API priority and fairness", func() { } framework.Logf("getting real concurrency") - realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName) + realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName) + framework.ExpectNoError(err) for i := range clients { clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier) if clients[i].concurrency < 1 { @@ -259,10 +278,11 @@ func createPriorityLevel(f *framework.Framework, priorityLevelName string, assur } } -//lint:ignore U1000 function is actually referenced -func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName string) int32 { - resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO()) - framework.ExpectNoError(err) +func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) { + resp, err := c.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO()) + if err != nil { + return 0, err + } sampleDecoder := expfmt.SampleDecoder{ Dec: expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText), Opts: &expfmt.DecodeOptions{}, @@ -270,22 +290,23 @@ func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName strin for { var v model.Vector err := sampleDecoder.Decode(&v) - if err == io.EOF { - break + if err != nil { + if err == io.EOF { + break + } + return 0, err } - framework.ExpectNoError(err) for _, metric := range v { if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName { continue } - if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != priorityLevelName { + if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName { continue } - return int32(metric.Value) + return int32(metric.Value), nil } } - framework.ExpectNoError(fmt.Errorf("cannot find metric %q with matching priority level name label %q", requestConcurrencyLimitMetricName, priorityLevelName)) - return 0 + return 0, errPriorityLevelNotFound } // createFlowSchema creates a flow schema referring to a particular priority @@ -335,6 +356,35 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre } } +// waitForSteadyState repeatedly polls the API server to check if the newly +// created flow schema and priority level have been seen by the APF controller +// by checking: (1) the dangling priority level reference condition in the flow +// schema status, and (2) metrics. The function times out after 30 seconds. +func waitForSteadyState(f *framework.Framework, flowSchemaName string, priorityLevelName string) { + framework.ExpectNoError(wait.Poll(time.Second, 30*time.Second, func() (bool, error) { + fs, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Get(context.TODO(), flowSchemaName, metav1.GetOptions{}) + if err != nil { + return false, err + } + condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling) + if condition == nil || condition.Status != flowcontrol.ConditionFalse { + // The absence of the dangling status object implies that the APF + // controller isn't done with syncing the flow schema object. And, of + // course, the condition being anything but false means that steady state + // hasn't been achieved. + return false, nil + } + _, err = getPriorityLevelConcurrency(f.ClientSet, priorityLevelName) + if err != nil { + if err == errPriorityLevelNotFound { + return false, nil + } + return false, err + } + return true, nil + })) +} + // makeRequests creates a request to the API server and returns the response. func makeRequest(f *framework.Framework, username string) *http.Response { config := f.ClientConfig()