release: test upgrade/downgrade for 1.13/1.14/1.15 + mariner (#1491)
* release: test upgrade/downgrade for 1.13/1.14/1.15 + mariner

Signed-off-by: Mike Nguyen <hey@mike.ee>

* fix: version skews

Co-authored-by: Anton Troshin <troll.sic@gmail.com>
Signed-off-by: Mike Nguyen <hey@mike.ee>

* Update tests/e2e/upgrade/upgrade_test.go

Accepted

Co-authored-by: Anton Troshin <troll.sic@gmail.com>
Signed-off-by: Yaron Schneider <schneider.yaron@live.com>

* Update tests/e2e/upgrade/upgrade_test.go

Co-authored-by: Anton Troshin <troll.sic@gmail.com>
Signed-off-by: Yaron Schneider <schneider.yaron@live.com>

* Fix downgrade issue from 1.15 by deleting previous-version scheduler pods
Update the 1.15 RC to the latest RC.18

Signed-off-by: Anton Troshin <anton@diagrid.io>

* Fix downgrade 1.15 to 1.13 scenario with 0 scheduler pods

Signed-off-by: Anton Troshin <anton@diagrid.io>

* increase update test timeout to 60m and update latest version to 1.15

Signed-off-by: Anton Troshin <anton@diagrid.io>

* fix httpendpoint test cleanup and checks

Signed-off-by: Anton Troshin <anton@diagrid.io>

* make sure each matrix entry runs the appropriate tests; previously every matrix entry ran the same tests

Signed-off-by: Anton Troshin <anton@diagrid.io>

* skip TestKubernetesRunFile on HA

Signed-off-by: Anton Troshin <anton@diagrid.io>

* fix skip TestKubernetesRunFile on HA

Signed-off-by: Anton Troshin <anton@diagrid.io>

* update to latest dapr 1.15.2

Signed-off-by: Anton Troshin <anton@diagrid.io>

* add logs when waiting for pod deletion

Signed-off-by: Anton Troshin <anton@diagrid.io>

---------

Signed-off-by: Mike Nguyen <hey@mike.ee>
Signed-off-by: Yaron Schneider <schneider.yaron@live.com>
Signed-off-by: Anton Troshin <anton@diagrid.io>
Co-authored-by: Anton Troshin <anton@diagrid.io>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
Co-authored-by: Anton Troshin <troll.sic@gmail.com>
4 people authored Mar 6, 2025
1 parent a0921c7 commit 4c1c26f
Showing 8 changed files with 298 additions and 58 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/kind_e2e.yaml
@@ -173,6 +173,7 @@ jobs:
export TEST_OUTPUT_FILE=$GITHUB_WORKSPACE/test-e2e-kind.json
echo "TEST_OUTPUT_FILE=$TEST_OUTPUT_FILE" >> $GITHUB_ENV
export GITHUB_TOKEN=${{ secrets.GITHUB_TOKEN }}
export TEST_DAPR_HA_MODE=${{ matrix.mode }}
make e2e-build-run-k8s
shell: bash
- name: Run tests with Docker hub
@@ -181,6 +182,7 @@
export TEST_OUTPUT_FILE=$GITHUB_WORKSPACE/test-e2e-kind.json
echo "TEST_OUTPUT_FILE=$TEST_OUTPUT_FILE" >> $GITHUB_ENV
export GITHUB_TOKEN=${{ secrets.GITHUB_TOKEN }}
export TEST_DAPR_HA_MODE=${{ matrix.mode }}
make e2e-build-run-k8s
shell: bash
- name: Upload test results
2 changes: 2 additions & 0 deletions .github/workflows/upgrade_e2e.yaml
@@ -137,13 +137,15 @@ jobs:
run: |
export TEST_OUTPUT_FILE=$GITHUB_WORKSPACE/test-e2e-upgrade-kind.json
echo "TEST_OUTPUT_FILE=$TEST_OUTPUT_FILE" >> $GITHUB_ENV
export TEST_DAPR_HA_MODE=${{ matrix.mode }}
make e2e-build-run-upgrade
- name: Run tests with Docker hub
if: github.event.schedule != '0 */6 * * *'
run: |
export TEST_OUTPUT_FILE=$GITHUB_WORKSPACE/test-e2e-upgrade-kind.json
echo "TEST_OUTPUT_FILE=$TEST_OUTPUT_FILE" >> $GITHUB_ENV
export TEST_DAPR_HA_MODE=${{ matrix.mode }}
make e2e-build-run-upgrade
- name: Upload test results
2 changes: 1 addition & 1 deletion Makefile
@@ -174,7 +174,7 @@ e2e-build-run-k8s: build test-e2e-k8s
################################################################################
.PHONY: test-e2e-upgrade
test-e2e-upgrade: test-deps
gotestsum --jsonfile $(TEST_OUTPUT_FILE) --format standard-verbose -- -timeout 40m -count=1 -tags=e2e ./tests/e2e/upgrade/...
gotestsum --jsonfile $(TEST_OUTPUT_FILE) --format standard-verbose -- -timeout 60m -count=1 -tags=e2e ./tests/e2e/upgrade/...

################################################################################
# Build, E2E Tests for Kubernetes Upgrade #
110 changes: 110 additions & 0 deletions pkg/kubernetes/upgrade.go
@@ -14,6 +14,8 @@ limitations under the License.
package kubernetes

import (
"context"
"errors"
"fmt"
"net/http"
"os"
@@ -23,8 +25,11 @@ import (
helm "helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/release"
core_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/helm/pkg/strvals"

"github.com/Masterminds/semver/v3"
"github.com/hashicorp/go-version"

"github.com/dapr/cli/pkg/print"
@@ -49,6 +54,8 @@ var crdsFullResources = []string{
"httpendpoints.dapr.io",
}

var versionWithHAScheduler = semver.MustParse("1.15.0-rc.1")

type UpgradeConfig struct {
RuntimeVersion string
DashboardVersion string
@@ -157,13 +164,40 @@ func Upgrade(conf UpgradeConfig) error {
return err
}

// used to signal the deletion of the scheduler pods only when downgrading from 1.15 to previous versions to handle incompatible changes
// in other cases the channel should be nil
var downgradeDeletionChan chan error

if !isDowngrade(conf.RuntimeVersion, daprVersion) {
err = applyCRDs("v" + conf.RuntimeVersion)
if err != nil {
return fmt.Errorf("unable to apply CRDs: %w", err)
}
} else {
print.InfoStatusEvent(os.Stdout, "Downgrade detected, skipping CRDs.")

targetVersion, errVersion := semver.NewVersion(conf.RuntimeVersion)
if errVersion != nil {
return fmt.Errorf("unable to parse dapr target version: %w", errVersion)
}

currentVersion, errVersion := semver.NewVersion(daprVersion)
if errVersion != nil {
return fmt.Errorf("unable to parse dapr current version: %w", errVersion)
}

if currentVersion.GreaterThanEqual(versionWithHAScheduler) && targetVersion.LessThan(versionWithHAScheduler) {
downgradeDeletionChan = make(chan error)
// Must delete all scheduler pods from cluster due to incompatible changes in version 1.15 with older versions.
go func() {
errDeletion := deleteSchedulerPods(status[0].Namespace, currentVersion, targetVersion)
if errDeletion != nil {
downgradeDeletionChan <- fmt.Errorf("failed to delete scheduler pods: %w", errDeletion)
print.FailureStatusEvent(os.Stderr, "Failed to delete scheduler pods: "+errDeletion.Error())
}
close(downgradeDeletionChan)
}()
}
}

chart, err := GetDaprHelmChartName(helmConf)
@@ -180,6 +214,15 @@
return fmt.Errorf("failure while running upgrade: %w", err)
}

// wait for the deletion of the scheduler pods to finish
if downgradeDeletionChan != nil {
select {
case <-downgradeDeletionChan:
case <-time.After(3 * time.Minute):
return errors.New("timed out waiting for downgrade deletion")
}
}

if dashboardChart != nil {
if dashboardExists {
if _, err = upgradeClient.Run(dashboardReleaseName, dashboardChart, vals); err != nil {
@@ -202,6 +245,73 @@
return nil
}

func deleteSchedulerPods(namespace string, currentVersion *semver.Version, targetVersion *semver.Version) error {
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

var pods *core_v1.PodList

// wait for at least one pod of the target version to be in the list before deleting the rest
// check the label app.kubernetes.io/version to determine the version of the pod
foundTargetVersion := false
for {
if foundTargetVersion {
break
}
k8sClient, err := Client()
if err != nil {
return err
}

pods, err = k8sClient.CoreV1().Pods(namespace).List(ctxWithTimeout, meta_v1.ListOptions{
LabelSelector: "app=dapr-scheduler-server",
})
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}

if len(pods.Items) == 0 {
return nil
}

for _, pod := range pods.Items {
pv, ok := pod.Labels["app.kubernetes.io/version"]
if ok {
podVersion, err := semver.NewVersion(pv)
if err == nil && podVersion.Equal(targetVersion) {
foundTargetVersion = true
break
}
}
}
time.Sleep(5 * time.Second)
}

if pods == nil {
return errors.New("no scheduler pods found")
}

// get a fresh client to ensure we have the latest state of the cluster
k8sClient, err := Client()
if err != nil {
return err
}

// delete scheduler pods of the current version, i.e. >1.15.0
for _, pod := range pods.Items {
if pv, ok := pod.Labels["app.kubernetes.io/version"]; ok {
podVersion, err := semver.NewVersion(pv)
if err == nil && podVersion.Equal(currentVersion) {
err = k8sClient.CoreV1().Pods(namespace).Delete(ctxWithTimeout, pod.Name, meta_v1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete pod %s during downgrade: %w", pod.Name, err)
}
}
}
}
return nil
}

// WithRetry enables retry with the specified max retries and retry interval.
func WithRetry(maxRetries int, retryInterval time.Duration) UpgradeOption {
return func(o *UpgradeOptions) {
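The downgrade branch above gates scheduler cleanup on a semver comparison against versionWithHAScheduler (1.15.0-rc.1): old scheduler pods are deleted only when moving from a 1.15+ runtime to a version that predates the HA scheduler changes. The following standalone Go sketch reproduces just that gate so the behaviour across version pairs is easy to check; schedulerCleanupNeeded and the example version pairs are illustrative and not part of the CLI code.

```go
package main

import (
	"fmt"

	"github.com/Masterminds/semver/v3"
)

// Mirrors the constant introduced in pkg/kubernetes/upgrade.go: scheduler pods
// only need cleanup when crossing this boundary downwards.
var versionWithHAScheduler = semver.MustParse("1.15.0-rc.1")

// schedulerCleanupNeeded is an illustrative helper (not part of the CLI). It
// returns true only when downgrading from a 1.15+ runtime to a version that
// predates the HA scheduler changes.
func schedulerCleanupNeeded(current, target string) (bool, error) {
	cur, err := semver.NewVersion(current)
	if err != nil {
		return false, fmt.Errorf("unable to parse current version: %w", err)
	}
	tgt, err := semver.NewVersion(target)
	if err != nil {
		return false, fmt.Errorf("unable to parse target version: %w", err)
	}
	return cur.GreaterThanEqual(versionWithHAScheduler) && tgt.LessThan(versionWithHAScheduler), nil
}

func main() {
	pairs := [][2]string{
		{"1.15.2", "1.13.6"}, // downgrade across the boundary: cleanup runs
		{"1.15.2", "1.15.0"}, // downgrade within 1.15: no cleanup
		{"1.14.4", "1.15.2"}, // upgrade: CRDs are applied instead
	}
	for _, p := range pairs {
		needed, err := schedulerCleanupNeeded(p[0], p[1])
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %s: delete old scheduler pods = %v\n", p[0], p[1], needed)
	}
}
```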
60 changes: 48 additions & 12 deletions tests/e2e/common/common.go
@@ -56,6 +56,9 @@ const (
thirdPartyDevNamespace = "default"
devRedisReleaseName = "dapr-dev-redis"
devZipkinReleaseName = "dapr-dev-zipkin"

DaprModeHA = "ha"
DaprModeNonHA = "non-ha"
)

var (
@@ -120,6 +123,22 @@ func GetRuntimeVersion(t *testing.T, latest bool) *semver.Version {
return runtimeVersion
}

func GetDaprTestHaMode() string {
daprHaMode := os.Getenv("TEST_DAPR_HA_MODE")
if daprHaMode != "" {
return daprHaMode
}
return ""
}

func ShouldSkipTest(mode string) bool {
envDaprHaMode := GetDaprTestHaMode()
if envDaprHaMode != "" {
return envDaprHaMode != mode
}
return false
}

func UpgradeTest(details VersionDetails, opts TestOptions) func(t *testing.T) {
return func(t *testing.T) {
daprPath := GetDaprPath()
@@ -213,7 +232,7 @@ func GetTestsOnInstall(details VersionDetails, opts TestOptions) []TestCase {
{"clusterroles exist " + details.RuntimeVersion, ClusterRolesTest(details, opts)},
{"clusterrolebindings exist " + details.RuntimeVersion, ClusterRoleBindingsTest(details, opts)},
{"apply and check components exist " + details.RuntimeVersion, ComponentsTestOnInstallUpgrade(opts)},
{"apply and check httpendpoints exist " + details.RuntimeVersion, HTTPEndpointsTestOnInstallUpgrade(opts)},
{"apply and check httpendpoints exist " + details.RuntimeVersion, HTTPEndpointsTestOnInstallUpgrade(opts, TestOptions{})},
{"check mtls " + details.RuntimeVersion, MTLSTestOnInstallUpgrade(opts)},
{"status check " + details.RuntimeVersion, StatusTestOnInstallUpgrade(details, opts)},
}
@@ -341,10 +360,10 @@ func ComponentsTestOnInstallUpgrade(opts TestOptions) func(t *testing.T) {
}
}

func HTTPEndpointsTestOnInstallUpgrade(opts TestOptions) func(t *testing.T) {
func HTTPEndpointsTestOnInstallUpgrade(installOpts TestOptions, upgradeOpts TestOptions) func(t *testing.T) {
return func(t *testing.T) {
// if dapr is installed with httpendpoints.
if opts.ApplyHTTPEndpointChanges {
if installOpts.ApplyHTTPEndpointChanges {
// apply any changes to the httpendpoint.
t.Log("apply httpendpoint changes")
output, err := spawn.Command("kubectl", "apply", "-f", "../testdata/namespace.yaml")
@@ -353,12 +372,17 @@ func HTTPEndpointsTestOnInstallUpgrade(opts TestOptions) func(t *testing.T) {
output, err = spawn.Command("kubectl", "apply", "-f", "../testdata/httpendpoint.yaml")
t.Log(output)
require.NoError(t, err, "expected no error on kubectl apply")
require.Equal(t, "httpendpoints.dapr.io/httpendpoint created\nhttpendpoints.dapr.io/httpendpoint created\n", output, "expected output to match")
httpEndpointOutputCheck(t, output)

if installOpts.ApplyHTTPEndpointChanges && upgradeOpts.ApplyHTTPEndpointChanges {
require.Equal(t, "httpendpoint.dapr.io/httpendpoint unchanged\n", output, "expected output to match")
} else {
require.Equal(t, "httpendpoint.dapr.io/httpendpoint created\n", output, "expected output to match")
}

t.Log("check applied httpendpoint exists")
_, err = spawn.Command("kubectl", "get", "httpendpoint")
output, err = spawn.Command("kubectl", "get", "httpendpoint")
require.NoError(t, err, "expected no error on calling to retrieve httpendpoints")
httpEndpointOutputCheck(t, output)
}
}
}
@@ -984,7 +1008,7 @@ func componentOutputCheck(t *testing.T, opts TestOptions, output string) {
return
}

lines = strings.Split(output, "\n")[2:] // remove header and warning message.
lines = lines[2:] // remove header and warning message.

if opts.DevEnabled {
// default, test statestore.
@@ -1152,6 +1176,8 @@ func waitPodDeletionDev(t *testing.T, done, podsDeleted chan struct{}) {
devRedisReleaseName: "dapr-dev-redis-master-",
devZipkinReleaseName: "dapr-dev-zipkin-",
}

t.Logf("dev pods waiting to be deleted: %d", len(list.Items))
for _, pod := range list.Items {
t.Log(pod.ObjectMeta.Name)
for component, prefix := range prefixes {
@@ -1169,7 +1195,7 @@
if len(found) == 2 {
podsDeleted <- struct{}{}
}
time.Sleep(15 * time.Second)
time.Sleep(10 * time.Second)
}
}

@@ -1181,19 +1207,28 @@ func waitPodDeletion(t *testing.T, done, podsDeleted chan struct{}) {
default:
break
}

ctx := context.Background()
ctxt, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

k8sClient, err := getClient()
require.NoError(t, err, "error getting k8s client for pods check")

list, err := k8sClient.CoreV1().Pods(DaprTestNamespace).List(ctxt, v1.ListOptions{
Limit: 100,
})
require.NoError(t, err, "error getting pods list from k8s")

if len(list.Items) == 0 {
podsDeleted <- struct{}{}
} else {
t.Logf("pods waiting to be deleted: %d", len(list.Items))
for _, pod := range list.Items {
t.Log(pod.ObjectMeta.Name)
}
}
time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}

@@ -1214,7 +1249,8 @@ func waitAllPodsRunning(t *testing.T, namespace string, haEnabled bool, done, po
Limit: 100,
})
require.NoError(t, err, "error getting pods list from k8s")
t.Logf("items %d", len(list.Items))

t.Logf("waiting for pods to be running, current count: %d", len(list.Items))
countOfReadyPods := 0
for _, item := range list.Items {
t.Log(item.ObjectMeta.Name)
Expand All @@ -1235,11 +1271,11 @@ func waitAllPodsRunning(t *testing.T, namespace string, haEnabled bool, done, po
if err != nil {
t.Error(err)
}
if len(list.Items) == countOfReadyPods && ((haEnabled && countOfReadyPods == pods) || (!haEnabled && countOfReadyPods == pods)) {
if len(list.Items) == countOfReadyPods && countOfReadyPods == pods {
podsRunning <- struct{}{}
}

time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}

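The TEST_DAPR_HA_MODE variable exported by both workflow matrices feeds GetDaprTestHaMode and ShouldSkipTest above. The remaining changed files (including the run-file test adjustments mentioned in the commit message) are not shown in this excerpt, so the sketch below is only an assumption of how a non-HA-only test such as TestKubernetesRunFile could skip itself on the HA matrix entry; the package name and import path are illustrative.

```go
//go:build e2e

package kubernetes_test

import (
	"testing"

	"github.com/dapr/cli/tests/e2e/common"
)

// Hypothetical stub: the real TestKubernetesRunFile body lives in one of the
// changed files not rendered here.
func TestKubernetesRunFile(t *testing.T) {
	// With TEST_DAPR_HA_MODE=ha, ShouldSkipTest(common.DaprModeNonHA) returns
	// true (env mode != requested mode), so this non-HA-only test is skipped
	// on the HA matrix entry and runs on the non-HA one.
	if common.ShouldSkipTest(common.DaprModeNonHA) {
		t.Skip("skipping TestKubernetesRunFile: requires non-HA mode")
	}
	// ... actual run-file scenario would go here ...
}
```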