Skip to content

Commit

Permalink
Add current iteration item to the PipelineLoopPipelineRunStatus (kube…
Browse files Browse the repository at this point in the history
  • Loading branch information
wzhanw authored Mar 25, 2022
1 parent 6a9f88f commit a7f6c0a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ type PipelineLoopRunStatus struct {
type PipelineLoopPipelineRunStatus struct {
// iteration number
Iteration int `json:"iteration,omitempty"`
// the current iteration item
IterationItem interface{} `json:"iterationItem,omitempty"`
// Status is the TaskRunStatus for the corresponding TaskRun
// +optional
Status *v1beta1.PipelineRunStatus `json:"status,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
duckv1 "knative.dev/pkg/apis/duck/v1"

"github.com/hashicorp/go-multierror"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg"
cache "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
"github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop"
pipelineloopv1alpha1 "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1"
Expand Down Expand Up @@ -367,7 +367,7 @@ func (c *Reconciler) reconcile(ctx context.Context, run *v1alpha1.Run, status *p
}

// Update status of PipelineRuns. Return the PipelineRun representing the highest loop iteration.
highestIteration, currentRunningPrs, failedPrs, err := c.updatePipelineRunStatus(ctx, run, status)
highestIteration, currentRunningPrs, failedPrs, err := c.updatePipelineRunStatus(ctx, iterationElements, run, status)
if err != nil {
return fmt.Errorf("error updating PipelineRun status for Run %s/%s: %w", run.Namespace, run.Name, err)
}
Expand Down Expand Up @@ -447,8 +447,9 @@ func (c *Reconciler) reconcile(ctx context.Context, run *v1alpha1.Run, status *p
return fmt.Errorf("error creating PipelineRun from Run %s while retrying: %w", run.Name, err)
}
status.PipelineRuns[pr.Name] = &pipelineloopv1alpha1.PipelineLoopPipelineRunStatus{
Iteration: highestIteration,
Status: &pr.Status,
Iteration: highestIteration,
IterationItem: iterationElements[highestIteration-1],
Status: &pr.Status,
}
logger.Infof("Retried failed pipelineRun: %s with new pipelineRun: %s", failedPr.Name, pr.Name)
}
Expand Down Expand Up @@ -523,10 +524,10 @@ func (c *Reconciler) reconcile(ctx context.Context, run *v1alpha1.Run, status *p
if err != nil {
return fmt.Errorf("error creating PipelineRun from Run %s: %w", run.Name, err)
}

status.PipelineRuns[pr.Name] = &pipelineloopv1alpha1.PipelineLoopPipelineRunStatus{
Iteration: nextIteration,
Status: &pr.Status,
Iteration: nextIteration,
IterationItem: iterationElements[nextIteration-1],
Status: &pr.Status,
}
nextIteration++
if nextIteration > totalIterations {
Expand Down Expand Up @@ -664,7 +665,7 @@ func (c *Reconciler) cancelAllPipelineRuns(ctx context.Context, run *v1alpha1.Ru
return nil
}

func (c *Reconciler) updatePipelineRunStatus(ctx context.Context, run *v1alpha1.Run, status *pipelineloopv1alpha1.PipelineLoopRunStatus) (int, []*v1beta1.PipelineRun, []*v1beta1.PipelineRun, error) {
func (c *Reconciler) updatePipelineRunStatus(ctx context.Context, iterationElements []interface{}, run *v1alpha1.Run, status *pipelineloopv1alpha1.PipelineLoopRunStatus) (int, []*v1beta1.PipelineRun, []*v1beta1.PipelineRun, error) {
logger := logging.FromContext(ctx)
highestIteration := 0
var currentRunningPrs []*v1beta1.PipelineRun
Expand Down Expand Up @@ -728,8 +729,9 @@ func (c *Reconciler) updatePipelineRunStatus(ctx context.Context, run *v1alpha1.
}
}
status.PipelineRuns[pr.Name] = &pipelineloopv1alpha1.PipelineLoopPipelineRunStatus{
Iteration: iteration,
Status: &pr.Status,
Iteration: iteration,
IterationItem: iterationElements[iteration-1],
Status: &pr.Status,
}
if iteration > highestIteration {
highestIteration = iteration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ func checkRunStatus(t *testing.T, run *v1alpha1.Run, expectedStatus map[string]p
t.Errorf("Run status for PipelineRun %s has iteration number %d instead of %d",
expectedPipelineRunName, actualPipelineRunStatus.Iteration, expectedPipelineRunStatus.Iteration)
}
acturalIterationItem, error := json.Marshal(actualPipelineRunStatus.IterationItem)
expectedIterationItem, _ := json.Marshal(expectedPipelineRunStatus.IterationItem)
if error != nil || string(acturalIterationItem) != string(expectedIterationItem) {
t.Errorf("Run status for PipelineRun %s has iteration item %v instead of %v",
expectedPipelineRunName, actualPipelineRunStatus.IterationItem, expectedPipelineRunStatus.IterationItem)
}
if d := cmp.Diff(expectedPipelineRunStatus.Status, actualPipelineRunStatus.Status, cmpopts.IgnoreTypes(apis.Condition{}.LastTransitionTime.Inner.Time)); d != "" {
t.Errorf("Run status for PipelineRun %s is incorrect. Diff %s", expectedPipelineRunName, diff.PrintWantGot(d))
}
Expand Down Expand Up @@ -1599,10 +1605,11 @@ func TestReconcilePipelineLoopRun(t *testing.T) {
}

// Verify Run status contains status for all PipelineRuns.
_, iterationElements, _ := computeIterations(tc.run, &tc.pipelineloop.Spec)
expectedPipelineRuns := map[string]pipelineloopv1alpha1.PipelineLoopPipelineRunStatus{}
i := 1
for _, pr := range tc.expectedPipelineruns {
expectedPipelineRuns[pr.Name] = pipelineloopv1alpha1.PipelineLoopPipelineRunStatus{Iteration: i, Status: &pr.Status}
expectedPipelineRuns[pr.Name] = pipelineloopv1alpha1.PipelineLoopPipelineRunStatus{Iteration: i, IterationItem: iterationElements[i-1], Status: &pr.Status}
if pr.Labels["deleted"] != "True" {
i = i + 1 // iteration remain same, incase previous pr was a retry.
}
Expand Down

0 comments on commit a7f6c0a

Please sign in to comment.