Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
zanedurante authored Jul 29, 2019
2 parents 6ae7ed3 + 0283ddd commit 12bfc6c
Show file tree
Hide file tree
Showing 92 changed files with 2,134 additions and 358 deletions.
2 changes: 2 additions & 0 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func startRpcServer(resourceManager *resource.ResourceManager) {
api.RegisterRunServiceServer(s, server.NewRunServer(resourceManager))
api.RegisterJobServiceServer(s, server.NewJobServer(resourceManager))
api.RegisterReportServiceServer(s, server.NewReportServer(resourceManager))
api.RegisterVisualizationServiceServer(s, server.NewVisualizationServer(resourceManager))

// Register reflection service on gRPC server.
reflection.Register(s)
Expand All @@ -106,6 +107,7 @@ func startHttpProxy(resourceManager *resource.ResourceManager) {
registerHttpHandlerFromEndpoint(api.RegisterJobServiceHandlerFromEndpoint, "JobService", ctx, mux)
registerHttpHandlerFromEndpoint(api.RegisterRunServiceHandlerFromEndpoint, "RunService", ctx, mux)
registerHttpHandlerFromEndpoint(api.RegisterReportServiceHandlerFromEndpoint, "ReportService", ctx, mux)
registerHttpHandlerFromEndpoint(api.RegisterVisualizationServiceHandlerFromEndpoint, "Visualization", ctx, mux)

// Create a top level mux to include both pipeline upload server and gRPC servers.
topMux := http.NewServeMux()
Expand Down
2 changes: 2 additions & 0 deletions backend/src/apiserver/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"run_server.go",
"test_util.go",
"util.go",
"visualization_server.go",
],
importpath = "github.com/kubeflow/pipelines/backend/src/apiserver/server",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -50,6 +51,7 @@ go_test(
"run_metric_util_test.go",
"run_server_test.go",
"util_test.go",
"visualization_server_test.go",
],
data = glob(["test/**/*"]), # keep
embed = [":go_default_library"],
Expand Down
78 changes: 78 additions & 0 deletions backend/src/apiserver/server/visualization_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package server

import (
"context"
"encoding/json"
"fmt"
"github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"io/ioutil"
"net/http"
"net/url"
"strings"
)

type VisualizationServer struct {
resourceManager *resource.ResourceManager
serviceURL string
}

func (s *VisualizationServer) CreateVisualization(ctx context.Context, request *go_client.CreateVisualizationRequest) (*go_client.Visualization, error) {
if err := s.validateCreateVisualizationRequest(request); err != nil {
return nil, err
}
body, err := s.generateVisualizationFromRequest(request)
if err != nil {
return nil, err
}
request.Visualization.Html = string(body)
return request.Visualization, nil
}

// validateCreateVisualizationRequest ensures that a go_client.Visualization
// object has valid values.
// It returns an error if a go_client.Visualization object does not have valid
// values.
func (s *VisualizationServer) validateCreateVisualizationRequest(request *go_client.CreateVisualizationRequest) error {
if len(request.Visualization.InputPath) == 0 {
return util.NewInvalidInputError("A visualization requires an InputPath to be provided. Received %s", request.Visualization.InputPath)
}
// Manually set Arguments to empty JSON if nothing is provided. This is done
// because visualizations such as TFDV and TFMA only require an InputPath to
// be provided for a visualization to be generated. If no JSON is provided
// json.Valid will fail without this check as an empty string is provided for
// those visualizations.
if len(request.Visualization.Arguments) == 0 {
request.Visualization.Arguments = "{}"
}
if !json.Valid([]byte(request.Visualization.Arguments)) {
return util.NewInvalidInputError("A visualization requires valid JSON to be provided as Arguments. Received %s", request.Visualization.Arguments)
}
return nil
}

// generateVisualizationFromRequest communicates with the python visualization
// service to generate HTML visualizations from a request.
// It returns the generated HTML as a string and any error that is encountered.
func (s *VisualizationServer) generateVisualizationFromRequest(request *go_client.CreateVisualizationRequest) ([]byte, error) {
visualizationType := strings.ToLower(go_client.Visualization_Type_name[int32(request.Visualization.Type)])
arguments := fmt.Sprintf("--type %s --input_path %s --arguments '%s'", visualizationType, request.Visualization.InputPath, request.Visualization.Arguments)
resp, err := http.PostForm(s.serviceURL, url.Values{"arguments": {arguments}})
if err != nil {
return nil, util.Wrap(err, "Unable to initialize visualization request.")
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf(resp.Status)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, util.Wrap(err, "Unable to parse visualization response.")
}
return body, nil
}

func NewVisualizationServer(resourceManager *resource.ResourceManager) *VisualizationServer {
return &VisualizationServer{resourceManager: resourceManager, serviceURL: "http://visualization-service.kubeflow"}
}
117 changes: 117 additions & 0 deletions backend/src/apiserver/server/visualization_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package server

import (
"github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"testing"
)

func TestValidateCreateVisualizationRequest(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
server := NewVisualizationServer(manager)
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
InputPath: "gs://ml-pipeline/roc/data.csv",
Arguments: "{}",
}
request := &go_client.CreateVisualizationRequest{
Visualization: visualization,
}
err := server.validateCreateVisualizationRequest(request)
assert.Nil(t, err)
}

func TestValidateCreateVisualizationRequest_ArgumentsAreEmpty(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
server := NewVisualizationServer(manager)
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
InputPath: "gs://ml-pipeline/roc/data.csv",
Arguments: "",
}
request := &go_client.CreateVisualizationRequest{
Visualization: visualization,
}
err := server.validateCreateVisualizationRequest(request)
assert.Nil(t, err)
}

func TestValidateCreateVisualizationRequest_InputPathIsEmpty(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
server := NewVisualizationServer(manager)
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
InputPath: "",
Arguments: "{}",
}
request := &go_client.CreateVisualizationRequest{
Visualization: visualization,
}
err := server.validateCreateVisualizationRequest(request)
assert.Contains(t, err.Error(), "A visualization requires an InputPath to be provided. Received")
}

func TestValidateCreateVisualizationRequest_ArgumentsNotValidJSON(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
server := NewVisualizationServer(manager)
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
InputPath: "gs://ml-pipeline/roc/data.csv",
Arguments: "{",
}
request := &go_client.CreateVisualizationRequest{
Visualization: visualization,
}
err := server.validateCreateVisualizationRequest(request)
assert.Contains(t, err.Error(), "A visualization requires valid JSON to be provided as Arguments. Received {")
}

func TestGenerateVisualization(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
httpServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/", req.URL.String())
rw.Write([]byte("roc_curve"))
}))
defer httpServer.Close()
server := &VisualizationServer{resourceManager: manager, serviceURL: httpServer.URL}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
InputPath: "gs://ml-pipeline/roc/data.csv",
Arguments: "{}",
}
request := &go_client.CreateVisualizationRequest{
Visualization: visualization,
}
body, err := server.generateVisualizationFromRequest(request)
assert.Equal(t, []byte("roc_curve"), body)
assert.Nil(t, err)
}

func TestGenerateVisualization_ServerError(t *testing.T) {
clients, manager, _ := initWithExperiment(t)
defer clients.Close()
httpServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/", req.URL.String())
rw.WriteHeader(500)
}))
defer httpServer.Close()
server := &VisualizationServer{resourceManager: manager, serviceURL: httpServer.URL}
visualization := &go_client.Visualization{
Type: go_client.Visualization_ROC_CURVE,
InputPath: "gs://ml-pipeline/roc/data.csv",
Arguments: "{}",
}
request := &go_client.CreateVisualizationRequest{
Visualization: visualization,
}
body, err := server.generateVisualizationFromRequest(request)
assert.Nil(t, body)
assert.Equal(t, "500 Internal Server Error", err.Error())
}
10 changes: 8 additions & 2 deletions backend/src/common/util/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (w *Workflow) ScheduledAtInSecOr0() int64 {
}

func (w *Workflow) FinishedAt() int64 {
if w.Status.FinishedAt.IsZero(){
if w.Status.FinishedAt.IsZero() {
// If workflow is not finished
return 0
}
Expand Down Expand Up @@ -165,7 +165,13 @@ func (w *Workflow) GetWorkflowSpec() *Workflow {
workflow := w.DeepCopy()
workflow.Status = workflowapi.WorkflowStatus{}
workflow.TypeMeta = metav1.TypeMeta{Kind: w.Kind, APIVersion: w.APIVersion}
workflow.ObjectMeta = metav1.ObjectMeta{Name: w.Name, GenerateName: w.GenerateName}
// To prevent collisions, clear name, set GenerateName to first 200 runes of previous name.
nameRunes := []rune(w.Name)
length := len(nameRunes)
if length > 200 {
length = 200
}
workflow.ObjectMeta = metav1.ObjectMeta{GenerateName: string(nameRunes[:length])}
return NewWorkflow(workflow)
}

Expand Down
36 changes: 35 additions & 1 deletion backend/src/common/util/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,41 @@ func TestGetWorkflowSpec(t *testing.T) {

expected := &workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "WORKFLOW_NAME",
GenerateName: "WORKFLOW_NAME",
},
Spec: workflowapi.WorkflowSpec{
Arguments: workflowapi.Arguments{
Parameters: []workflowapi.Parameter{
{Name: "PARAM", Value: StringPointer("VALUE")},
},
},
},
}

assert.Equal(t, expected, workflow.GetWorkflowSpec().Get())
}

func TestGetWorkflowSpecTruncatesNameIfLongerThan200Runes(t *testing.T) {
workflow := NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "THIS_NAME_IS_GREATER_THAN_200_CHARACTERS_AND_WILL_BE_TRUNCATED_AFTER_THE_X_OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOXZZZZZZZZ",
Labels: map[string]string{"key": "value"},
},
Spec: workflowapi.WorkflowSpec{
Arguments: workflowapi.Arguments{
Parameters: []workflowapi.Parameter{
{Name: "PARAM", Value: StringPointer("VALUE")},
},
},
},
Status: workflowapi.WorkflowStatus{
Message: "I AM A MESSAGE",
},
})

expected := &workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "THIS_NAME_IS_GREATER_THAN_200_CHARACTERS_AND_WILL_BE_TRUNCATED_AFTER_THE_X_OOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOX",
},
Spec: workflowapi.WorkflowSpec{
Arguments: workflowapi.Arguments{
Expand Down
2 changes: 1 addition & 1 deletion component_sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from setuptools import setup

PACKAGE_NAME = "kfp-component"
VERSION = '0.1.24'
VERSION = '0.1.25'

setup(
name=PACKAGE_NAME,
Expand Down
2 changes: 1 addition & 1 deletion components/dataflow/predict/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ outputs:
- {name: Predictions dir, type: GCSPath, description: 'GCS or local directory.'} #Will contain prediction_results-* and schema.json files; TODO: Split outputs and replace dir with single file # type: {GCSPath: {path_type: Directory}}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:ac833a084b32324b56ca56e9109e05cde02816a4
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:fe639f41661d8e17fcda64ff8242127620b80ba0
command: [python2, /ml/predict.py]
args: [
--data, {inputValue: Data file pattern},
Expand Down
2 changes: 1 addition & 1 deletion components/dataflow/tfdv/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ outputs:
- {name: Validation result, type: String, description: Indicates whether anomalies were detected or not.}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:ac833a084b32324b56ca56e9109e05cde02816a4
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:fe639f41661d8e17fcda64ff8242127620b80ba0
command: [python2, /ml/validate.py]
args: [
--csv-data-for-inference, {inputValue: Inference data},
Expand Down
2 changes: 1 addition & 1 deletion components/dataflow/tfma/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ outputs:
- {name: Analysis results dir, type: GCSPath, description: GCS or local directory where the analysis results should were written.} # type: {GCSPath: {path_type: Directory}}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:ac833a084b32324b56ca56e9109e05cde02816a4
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:fe639f41661d8e17fcda64ff8242127620b80ba0
command: [python2, /ml/model_analysis.py]
args: [
--model, {inputValue: Model},
Expand Down
2 changes: 1 addition & 1 deletion components/dataflow/tft/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ outputs:
- {name: Transformed data dir, type: GCSPath} # type: {GCSPath: {path_type: Directory}}
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:ac833a084b32324b56ca56e9109e05cde02816a4
image: gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:fe639f41661d8e17fcda64ff8242127620b80ba0
command: [python2, /ml/transform.py]
args: [
--train, {inputValue: Training data file pattern},
Expand Down
2 changes: 1 addition & 1 deletion components/gcp/bigquery/query/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar
import kfp.components as comp

bigquery_query_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/1f65a564d4d44fa5a0dc6c59929ca2211ebb3d1c/components/gcp/bigquery/query/component.yaml')
'https://raw.githubusercontent.com/kubeflow/pipelines/0b07e456b1f319d8b7a7301274f55c00fda9f537/components/gcp/bigquery/query/component.yaml')
help(bigquery_query_op)
```

Expand Down
2 changes: 1 addition & 1 deletion components/gcp/bigquery/query/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ outputs:
type: GCSPath
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-gcp:ac833a084b32324b56ca56e9109e05cde02816a4
image: gcr.io/ml-pipeline/ml-pipeline-gcp:fe639f41661d8e17fcda64ff8242127620b80ba0
args: [
kfp_component.google.bigquery, query,
--query, {inputValue: query},
Expand Down
2 changes: 1 addition & 1 deletion components/gcp/bigquery/query/sample.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
"import kfp.components as comp\n",
"\n",
"bigquery_query_op = comp.load_component_from_url(\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/1f65a564d4d44fa5a0dc6c59929ca2211ebb3d1c/components/gcp/bigquery/query/component.yaml')\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/0b07e456b1f319d8b7a7301274f55c00fda9f537/components/gcp/bigquery/query/component.yaml')\n",
"help(bigquery_query_op)"
]
},
Expand Down
2 changes: 1 addition & 1 deletion components/gcp/dataflow/launch_python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar
import kfp.components as comp

dataflow_python_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/1f65a564d4d44fa5a0dc6c59929ca2211ebb3d1c/components/gcp/dataflow/launch_python/component.yaml')
'https://raw.githubusercontent.com/kubeflow/pipelines/0b07e456b1f319d8b7a7301274f55c00fda9f537/components/gcp/dataflow/launch_python/component.yaml')
help(dataflow_python_op)
```

Expand Down
2 changes: 1 addition & 1 deletion components/gcp/dataflow/launch_python/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ outputs:
type: String
implementation:
container:
image: gcr.io/ml-pipeline/ml-pipeline-gcp:ac833a084b32324b56ca56e9109e05cde02816a4
image: gcr.io/ml-pipeline/ml-pipeline-gcp:fe639f41661d8e17fcda64ff8242127620b80ba0
args: [
kfp_component.google.dataflow, launch_python,
--python_file_path, {inputValue: python_file_path},
Expand Down
2 changes: 1 addition & 1 deletion components/gcp/dataflow/launch_python/sample.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
"import kfp.components as comp\n",
"\n",
"dataflow_python_op = comp.load_component_from_url(\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/1f65a564d4d44fa5a0dc6c59929ca2211ebb3d1c/components/gcp/dataflow/launch_python/component.yaml')\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/0b07e456b1f319d8b7a7301274f55c00fda9f537/components/gcp/dataflow/launch_python/component.yaml')\n",
"help(dataflow_python_op)"
]
},
Expand Down
Loading

0 comments on commit 12bfc6c

Please sign in to comment.