Skip to content

Commit

Permalink
chore(v2): artifact passing via local path (#6285)
Browse files Browse the repository at this point in the history
* chore(v2): add implicit dependencies for artifacts

* chore(v2): artifact passing via local path

* clean up
  • Loading branch information
Bobgy authored Aug 11, 2021
1 parent 62a4181 commit 2b78d16
Show file tree
Hide file tree
Showing 12 changed files with 506 additions and 186 deletions.
2 changes: 1 addition & 1 deletion samples/test/two_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def train_op(
)


@dsl.pipeline(name='two_step_pipeline')
@dsl.pipeline(name='two-step-pipeline')
def two_step_pipeline(uri: str = 'uri-to-import', some_int: int = 1234):
preprocess_task = preprocess_op(uri=uri, some_int=some_int)
train_task = train_op(
Expand Down
5 changes: 5 additions & 0 deletions samples/test/two_step_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ def verify_with_specific_pipeline_root(
pipeline_func=two_step_pipeline,
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY
),
TestCase(
pipeline_func=two_step_pipeline,
verify_func=verify,
mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
),
# Verify default pipeline_root with MinIO
TestCase(
pipeline_func=two_step_pipeline,
Expand Down
14 changes: 8 additions & 6 deletions v2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ image-launcher-dev:
# make dev runs developing actions from end to end for KFP v2.
# Build images, push them to dev image registry, build backend compiler, compile pipelines and run them.
.PHONY: dev
dev: image-dev build/compiler pipeline-hello_world pipeline-producer_consumer_param
dev: image-dev build/compiler pipeline/v2/hello_world pipeline/v2/producer_consumer_param pipeline/test/two_step

.PHONY: build image-dev
build: build/launcher build/compiler build/driver
Expand All @@ -91,14 +91,16 @@ build/driver:
image-driver-dev:

# tests pipelines
pipeline-hello_world:
pipeline-producer_consumer_param:
pipeline/v2/hello_world:
pipeline/v2/producer_consumer_param:
pipeline/test/two_step:

# Run a test pipeline using v2 CLI compiler (v2 engine mode).
.PHONY: pipeline-%
pipeline-%:
.PHONY: pipeline/%
pipeline/%:
tmp="$$(mktemp -d)" \
&& dsl-compile-v2 --py $(REPO_ROOT)/samples/v2/$*.py --out "$${tmp}/$*.json" \
&& mkdir -p "$$(dirname $${tmp}/$*)" \
&& dsl-compile-v2 --py $(REPO_ROOT)/samples/$*.py --out "$${tmp}/$*.json" \
&& build/compiler --spec "$${tmp}/$*.json" --driver "$(DEV_IMAGE_PREFIX)driver:latest" --launcher "$(DEV_IMAGE_PREFIX)launcher-v2:latest" > "$${tmp}/$*.yaml" \
&& if which argo >/dev/null; then argo lint "$${tmp}/$*.yaml"; else echo "argo CLI not found, skip linting"; fi \
&& kfp run submit -f "$${tmp}/$*.yaml" -e default -r "$*_$${RANDOM}"
Expand Down
21 changes: 19 additions & 2 deletions v2/cmd/driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -91,14 +93,17 @@ func drive() (err error) {
if err = validate(); err != nil {
return err
}
glog.Infof("input ComponentSpec:%s\n", prettyPrint(*componentSpecJson))
componentSpec := &pipelinespec.ComponentSpec{}
if err := jsonpb.UnmarshalString(*componentSpecJson, componentSpec); err != nil {
return fmt.Errorf("failed to unmarshal component spec, error: %w\ncomponentSpec: %v", err, componentSpecJson)
}
glog.Infof("input TaskSpec:%s\n", prettyPrint(*taskSpecJson))
taskSpec := &pipelinespec.PipelineTaskSpec{}
if err := jsonpb.UnmarshalString(*taskSpecJson, taskSpec); err != nil {
return fmt.Errorf("failed to unmarshal task spec, error: %w\ntask: %v", err, taskSpecJson)
}
glog.Infof("input RuntimeConfig:%s\n", prettyPrint(*runtimeConfigJson))
runtimeConfig := &pipelinespec.PipelineJob_RuntimeConfig{}
if err := jsonpb.UnmarshalString(*runtimeConfigJson, runtimeConfig); err != nil {
return fmt.Errorf("failed to unmarshal runtime config, error: %w\nruntimeConfig: %v", err, runtimeConfigJson)
Expand Down Expand Up @@ -134,28 +139,40 @@ func drive() (err error) {
return err
}
if execution.ID != 0 {
glog.Infof("output execution.ID=%v", execution.ID)
if err = writeFile(*executionIDPath, []byte(fmt.Sprint(execution.ID))); err != nil {
return fmt.Errorf("failed to write execution ID to file: %w", err)
}
}
if execution.Context != 0 {
glog.Infof("output execution.Context=%v", execution.Context)
if err = writeFile(*contextIDPath, []byte(fmt.Sprint(execution.Context))); err != nil {
return fmt.Errorf("failed to write context ID to file: %w", err)
}
}
if execution.ExecutorInput != nil {
marshaler := jsonpb.Marshaler{}
executorInputJson, err := marshaler.MarshalToString(execution.ExecutorInput)
executorInputJSON, err := marshaler.MarshalToString(execution.ExecutorInput)
if err != nil {
return fmt.Errorf("failed to marshal ExecutorInput to JSON: %w", err)
}
if err = writeFile(*executorInputPath, []byte(executorInputJson)); err != nil {
glog.Infof("output ExecutorInput:%s\n", prettyPrint(executorInputJSON))
if err = writeFile(*executorInputPath, []byte(executorInputJSON)); err != nil {
return fmt.Errorf("failed to write ExecutorInput to file: %w", err)
}
}
return nil
}

func prettyPrint(jsonStr string) string {
var prettyJSON bytes.Buffer
err := json.Indent(&prettyJSON, []byte(jsonStr), "", " ")
if err != nil {
return jsonStr
}
return string(prettyJSON.Bytes())
}

func writeFile(path string, data []byte) (err error) {
if path == "" {
return fmt.Errorf("path is not specified")
Expand Down
3 changes: 1 addition & 2 deletions v2/cmd/launcher-v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ var (
componentSpecJSON = flag.String("component_spec", "", "The JSON-encoded ComponentSpec.")
podName = flag.String("pod_name", "", "Kubernetes Pod name.")
podUID = flag.String("pod_uid", "", "Kubernetes Pod UID.")
pipelineRoot = flag.String("pipeline_root", "", "The root output directory in which to store output artifacts.")
mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.")
mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.")
)
Expand Down Expand Up @@ -61,14 +60,14 @@ func run() error {
Namespace: namespace,
PodName: *podName,
PodUID: *podUID,
PipelineRoot: *pipelineRoot,
MLMDServerAddress: *mlmdServerAddress,
MLMDServerPort: *mlmdServerPort,
}
launcher, err := component.NewLauncherV2(ctx, *executionID, *executorInputJSON, *componentSpecJSON, flag.Args(), opts)
if err != nil {
return err
}
glog.V(5).Info(launcher.Info())
if err := launcher.Execute(ctx); err != nil {
return err
}
Expand Down
59 changes: 37 additions & 22 deletions v2/compiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,35 +180,50 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {

func addImplicitDependencies(dagSpec *pipelinespec.DagSpec) error {
for _, task := range dagSpec.GetTasks() {
wrap := func(err error) error {
return fmt.Errorf("failed to add implicit deps: %w", err)
}
addDep := func(producer string) error {
if _, ok := dagSpec.GetTasks()[producer]; !ok {
return fmt.Errorf("unknown producer task %q in DAG", producer)
}
if task.DependentTasks == nil {
task.DependentTasks = make([]string, 0)
}
// add the dependency if it's not already added
found := false
for _, dep := range task.DependentTasks {
if dep == producer {
found = true
break
}
}
if !found {
task.DependentTasks = append(task.DependentTasks, producer)
}
return nil
}
// TODO(Bobgy): add implicit dependencies introduced by artifacts
for _, input := range task.GetInputs().GetParameters() {
wrap := func(err error) error {
return fmt.Errorf("failed to add implicit deps: %w", err)
}
switch input.Kind.(type) {
switch input.GetKind().(type) {
case *pipelinespec.TaskInputsSpec_InputParameterSpec_TaskOutputParameter:
producer := input.GetTaskOutputParameter().GetProducerTask()
_, ok := dagSpec.GetTasks()[producer]
if !ok {
return wrap(fmt.Errorf("unknown producer task %q in DAG", producer))
}
if task.DependentTasks == nil {
task.DependentTasks = make([]string, 0)
}
// add the dependency if it's not already added
found := false
for _, dep := range task.DependentTasks {
if dep == producer {
found = true
}
}
if !found {
task.DependentTasks = append(task.DependentTasks, producer)
if err := addDep(input.GetTaskOutputParameter().GetProducerTask()); err != nil {
return wrap(err)
}
case *pipelinespec.TaskInputsSpec_InputParameterSpec_TaskFinalStatus_:
return wrap(fmt.Errorf("task final status not supported yet"))
default:
// other input types do not introduce implicit dependencies
// other parameter input types do not introduce implicit dependencies
}
}
for _, input := range task.GetInputs().GetArtifacts() {
switch input.GetKind().(type) {
case *pipelinespec.TaskInputsSpec_InputArtifactSpec_TaskOutputArtifact:
if err := addDep(input.GetTaskOutputArtifact().GetProducerTask()); err != nil {
return wrap(err)
}
default:
// other artifact input types do not introduce implicit dependencies
}
}
}
Expand Down
Loading

0 comments on commit 2b78d16

Please sign in to comment.