release: test upgrade/downgrade for 1.13/1.14/1.15 + mariner #1491

Merged Mar 6, 2025 · 17 commits
2 changes: 2 additions & 0 deletions .github/workflows/kind_e2e.yaml
@@ -173,6 +173,7 @@ jobs:
export TEST_OUTPUT_FILE=$GITHUB_WORKSPACE/test-e2e-kind.json
echo "TEST_OUTPUT_FILE=$TEST_OUTPUT_FILE" >> $GITHUB_ENV
export GITHUB_TOKEN=${{ secrets.GITHUB_TOKEN }}
export TEST_DAPR_HA_MODE=${{ matrix.mode }}
make e2e-build-run-k8s
shell: bash
- name: Run tests with Docker hub
@@ -181,6 +182,7 @@ jobs:
export TEST_OUTPUT_FILE=$GITHUB_WORKSPACE/test-e2e-kind.json
echo "TEST_OUTPUT_FILE=$TEST_OUTPUT_FILE" >> $GITHUB_ENV
export GITHUB_TOKEN=${{ secrets.GITHUB_TOKEN }}
export TEST_DAPR_HA_MODE=${{ matrix.mode }}
make e2e-build-run-k8s
shell: bash
- name: Upload test results
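Both steps above now export TEST_DAPR_HA_MODE from the job matrix. The matrix definition itself sits outside this hunk, but the values presumably line up with the DaprModeHA/DaprModeNonHA constants ("ha"/"non-ha") this PR adds to tests/e2e/common/common.go. A minimal sketch of how such a mode value could be translated into an install flag on the consumer side — the helper is invented for illustration; only the variable name comes from the diff:

```go
package main

import (
	"fmt"
	"os"
)

// haInstallFlag maps the mode exported by the workflow matrix (assumed to be
// "ha" or "non-ha") onto dapr's --enable-ha install flag, defaulting to
// non-HA when the variable is unset.
func haInstallFlag() string {
	if os.Getenv("TEST_DAPR_HA_MODE") == "ha" {
		return "--enable-ha=true"
	}
	return "--enable-ha=false"
}

func main() {
	fmt.Println("dapr init -k", haInstallFlag())
}
```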
2 changes: 2 additions & 0 deletions .github/workflows/upgrade_e2e.yaml
@@ -137,13 +137,15 @@ jobs:
run: |
export TEST_OUTPUT_FILE=$GITHUB_WORKSPACE/test-e2e-upgrade-kind.json
echo "TEST_OUTPUT_FILE=$TEST_OUTPUT_FILE" >> $GITHUB_ENV
export TEST_DAPR_HA_MODE=${{ matrix.mode }}
make e2e-build-run-upgrade
- name: Run tests with Docker hub
if: github.event.schedule != '0 */6 * * *'
run: |
export TEST_OUTPUT_FILE=$GITHUB_WORKSPACE/test-e2e-upgrade-kind.json
echo "TEST_OUTPUT_FILE=$TEST_OUTPUT_FILE" >> $GITHUB_ENV
export TEST_DAPR_HA_MODE=${{ matrix.mode }}
make e2e-build-run-upgrade
- name: Upload test results
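The upgrade workflow gets the same two exports. On the Go side, the ShouldSkipTest helper added to tests/e2e/common/common.go later in this diff lets a test bail out when the requested mode does not match. A hypothetical test illustrating the intended use — only ShouldSkipTest and DaprModeHA come from this PR:

```go
package upgrade

import (
	"testing"

	"github.com/dapr/cli/tests/e2e/common"
)

// TestUpgradeHAPath is invented for illustration; it runs only when
// TEST_DAPR_HA_MODE is unset or set to "ha".
func TestUpgradeHAPath(t *testing.T) {
	if common.ShouldSkipTest(common.DaprModeHA) {
		t.Skip("TEST_DAPR_HA_MODE requests a different HA mode")
	}
	// ... exercise an HA upgrade path ...
}
```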
2 changes: 1 addition & 1 deletion Makefile
@@ -174,7 +174,7 @@ e2e-build-run-k8s: build test-e2e-k8s
################################################################################
.PHONY: test-e2e-upgrade
test-e2e-upgrade: test-deps
gotestsum --jsonfile $(TEST_OUTPUT_FILE) --format standard-verbose -- -timeout 40m -count=1 -tags=e2e ./tests/e2e/upgrade/...
gotestsum --jsonfile $(TEST_OUTPUT_FILE) --format standard-verbose -- -timeout 60m -count=1 -tags=e2e ./tests/e2e/upgrade/...

################################################################################
# Build, E2E Tests for Kubernetes Upgrade #
110 changes: 110 additions & 0 deletions pkg/kubernetes/upgrade.go
@@ -14,6 +14,8 @@ limitations under the License.
package kubernetes

import (
"context"
"errors"
"fmt"
"net/http"
"os"
@@ -23,8 +25,11 @@ import (
helm "helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/release"
core_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/helm/pkg/strvals"

"github.com/Masterminds/semver/v3"
"github.com/hashicorp/go-version"

"github.com/dapr/cli/pkg/print"
@@ -49,6 +54,8 @@ var crdsFullResources = []string{
"httpendpoints.dapr.io",
}

var versionWithHAScheduler = semver.MustParse("1.15.0-rc.1")

type UpgradeConfig struct {
RuntimeVersion string
DashboardVersion string
@@ -157,13 +164,40 @@ func Upgrade(conf UpgradeConfig) error {
return err
}

// downgradeDeletionChan signals completion of the scheduler pod deletion, which is
// needed only when downgrading from 1.15 to an earlier version because of
// incompatible changes; in all other cases the channel stays nil.
var downgradeDeletionChan chan error

if !isDowngrade(conf.RuntimeVersion, daprVersion) {
err = applyCRDs("v" + conf.RuntimeVersion)
if err != nil {
return fmt.Errorf("unable to apply CRDs: %w", err)
}
} else {
print.InfoStatusEvent(os.Stdout, "Downgrade detected, skipping CRDs.")

targetVersion, errVersion := semver.NewVersion(conf.RuntimeVersion)
if errVersion != nil {
return fmt.Errorf("unable to parse dapr target version: %w", errVersion)
}

currentVersion, errVersion := semver.NewVersion(daprVersion)
if errVersion != nil {
return fmt.Errorf("unable to parse dapr current version: %w", errVersion)
}

if currentVersion.GreaterThanEqual(versionWithHAScheduler) && targetVersion.LessThan(versionWithHAScheduler) {
downgradeDeletionChan = make(chan error)
// Must delete all scheduler pods from cluster due to incompatible changes in version 1.15 with older versions.
go func() {
errDeletion := deleteSchedulerPods(status[0].Namespace, currentVersion, targetVersion)
if errDeletion != nil {
downgradeDeletionChan <- fmt.Errorf("failed to delete scheduler pods: %w", errDeletion)
print.FailureStatusEvent(os.Stderr, "Failed to delete scheduler pods: "+errDeletion.Error())
}
close(downgradeDeletionChan)
}()
}
}

chart, err := GetDaprHelmChartName(helmConf)
@@ -180,6 +214,15 @@ func Upgrade(conf UpgradeConfig) error {
return fmt.Errorf("failure while running upgrade: %w", err)
}

// wait for the deletion of the scheduler pods to finish
if downgradeDeletionChan != nil {
select {
case <-downgradeDeletionChan:
case <-time.After(3 * time.Minute):
return errors.New("timed out waiting for downgrade deletion")
}
}

if dashboardChart != nil {
if dashboardExists {
if _, err = upgradeClient.Run(dashboardReleaseName, dashboardChart, vals); err != nil {
@@ -202,6 +245,73 @@ func Upgrade(conf UpgradeConfig) error {
return nil
}

func deleteSchedulerPods(namespace string, currentVersion *semver.Version, targetVersion *semver.Version) error {
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

var pods *core_v1.PodList

// wait for at least one pod of the target version to be in the list before deleting the rest
// check the label app.kubernetes.io/version to determine the version of the pod
foundTargetVersion := false
for !foundTargetVersion {
k8sClient, err := Client()
if err != nil {
return err
}

pods, err = k8sClient.CoreV1().Pods(namespace).List(ctxWithTimeout, meta_v1.ListOptions{
LabelSelector: "app=dapr-scheduler-server",
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// the shared 30s context has expired; stop polling instead of dereferencing a nil pod list below
break
}
return err
}

if len(pods.Items) == 0 {
return nil
}

for _, pod := range pods.Items {
pv, ok := pod.Labels["app.kubernetes.io/version"]
if ok {
podVersion, err := semver.NewVersion(pv)
if err == nil && podVersion.Equal(targetVersion) {
foundTargetVersion = true
break
}
}
}
time.Sleep(5 * time.Second)
}

if pods == nil {
return errors.New("no scheduler pods found")
}

// get a fresh client for the deletion calls
k8sClient, err := Client()
if err != nil {
return err
}

// delete scheduler pods matching the current version, i.e. >=1.15.0-rc.1, which run the incompatible HA scheduler
for _, pod := range pods.Items {
if pv, ok := pod.Labels["app.kubernetes.io/version"]; ok {
podVersion, err := semver.NewVersion(pv)
if err == nil && podVersion.Equal(currentVersion) {
err = k8sClient.CoreV1().Pods(namespace).Delete(ctxWithTimeout, pod.Name, meta_v1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete pod %s during downgrade: %w", pod.Name, err)
}
}
}
}
return nil
}

// WithRetry enables retry with the specified max retries and retry interval.
func WithRetry(maxRetries int, retryInterval time.Duration) UpgradeOption {
return func(o *UpgradeOptions) {
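For reference, the version gate above in isolation. Expressing the cutoff as the 1.15.0-rc.1 prerelease means release candidates of 1.15 already count as running the new HA scheduler, while every 1.14.x compares below it. A standalone sketch using the same Masterminds/semver calls as the diff; the helper and the sample version pairs are invented:

```go
package main

import (
	"fmt"

	"github.com/Masterminds/semver/v3"
)

var versionWithHAScheduler = semver.MustParse("1.15.0-rc.1")

// needsSchedulerCleanup reports whether a downgrade crosses the 1.15
// boundary, i.e. scheduler pods of the old version must be deleted.
func needsSchedulerCleanup(current, target *semver.Version) bool {
	return current.GreaterThanEqual(versionWithHAScheduler) &&
		target.LessThan(versionWithHAScheduler)
}

func main() {
	for _, pair := range [][2]string{
		{"1.15.0", "1.14.4"}, // crosses the boundary: cleanup needed
		{"1.15.1", "1.15.0"}, // stays within 1.15: no cleanup
		{"1.14.4", "1.13.6"}, // entirely below the boundary: no cleanup
	} {
		cur := semver.MustParse(pair[0])
		tgt := semver.MustParse(pair[1])
		fmt.Printf("%s -> %s: %v\n", cur, tgt, needsSchedulerCleanup(cur, tgt))
	}
}
```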
60 changes: 48 additions & 12 deletions tests/e2e/common/common.go
@@ -56,6 +56,9 @@ const (
thirdPartyDevNamespace = "default"
devRedisReleaseName = "dapr-dev-redis"
devZipkinReleaseName = "dapr-dev-zipkin"

DaprModeHA = "ha"
DaprModeNonHA = "non-ha"
)

var (
@@ -120,6 +123,22 @@ func GetRuntimeVersion(t *testing.T, latest bool) *semver.Version {
return runtimeVersion
}

func GetDaprTestHaMode() string {
return os.Getenv("TEST_DAPR_HA_MODE")
}

func ShouldSkipTest(mode string) bool {
envDaprHaMode := GetDaprTestHaMode()
if envDaprHaMode != "" {
return envDaprHaMode != mode
}
return false
}

func UpgradeTest(details VersionDetails, opts TestOptions) func(t *testing.T) {
return func(t *testing.T) {
daprPath := GetDaprPath()
@@ -213,7 +232,7 @@ func GetTestsOnInstall(details VersionDetails, opts TestOptions) []TestCase {
{"clusterroles exist " + details.RuntimeVersion, ClusterRolesTest(details, opts)},
{"clusterrolebindings exist " + details.RuntimeVersion, ClusterRoleBindingsTest(details, opts)},
{"apply and check components exist " + details.RuntimeVersion, ComponentsTestOnInstallUpgrade(opts)},
{"apply and check httpendpoints exist " + details.RuntimeVersion, HTTPEndpointsTestOnInstallUpgrade(opts)},
{"apply and check httpendpoints exist " + details.RuntimeVersion, HTTPEndpointsTestOnInstallUpgrade(opts, TestOptions{})},
{"check mtls " + details.RuntimeVersion, MTLSTestOnInstallUpgrade(opts)},
{"status check " + details.RuntimeVersion, StatusTestOnInstallUpgrade(details, opts)},
}
@@ -341,10 +360,10 @@ func ComponentsTestOnInstallUpgrade(opts TestOptions) func(t *testing.T) {
}
}

func HTTPEndpointsTestOnInstallUpgrade(opts TestOptions) func(t *testing.T) {
func HTTPEndpointsTestOnInstallUpgrade(installOpts TestOptions, upgradeOpts TestOptions) func(t *testing.T) {
return func(t *testing.T) {
// if dapr is installed with httpendpoints.
if opts.ApplyHTTPEndpointChanges {
if installOpts.ApplyHTTPEndpointChanges {
// apply any changes to the httpendpoint.
t.Log("apply httpendpoint changes")
output, err := spawn.Command("kubectl", "apply", "-f", "../testdata/namespace.yaml")
@@ -353,12 +372,17 @@ func HTTPEndpointsTestOnInstallUpgrade(opts TestOptions) func(t *testing.T) {
output, err = spawn.Command("kubectl", "apply", "-f", "../testdata/httpendpoint.yaml")
t.Log(output)
require.NoError(t, err, "expected no error on kubectl apply")
require.Equal(t, "httpendpoints.dapr.io/httpendpoint created\nhttpendpoints.dapr.io/httpendpoint created\n", output, "expected output to match")
httpEndpointOutputCheck(t, output)

if installOpts.ApplyHTTPEndpointChanges && upgradeOpts.ApplyHTTPEndpointChanges {
require.Equal(t, "httpendpoint.dapr.io/httpendpoint unchanged\n", output, "expected output to match")
} else {
require.Equal(t, "httpendpoint.dapr.io/httpendpoint created\n", output, "expected output to match")
}

t.Log("check applied httpendpoint exists")
_, err = spawn.Command("kubectl", "get", "httpendpoint")
output, err = spawn.Command("kubectl", "get", "httpendpoint")
require.NoError(t, err, "expected no error on calling to retrieve httpendpoints")
httpEndpointOutputCheck(t, output)
}
}
}
@@ -984,7 +1008,7 @@ func componentOutputCheck(t *testing.T, opts TestOptions, output string) {
return
}

lines = strings.Split(output, "\n")[2:] // remove header and warning message.
lines = lines[2:] // remove header and warning message.

if opts.DevEnabled {
// default, test statestore.
@@ -1152,6 +1176,8 @@ func waitPodDeletionDev(t *testing.T, done, podsDeleted chan struct{}) {
devRedisReleaseName: "dapr-dev-redis-master-",
devZipkinReleaseName: "dapr-dev-zipkin-",
}

t.Logf("dev pods waiting to be deleted: %d", len(list.Items))
for _, pod := range list.Items {
t.Log(pod.ObjectMeta.Name)
for component, prefix := range prefixes {
@@ -1169,7 +1195,7 @@ func waitPodDeletionDev(t *testing.T, done, podsDeleted chan struct{}) {
if len(found) == 2 {
podsDeleted <- struct{}{}
}
time.Sleep(15 * time.Second)
time.Sleep(10 * time.Second)
}
}

@@ -1181,19 +1207,28 @@ func waitPodDeletion(t *testing.T, done, podsDeleted chan struct{}) {
default:
break
}

ctx := context.Background()
ctxt, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

k8sClient, err := getClient()
require.NoError(t, err, "error getting k8s client for pods check")

list, err := k8sClient.CoreV1().Pods(DaprTestNamespace).List(ctxt, v1.ListOptions{
Limit: 100,
})
require.NoError(t, err, "error getting pods list from k8s")

if len(list.Items) == 0 {
podsDeleted <- struct{}{}
} else {
t.Logf("pods waiting to be deleted: %d", len(list.Items))
for _, pod := range list.Items {
t.Log(pod.ObjectMeta.Name)
}
}
time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}

@@ -1214,7 +1249,8 @@ func waitAllPodsRunning(t *testing.T, namespace string, haEnabled bool, done, po
Limit: 100,
})
require.NoError(t, err, "error getting pods list from k8s")
t.Logf("items %d", len(list.Items))

t.Logf("waiting for pods to be running, current count: %d", len(list.Items))
countOfReadyPods := 0
for _, item := range list.Items {
t.Log(item.ObjectMeta.Name)
Expand All @@ -1235,11 +1271,11 @@ func waitAllPodsRunning(t *testing.T, namespace string, haEnabled bool, done, po
if err != nil {
t.Error(err)
}
if len(list.Items) == countOfReadyPods && ((haEnabled && countOfReadyPods == pods) || (!haEnabled && countOfReadyPods == pods)) {
if len(list.Items) == countOfReadyPods && countOfReadyPods == pods {
podsRunning <- struct{}{}
}

time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}

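The wait helpers above coordinate with their callers through a channel pair: the helper polls the namespace on an interval (shortened here from 15s to 5–10s) and signals on podsDeleted once nothing is left, while the caller closes done to end the polling loop. A minimal sketch of the assumed caller side, written in the same package since waitPodDeletion is unexported; the timeout and helper name are invented:

```go
package common

import (
	"testing"
	"time"
)

// waitForTeardown is illustrative; only waitPodDeletion and its channel
// contract come from this file.
func waitForTeardown(t *testing.T) {
	done := make(chan struct{})        // closed by the caller to stop polling
	podsDeleted := make(chan struct{}) // signaled once the namespace is empty
	defer close(done)

	go waitPodDeletion(t, done, podsDeleted)

	select {
	case <-podsDeleted:
		t.Log("all Dapr pods deleted")
	case <-time.After(10 * time.Minute):
		t.Fatal("timed out waiting for Dapr pods to be deleted")
	}
}
```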