Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Garbage collect the completed workflow after persisted to database #1802

Merged
merged 11 commits into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/Dockerfile.persistenceagent
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ COPY --from=builder /bin/persistence_agent /bin/persistence_agent
COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.txt /bin/license.txt

ENV NAMESPACE ""
# Must be an integer: the value is passed verbatim to the int64
# --ttlSecondsAfterWorkflowFinish flag, and an empty value would make flag
# parsing fail at startup. 3600 matches the flag's built-in default.
ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH 3600

CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE}
CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH}
41 changes: 22 additions & 19 deletions backend/src/agent/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,30 @@ import (
)

var (
masterURL string
kubeconfig string
initializeTimeout time.Duration
timeout time.Duration
mlPipelineAPIServerName string
mlPipelineAPIServerPort string
mlPipelineAPIServerBasePath string
mlPipelineServiceHttpPort string
mlPipelineServiceGRPCPort string
namespace string
masterURL string
kubeconfig string
initializeTimeout time.Duration
timeout time.Duration
mlPipelineAPIServerName string
mlPipelineAPIServerPort string
mlPipelineAPIServerBasePath string
mlPipelineServiceHttpPort string
mlPipelineServiceGRPCPort string
namespace string
// ttlSecondsAfterWorkflowFinish is populated from the
// ttlSecondsAfterWorkflowFinish command-line flag in init() and handed to
// worker.NewWorkflowSaver when the persistence agent is constructed.
ttlSecondsAfterWorkflowFinish int64
)

// Names of the command-line flags accepted by the persistence agent.
// NOTE(review): only the base-path, namespace and TTL registrations are
// visible in init() here; the remaining names are presumably registered the
// same way — confirm against the full file.
const (
kubeconfigFlagName = "kubeconfig"
masterFlagName = "master"
initializationTimeoutFlagName = "initializeTimeout"
timeoutFlagName = "timeout"
mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath"
mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName"
mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort"
mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort"
namespaceFlagName = "namespace"
kubeconfigFlagName = "kubeconfig"
masterFlagName = "master"
initializationTimeoutFlagName = "initializeTimeout"
timeoutFlagName = "timeout"
mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath"
mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName"
mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort"
mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort"
namespaceFlagName = "namespace"
ttlSecondsAfterWorkflowFinishFlagName = "ttlSecondsAfterWorkflowFinish"
)

func main() {
Expand Down Expand Up @@ -122,4 +124,5 @@ func init() {
flag.StringVar(&mlPipelineAPIServerBasePath, mlPipelineAPIServerBasePathFlagName,
"/apis/v1beta1", "The base path for the ML pipeline API server.")
flag.StringVar(&namespace, namespaceFlagName, "", "The namespace name used for Kubernetes informers to obtain the listers.")
flag.Int64Var(&ttlSecondsAfterWorkflowFinish, ttlSecondsAfterWorkflowFinishFlagName, 3600, "The TTL for Argo workflow to persist after workflow finish.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should make it longer, e.g. 7 days, so that it does not surprise existing users?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
10 changes: 5 additions & 5 deletions backend/src/agent/persistence/persistence_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type PersistenceAgent struct {

// NewPersistenceAgent returns a new persistence agent.
func NewPersistenceAgent(
swfInformerFactory swfinformers.SharedInformerFactory,
workflowInformerFactory workflowinformers.SharedInformerFactory,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
swfInformerFactory swfinformers.SharedInformerFactory,
workflowInformerFactory workflowinformers.SharedInformerFactory,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
// obtain references to shared informers
swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows()
workflowInformer := workflowInformerFactory.Argoproj().V1alpha1().Workflows()
Expand All @@ -64,7 +64,7 @@ func NewPersistenceAgent(

workflowWorker := worker.NewPersistenceWorker(time, workflowregister.Kind,
workflowInformer.Informer(), true,
worker.NewWorkflowSaver(workflowClient, pipelineClient))
worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish))

agent := &PersistenceAgent{
swfClient: swfClient,
Expand Down
10 changes: 5 additions & 5 deletions backend/src/agent/persistence/worker/persistence_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestPersistenceWorker_Success(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up persistence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up persistence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up persistence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) {
"My Retriable Error"))

// Set up persistence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) {
"My Permanent Error"))

// Set up persistence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down
24 changes: 16 additions & 8 deletions backend/src/agent/persistence/worker/workflow_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ import (
"github.com/kubeflow/pipelines/backend/src/common/util"
log "github.com/sirupsen/logrus"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"time"
)

// WorkflowSaver provides a function to persist a workflow to a database.
type WorkflowSaver struct {
client client.WorkflowClientInterface
pipelineClient client.PipelineClientInterface
metricsReporter *MetricsReporter
client client.WorkflowClientInterface
pipelineClient client.PipelineClientInterface
metricsReporter *MetricsReporter
// ttlSecondsAfterWorkflowFinish is the number of seconds after a workflow's
// finish time during which Save skips reporting a workflow that is already
// marked as persisted in its final state.
ttlSecondsAfterWorkflowFinish int64
}

// NewWorkflowSaver returns a WorkflowSaver that uses the given workflow and
// pipeline clients. Its metrics reporter is built from pipelineClient, and
// ttlSecondsAfterWorkflowFinish is stored for the TTL check performed by Save.
func NewWorkflowSaver(client client.WorkflowClientInterface,
pipelineClient client.PipelineClientInterface) *WorkflowSaver {
pipelineClient client.PipelineClientInterface, ttlSecondsAfterWorkflowFinish int64) *WorkflowSaver {
return &WorkflowSaver{
client: client,
pipelineClient: pipelineClient,
metricsReporter: NewMetricsReporter(pipelineClient),
client: client,
pipelineClient: pipelineClient,
metricsReporter: NewMetricsReporter(pipelineClient),
ttlSecondsAfterWorkflowFinish: ttlSecondsAfterWorkflowFinish,
}
}

Expand All @@ -53,7 +56,12 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch
"Workflow (%s): transient failure: %v", key, err)

}

if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < s.ttlSecondsAfterWorkflowFinish {
hongye-sun marked this conversation as resolved.
Show resolved Hide resolved
// Skip persisting the workflow if its final state has already been
// persisted and it has not yet passed the TTL.
log.Infof("Skip syncing Workflow (%v): workflow marked as persisted.", name)
return nil
}
// Save this Workflow to the database.
err = s.pipelineClient.ReportWorkflow(wf)
retry := util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)
Expand Down
80 changes: 65 additions & 15 deletions backend/src/agent/persistence/worker/workflow_saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package worker

import (
"fmt"
"testing"
"time"

workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
Expand All @@ -39,9 +40,7 @@ func TestWorkflow_Save_Success(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -53,9 +52,7 @@ func TestWorkflow_Save_NotFoundDuringGet(t *testing.T) {
workflowFake := client.NewWorkflowClientFake()
pipelineFake := client.NewPipelineClientFake()

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -70,9 +67,7 @@ func TestWorkflow_Save_ErrorDuringGet(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", nil)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -97,9 +92,7 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -124,13 +117,70 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

assert.Equal(t, true, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "transient failure")
}

// TestWorkflow_Save_SkippedDueToFinalStatue verifies that Save returns nil
// without reporting when the workflow is labeled as persisted in its final
// state and finished within the TTL.
func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) {
	workflowFake := client.NewWorkflowClientFake()
	pipelineFake := client.NewPipelineClientFake()

	// Make reporting fail, so that Save can only succeed if reporting is skipped.
	pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT,
		"My Permanent Error"))

	workflow := util.NewWorkflow(&workflowapi.Workflow{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "MY_NAMESPACE",
			Name:      "MY_NAME",
			Labels:    map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"},
		},
		// Set the finish time explicitly so the workflow is unambiguously
		// inside the 100-second TTL window, rather than relying on how an
		// unset FinishedAt is interpreted by the TTL comparison.
		Status: workflowapi.WorkflowStatus{
			FinishedAt: metav1.Now(),
		},
	})

	workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

	saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

	err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

	assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
	assert.Nil(t, err)
}

// TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL verifies that a
// workflow labeled as persisted in its final state is still reported once its
// finish time is older than the TTL.
func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) {
	workflowFake := client.NewWorkflowClientFake()
	pipelineFake := client.NewPipelineClientFake()

	// Make reporting fail: the error surfacing from Save is the evidence that
	// reporting was attempted rather than skipped.
	pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT,
		"My Permanent Error"))

	workflow := util.NewWorkflow(&workflowapi.Workflow{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "MY_NAMESPACE",
			Name:      "MY_NAME",
			Labels:    map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"},
		},
		Status: workflowapi.WorkflowStatus{
			// Backdate the finish time by a minute so the 1-second TTL has
			// already elapsed; this avoids sleeping in the test.
			FinishedAt: metav1.Unix(time.Now().Unix()-60, 0),
		},
	})

	workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

	saver := NewWorkflowSaver(workflowFake, pipelineFake, 1)

	err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

	assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
	assert.NotNil(t, err)
	assert.Contains(t, err.Error(), "permanent failure")
}
Loading