Skip to content

Commit

Permalink
Fixed the compilation errors due to recent backend changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ark-kun committed Jan 25, 2019
1 parent 36d9912 commit a374585
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 47 deletions.
1 change: 1 addition & 0 deletions backend/src/apiserver/resource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1:go_default_library",
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
"@com_github_argoproj_argo//pkg/client/clientset/versioned/typed/workflow/v1alpha1:go_default_library",
"@com_github_cenkalti_backoff//:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
Expand Down
37 changes: 36 additions & 1 deletion backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
Expand Down Expand Up @@ -262,8 +263,42 @@ func (r *ResourceManager) ListJobs(filterContext *common.FilterContext, opts *li
return r.jobStore.ListJobs(filterContext, opts)
}

// TerminateWorkflow terminates a workflow by setting its activeDeadlineSeconds to 0
func TerminateWorkflow(wfClient workflowclient.WorkflowInterface, name string) error {
patchObj := map[string]interface{}{
"spec": map[string]interface{}{
"activeDeadlineSeconds": 0,
},
}
var err error
patch, err := json.Marshal(patchObj)
if err != nil {
return util.NewInternalServerError(err, "Unexpected error while marshalling a patch object.")
}

var operation = func() error {
_, err = wfClient.Patch(name, types.MergePatchType, patch)
//if err != nil && !apierr.IsConflict(err) //we can stop immediately
return err
}
var backoffPolicy = backoff.WithMaxRetries(backoff.NewConstantBackOff(100), 10)
err = backoff.Retry(operation, backoffPolicy)
return err
}

func (r *ResourceManager) TerminateRun(runId string) error {
return r.runStore.TerminateRun(runId)
run, err := r.runStore.GetRun(runId)
if err != nil {
return util.NewInternalServerError(err, "Failed to get run info: %s", err.Error())
}

err = r.runStore.TerminateRun(runId)
if err != nil {
return util.Wrap(err, "Terminate pipeline failed")
}

err = TerminateWorkflow(r.workflowClient, run.Name)
return util.Wrap(err, "Terminate pipeline failed")
}

func (r *ResourceManager) GetJob(id string) (*model.Job, error) {
Expand Down
4 changes: 0 additions & 4 deletions backend/src/apiserver/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//backend/api:go_default_library",
"//backend/src/apiserver/client:go_default_library",
"//backend/src/apiserver/common:go_default_library",
"//backend/src/apiserver/list:go_default_library",
"//backend/src/apiserver/model:go_default_library",
"//backend/src/common/util:go_default_library",
"@com_github_argoproj_argo//pkg/apis/workflow/v1alpha1:go_default_library",
"@com_github_argoproj_argo//pkg/client/clientset/versioned/typed/workflow/v1alpha1:go_default_library",
"@com_github_cenkalti_backoff//:go_default_library",
"@com_github_ghodss_yaml//:go_default_library",
"@com_github_go_sql_driver_mysql//:go_default_library",
"@com_github_golang_glog//:go_default_library",
Expand All @@ -40,7 +37,6 @@ go_library(
"@com_github_minio_minio_go//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_vividcortex_mysqlerr//:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
"@io_k8s_apimachinery//pkg/util/json:go_default_library",
],
)
Expand Down
43 changes: 2 additions & 41 deletions backend/src/apiserver/storage/run_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@ import (

sq "github.com/Masterminds/squirrel"
workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
workflowclient "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/list"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/common/util"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
)

Expand Down Expand Up @@ -493,36 +489,8 @@ func NewRunStore(db *DB, time util.TimeInterface) *RunStore {
return &RunStore{db: db, resourceReferenceStore: NewResourceReferenceStore(db), time: time}
}

// TerminateWorkflow terminates a workflow by setting its activeDeadlineSeconds to 0
func TerminateWorkflow(wfClient workflowclient.WorkflowInterface, name string) error {
patchObj := map[string]interface{}{
"spec": map[string]interface{}{
"activeDeadlineSeconds": 0,
},
}
var err error
patch, err := json.Marshal(patchObj)
if err != nil {
return util.NewInternalServerError(err, "Unexpected error while marshalling a patch object.")
}

var operation = func() error {
_, err = wfClient.Patch(name, types.MergePatchType, patch)
//if err != nil && !apierr.IsConflict(err) //we can stop immediately
return err
}
var backoffPolicy = backoff.WithMaxRetries(backoff.NewConstantBackOff(100), 10)
err = backoff.Retry(operation, backoffPolicy)
return err
}

func (s *RunStore) TerminateRun(runId string) error {
run, err := s.GetRun(runId)
if err != nil {
return util.NewInternalServerError(err, "Failed to get run info: %s", err.Error())
}

_, err = s.db.Exec(`
_, err := s.db.Exec(`
UPDATE run_details
SET Conditions = ?,
WHERE UUID = ? AND Conditions = ?`,
Expand All @@ -531,12 +499,5 @@ func (s *RunStore) TerminateRun(runId string) error {
return util.NewInternalServerError(err, "Failed to start terminating the run: %s", err.Error())
}

workflowInterface, err := client.CreateWorkflowClient("default")
if err != nil {
return err
}

err = TerminateWorkflow(workflowInterface, run.Name)

return err
return nil
}
2 changes: 1 addition & 1 deletion backend/src/common/client/api_server/run_client_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ func (c *RunClientFake) Terminate(params *runparams.TerminateRunParams) error {
case RunForDefaultTest:
return nil
default:
return fmt.Errorf(InvalidFakeRequest)
return fmt.Errorf(InvalidFakeRequest, params.RunID)
}
}

0 comments on commit a374585

Please sign in to comment.