Skip to content

Commit

Permalink
fix: Live workflow takes precedence during merge to correctly display…
Browse files Browse the repository at this point in the history
… in the UI (argoproj#11336)

Signed-off-by: Yuan Tang <terrytangyuan@gmail.com>
  • Loading branch information
terrytangyuan authored and isubasinghe committed May 5, 2024
1 parent a4ca4d2 commit 660bbb6
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 0 deletions.
37 changes: 37 additions & 0 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/argoproj/argo-workflows/v3/server/auth"
Expand Down Expand Up @@ -127,6 +128,42 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
return wf, nil
}

func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alpha1.WorkflowList, numWfsToKeep int) *v1alpha1.WorkflowList {
var mergedWfs []v1alpha1.Workflow
var uidToWfs = map[types.UID][]v1alpha1.Workflow{}
for _, item := range liveWfs.Items {
uidToWfs[item.UID] = append(uidToWfs[item.UID], item)
}
for _, item := range archivedWfs.Items {
uidToWfs[item.UID] = append(uidToWfs[item.UID], item)
}

for _, v := range uidToWfs {
// The archived workflow we saved in the database will only have "Pending" as the archival status.
// We want to only keep the workflow that has the correct label to display correctly in the UI.
if len(v) == 1 {
mergedWfs = append(mergedWfs, v[0])
} else {
if ok := v[0].Labels[common.LabelKeyWorkflowArchivingStatus] == "Archived"; ok {
mergedWfs = append(mergedWfs, v[0])
} else {
mergedWfs = append(mergedWfs, v[1])
}
}
}
mergedWfsList := v1alpha1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
sort.Sort(mergedWfsList.Items)
numWfs := 0
var finalWfs []v1alpha1.Workflow
for _, item := range mergedWfsList.Items {
if numWfsToKeep == 0 || numWfs < numWfsToKeep {
finalWfs = append(finalWfs, item)
numWfs += 1
}
}
return &v1alpha1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta}
}

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) {
wfClient := auth.GetWfClient(ctx)

Expand Down
21 changes: 21 additions & 0 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/go-jose/go-jose/v3/jwt"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -630,6 +631,26 @@ func (t testWatchWorkflowServer) Send(*workflowpkg.WorkflowWatchEvent) error {
panic("implement me")
}

func TestMergeWithArchivedWorkflows(t *testing.T) {
timeNow := time.Now()
wf1Live := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)},
Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Archived"}}}
wf1Archived := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)},
Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Pending"}}}
wf2 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "2", CreationTimestamp: metav1.Time{Time: timeNow.Add(2 * time.Second)}}}
wf3 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}}
liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2}}
archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Archived, wf3, wf2}}
expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Live}}
expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}}
assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items)
assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items)
}

func TestWatchWorkflows(t *testing.T) {
server, ctx := getWorkflowServer()
wf := &v1alpha1.Workflow{
Expand Down
4 changes: 4 additions & 0 deletions ui/src/models/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,10 @@ export interface Workflow {

export const execSpec = (w: Workflow) => Object.assign({}, w.status.storedWorkflowTemplateSpec, w.spec);

export function isArchivedWorkflow(wf: Workflow): boolean {
return wf.metadata.labels && wf.metadata.labels['workflows.argoproj.io/workflow-archiving-status'] === 'Archived';
}

export type NodeType = 'Pod' | 'Container' | 'Steps' | 'StepGroup' | 'DAG' | 'Retry' | 'Skipped' | 'TaskGroup' | 'Suspend';

export interface NodeStatus {
Expand Down

0 comments on commit 660bbb6

Please sign in to comment.