Skip to content

Commit

Permalink
feature(backend): support embedded-status: minimal (kubeflow#976)
Browse files Browse the repository at this point in the history
* feature(backend): support embedded-status: minimal

Add logic to support embedded-status feature when using
minimal setting. For now, the TaskRun/Run status is retrieved
and inserted to PipelineRun.Status and stored into ml-pipeline
backend storage.

Signed-off-by: Yihong Wang <yh.wang@ibm.com>

* Add embedded-status feature flag

Use `full` as the default setting for embedded-status
feature flag.

Signed-off-by: Yihong Wang <yh.wang@ibm.com>
  • Loading branch information
yhwang authored Jun 16, 2022
1 parent 66c0944 commit 5fd2b73
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 8 deletions.
65 changes: 62 additions & 3 deletions backend/src/agent/persistence/client/workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
package client

import (
"context"
"fmt"

"github.com/kubeflow/pipelines/backend/src/common/util"
wfapi "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
wfclientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
"github.com/tektoncd/pipeline/pkg/client/informers/externalversions/pipeline/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -27,13 +33,17 @@ type WorkflowClientInterface interface {

// WorkflowClient is a client to call the Workflow API.
type WorkflowClient struct {
informer v1beta1.PipelineRunInformer
informer v1beta1.PipelineRunInformer
clientset *wfclientset.Clientset
}

// NewWorkflowClient creates an instance of the WorkflowClient.
func NewWorkflowClient(informer v1beta1.PipelineRunInformer) *WorkflowClient {
func NewWorkflowClient(informer v1beta1.PipelineRunInformer,
clientset *wfclientset.Clientset) *WorkflowClient {

return &WorkflowClient{
informer: informer,
informer: informer,
clientset: clientset,
}
}

Expand Down Expand Up @@ -61,5 +71,54 @@ func (c *WorkflowClient) Get(namespace string, name string) (
return nil, util.NewCustomError(err, code,
"Error retrieving workflow (%v) in namespace (%v): %v", name, namespace, err)
}
if workflow.Status.ChildReferences != nil {
hasTaskRun, hasRun := false, false
for _, child := range workflow.Status.ChildReferences {
switch child.Kind {
case "TaskRun":
hasTaskRun = true
case "Run":
hasRun = true
default:
}
}
// TODO: restructure the workflow to contain the taskrun/run statuses; these 2 fields
// will be removed in the future
if hasTaskRun {
// fetch taskrun status and insert into Status.TaskRuns
taskruns, err := c.clientset.TektonV1beta1().TaskRuns(namespace).List(context.Background(), v1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", util.LabelKeyWorkflowRunId, workflow.Labels[util.LabelKeyWorkflowRunId]),
})
if err != nil {
return nil, util.NewInternalServerError(err, "can't fetch taskruns")
}

taskrunStatuses := make(map[string]*wfapi.PipelineRunTaskRunStatus, len(taskruns.Items))
for _, taskrun := range taskruns.Items {
taskrunStatuses[taskrun.Name] = &wfapi.PipelineRunTaskRunStatus{
PipelineTaskName: taskrun.Labels["tekton.dev/pipelineTask"],
Status: taskrun.Status.DeepCopy(),
}
}
workflow.Status.TaskRuns = taskrunStatuses
}
if hasRun {
runs, err := c.clientset.TektonV1alpha1().Runs(namespace).List(context.Background(), v1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", util.LabelKeyWorkflowRunId, workflow.Labels[util.LabelKeyWorkflowRunId]),
})
if err != nil {
return nil, util.NewInternalServerError(err, "can't fetch runs")
}
runStatuses := make(map[string]*wfapi.PipelineRunRunStatus, len(runs.Items))
for _, run := range runs.Items {
runStatuses[run.Name] = &wfapi.PipelineRunRunStatus{
PipelineTaskName: run.Labels["tekton.dev/pipelineTask"],
Status: run.Status.DeepCopy(),
}
}
workflow.Status.Runs = runStatuses
}
}

return util.NewWorkflow(workflow), nil
}
1 change: 1 addition & 0 deletions backend/src/agent/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func main() {
controller := NewPersistenceAgent(
swfInformerFactory,
workflowInformerFactory,
workflowClient,
pipelineClient,
util.NewRealTime())

Expand Down
8 changes: 5 additions & 3 deletions backend/src/agent/persistence/persistence_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
swfinformers "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/informers/externalversions"
log "github.com/sirupsen/logrus"
workflowregister "github.com/tektoncd/pipeline/pkg/apis/pipeline"
wfclientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
workflowinformers "github.com/tektoncd/pipeline/pkg/client/informers/externalversions"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -46,24 +47,25 @@ type PersistenceAgent struct {
func NewPersistenceAgent(
swfInformerFactory swfinformers.SharedInformerFactory,
workflowInformerFactory workflowinformers.SharedInformerFactory,
clientset *wfclientset.Clientset,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
// obtain references to shared informers
swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows()
workflowInformer := workflowInformerFactory.Tekton().V1beta1().PipelineRuns()
informer := workflowInformerFactory.Tekton().V1beta1().PipelineRuns()

// Add controller types to the default Kubernetes Scheme so Events can be
// logged for controller types.
swfScheme.AddToScheme(scheme.Scheme)

swfClient := client.NewScheduledWorkflowClient(swfInformer)
workflowClient := client.NewWorkflowClient(workflowInformer)
workflowClient := client.NewWorkflowClient(informer, clientset)

swfWorker := worker.NewPersistenceWorker(time, swfregister.Kind, swfInformer.Informer(), true,
worker.NewScheduledWorkflowSaver(swfClient, pipelineClient))

workflowWorker := worker.NewPersistenceWorker(time, workflowregister.PipelineRunControllerName,
workflowInformer.Informer(), true,
informer.Informer(), true,
worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish))

agent := &PersistenceAgent{
Expand Down
3 changes: 2 additions & 1 deletion frontend/src/lib/WorkflowParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ export default class WorkflowParser {
// Find the output that matches this input and pull the value
if (
statusMap.get(component['name']) &&
statusMap.get(component['name'])['status']['taskSpec']
statusMap.get(component['name'])['status']['taskSpec'] &&
statusMap.get(component['name'])['status']['taskSpec']['params']
) {
for (const statusParam of statusMap.get(component['name'])!['status']['taskSpec']['params'])
if (statusParam['name'] === param['name']) statusParam['value'] = paramValue;
Expand Down
17 changes: 16 additions & 1 deletion guides/advanced_user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This page is an advanced KFP-Tekton guide on how to use Tekton specific features
- [Custom Task Limitations](#custom-task-limitations)
- [Custom Task for OpsGroup](#custom-task-for-opsgroup)
- [Tekton Pipeline Config for PodTemplate](#tekton-pipeline-config-for-podtemplate)
- [Tekton Feature Flags](#tekton-feature-flags)

## Using Tekton Custom Task on KFP-Tekton

Expand Down Expand Up @@ -201,4 +202,18 @@ pipeline_conf.set_automount_service_account_token(False) # InputType: Bool
self._test_pipeline_workflow(test_pipeline, 'test.yaml', tekton_pipeline_conf=pipeline_conf)
```

For more details on how this can be used in a real pipeline, visit the [Tekton Pipeline Conf example](/sdk/python/tests/compiler/testdata/tekton_pipeline_conf.py).
For more details on how this can be used in a real pipeline, visit the [Tekton Pipeline Conf example](/sdk/python/tests/compiler/testdata/tekton_pipeline_conf.py).

## Tekton Feature Flags
Tekton provides some features that you can configure via the `feature-flags` configmap under the `tekton-pipelines`
namespace. Using these Tekton features may impact the kfp-tekton backend. Here is the list of features that
impact the kfp-tekton backend:
- enable-custom-tasks: You can turn custom task support on or off by setting its value to true/false.
The default value for the kfp-tekton deployment is `true`. If you are using custom tasks and set the flag value
to `false`, the pipeline will stay in the running state until it times out, because the custom tasks inside the
pipeline cannot be handled properly.
- embedded-status: You can find details for this feature flag [here](https://github.com/tektoncd/community/blob/main/teps/0100-embedded-taskruns-and-runs-status-in-pipelineruns.md).
The default value for the kfp-tekton deployment is `full`, which stores all TaskRun/Run statuses under the PipelineRun's status.
The kfp-tekton backend also supports the `minimal` setting, which only records the list of TaskRuns/Runs under the PipelineRun's status.
In this case, the statuses of TaskRuns/Runs only exist in their own CRs. The kfp-tekton backend retrieves the statuses of TaskRuns/Runs
from the individual CRs, aggregates them, and stores them into the backend storage.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rules:
- pipelineruns
- taskruns
- conditions
- runs
verbs:
- create
- get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ metadata:
app.kubernetes.io/part-of: tekton-pipelines
data:
enable-custom-tasks: "true"
embedded-status: "full"
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/names"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
tkstatus "github.com/tektoncd/pipeline/pkg/status"
"go.uber.org/zap"
"gomodules.xyz/jsonpatch/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -763,6 +764,48 @@ func (c *Reconciler) updatePipelineRunStatus(ctx context.Context, iterationEleme
if iteration > highestIteration {
highestIteration = iteration
}
if pr.Status.ChildReferences != nil {
//fetch taskruns/runs status specifically for pipelineloop-break-operation first
for _, child := range pr.Status.ChildReferences {
if strings.HasPrefix(child.PipelineTaskName, "pipelineloop-break-operation") {
switch child.Kind {
case "TaskRun":
tr, err := tkstatus.GetTaskRunStatusForPipelineTask(ctx, c.pipelineClientSet, run.Namespace, child)
if err != nil {
logger.Errorf("can not get status for TaskRun, %v", err)
return 0, nil, nil, fmt.Errorf("could not get TaskRun %s."+
" %#v", child.Name, err)
}
if pr.Status.TaskRuns == nil {
pr.Status.TaskRuns = make(map[string]*v1beta1.PipelineRunTaskRunStatus)
}
pr.Status.TaskRuns[child.Name] = &v1beta1.PipelineRunTaskRunStatus{
PipelineTaskName: child.PipelineTaskName,
WhenExpressions: child.WhenExpressions,
Status: tr.DeepCopy(),
}
case "Run":
run, err := tkstatus.GetRunStatusForPipelineTask(ctx, c.pipelineClientSet, run.Namespace, child)
if err != nil {
logger.Errorf("can not get status for Run, %v", err)
return 0, nil, nil, fmt.Errorf("could not get Run %s."+
" %#v", child.Name, err)
}
if pr.Status.Runs == nil {
pr.Status.Runs = make(map[string]*v1beta1.PipelineRunRunStatus)
}

pr.Status.Runs[child.Name] = &v1beta1.PipelineRunRunStatus{
PipelineTaskName: child.PipelineTaskName,
WhenExpressions: child.WhenExpressions,
Status: run.DeepCopy(),
}
default:
//ignore
}
}
}
}
for _, runStatus := range pr.Status.Runs {
if strings.HasPrefix(runStatus.PipelineTaskName, "pipelineloop-break-operation") {
if !runStatus.Status.GetCondition(apis.ConditionSucceeded).IsUnknown() {
Expand Down

0 comments on commit 5fd2b73

Please sign in to comment.