Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backend] Cache - Add cache_enabled label for cache filtering #3352

Merged
merged 18 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {

// Disable istio sidecar injection
workflow.SetAnnotationsToAllTemplates(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled)
// Add a KFP specific label for cache service filtering
workflow.SetLabelsToAllTemplates(util.LabelKeyCacheEnabled, util.LabelValueCacheEnabled)
// Append provided parameter
workflow.OverrideParameters(parameters)

Expand Down
13 changes: 9 additions & 4 deletions backend/src/common/util/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,20 @@ const (
// It captures whether the name of the owning ScheduledWorkflow.
LabelKeyWorkflowScheduledWorkflowName = constants.FullName + "/scheduledWorkflowName"


LabelKeyWorkflowRunId = "pipeline/runid"
LabelKeyWorkflowRunId = "pipeline/runid"
LabelKeyWorkflowPersistedFinalState = "pipeline/persistedFinalState"

// LabelKeyWorkflowEpoch is a Workflow annotation key.
// It captures the the name of the Run.
AnnotationKeyRunName = "pipelines.kubeflow.org/run_name"

AnnotationKeyIstioSidecarInject = "sidecar.istio.io/inject"
AnnotationValueIstioSidecarInjectEnabled = "true"
AnnotationKeyIstioSidecarInject = "sidecar.istio.io/inject"
AnnotationValueIstioSidecarInjectEnabled = "true"
AnnotationValueIstioSidecarInjectDisabled = "false"

// LabelKeyCacheEnabled is a workflow annotation key.
// It captures whether this run will be selected by cache service.
LabelKeyCacheEnabled = "pipelines.kubeflow.org/cache_enabled"
LabelValueCacheEnabled = "true"
LabelValueCacheDisabled = "false"
)
13 changes: 13 additions & 0 deletions backend/src/common/util/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,19 @@ func (w *Workflow) SetAnnotationsToAllTemplates(key string, value string) {
}
}

// SetLabels sets labels on all templates in a Workflow
func (w *Workflow) SetLabelsToAllTemplates(key string, value string) {
if len(w.Spec.Templates) == 0 {
return
}
for index, _ := range w.Spec.Templates {
if w.Spec.Templates[index].Metadata.Labels == nil {
w.Spec.Templates[index].Metadata.Labels = make(map[string]string)
}
w.Spec.Templates[index].Metadata.Labels[key] = value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this code skip setting the cache value if it exists? This way it will respect user's input if user disable the cache for a given step.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The cache server will only receive the pod with labels: cache_enabled = true. In the future we can have UI support with toggling cache behavior for a run with this label. Per step cache behavior control will be base on MaxCacheStaleness.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this code skip setting the cache value if it exists? This way it will respect user's input if user disable the cache for a given step.

Per-step cache disabling is performed using a different mechanism (setting task.max_cache_staleness = "0").

I'm OK with not changing the value if it already exists. But it should not exist since it's a vendored label and SDK does not set it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused. Do we have two flags to control the cache for a given step, cache_enabled and max_cache_staleness? if one supersets the other, we should consider dedup them.

If cache_enabled is not exposed to end user, what's the purpose of having it? Should it be a feature flag at the caching server side, instead of at the workflow side?

}
}

// SetOwnerReferences sets owner references on a Workflow.
func (w *Workflow) SetOwnerReferences(schedule *swfapi.ScheduledWorkflow) {
w.OwnerReferences = []metav1.OwnerReference{
Expand Down
37 changes: 36 additions & 1 deletion backend/src/common/util/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package util

import (
"github.com/ghodss/yaml"
"testing"

workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/ghodss/yaml"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -252,6 +253,40 @@ func TestWorkflow_SetOwnerReferences(t *testing.T) {
assert.Equal(t, expected, workflow.Get())
}

func TestWorkflow_SetLabelsToAllTemplates(t *testing.T) {
workflow := NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "WORKFLOW_NAME",
},
Spec: workflowapi.WorkflowSpec{
Templates: []workflowapi.Template{
workflowapi.Template{
Metadata: workflowapi.Metadata{},
},
},
},
})
workflow.SetLabelsToAllTemplates("key", "value")
expected := &workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "WORKFLOW_NAME",
},
Spec: workflowapi.WorkflowSpec{
Templates: []workflowapi.Template{
workflowapi.Template{
Metadata: workflowapi.Metadata{
Labels: map[string]string{
"key": "value",
},
},
},
},
},
}

assert.Equal(t, expected, workflow.Get())
}

func TestSetLabels(t *testing.T) {
workflow := NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Expand Down