Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable pipeline packages with multiple files #939

Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: whalesay
inputs:
- name: param1
- name: param2
implementation:
container:
image: docker/whalesay:latest
command: [cowsay]
args: [{inputValue: param1}, {inputValue: param2}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: arguments-parameters-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: param1
value: hello
- name: param2

templates:
- name: whalesay
inputs:
parameters:
- name: param1
- name: param2
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.param1}}-{{inputs.parameters.param2}}"]
Binary file not shown.
Binary file not shown.
50 changes: 45 additions & 5 deletions backend/src/apiserver/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func isYamlFile(fileName string) bool {
return strings.HasSuffix(fileName, ".yaml") || strings.HasSuffix(fileName, ".yml")
}

// isPipelineYamlFile reports whether fileName is exactly "pipeline.yaml".
// The comparison is case-sensitive and no directory prefix is stripped, so
// names such as "dir/pipeline.yaml" or "pipeline.yml" do not match.
func isPipelineYamlFile(fileName string) bool {
	const pipelineEntryName = "pipeline.yaml"
	return pipelineEntryName == fileName
}

// isZipFile reports whether compressedFile starts with the two-byte zip
// signature "PK" (0x50 0x4B). Inputs of two bytes or fewer are never
// reported as zip files.
func isZipFile(compressedFile []byte) bool {
	if len(compressedFile) <= 2 {
		return false
	}
	return compressedFile[0] == 'P' && compressedFile[1] == 'K'
}
Expand All @@ -75,13 +79,38 @@ func DecompressPipelineTarball(compressedFile []byte) ([]byte, error) {
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the tarball file. Not a valid tarball file.")
}
// New behavior: searching for the "pipeline.yaml" file.
tarReader := tar.NewReader(gzipReader)
for {
header, err := tarReader.Next()
if err == io.EOF {
tarReader = nil
break
}
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the tarball file. Not a valid tarball file.")
}
if isPipelineYamlFile(header.Name) {
//Found the pipeline file.
break
}
}
// Old behavior - taking the first file in the archive
if tarReader == nil {
// Resetting the reader
gzipReader, err = gzip.NewReader(bytes.NewReader(compressedFile))
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the tarball file. Not a valid tarball file.")
}
tarReader = tar.NewReader(gzipReader)
}

header, err := tarReader.Next()
if err != nil || header == nil {
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the tarball file. Not a valid tarball file.")
}
if !isYamlFile(header.Name) {
return nil, util.NewInvalidInputError("Error extracting pipeline from the tarball file. Expecting a YAML file inside the tarball. Got: %v", header.Name)
return nil, util.NewInvalidInputError("Error extracting pipeline from the tarball file. Expecting a pipeline.yaml file inside the tarball. Got: %v", header.Name)
}
decompressedFile, err := ioutil.ReadAll(tarReader)
if err != nil {
Expand All @@ -98,10 +127,21 @@ func DecompressPipelineZip(compressedFile []byte) ([]byte, error) {
if len(reader.File) < 1 {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Empty zip file.")
}
if !isYamlFile(reader.File[0].Name) {
return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Expecting a YAML file inside the zip. Got: %v", reader.File[0].Name)

// Old behavior - taking the first file in the archive
pipelineYamlFile := reader.File[0]
// New behavior: searching for the "pipeline.yaml" file.
for _, file := range reader.File {
if isPipelineYamlFile(file.Name) {
pipelineYamlFile = file
break
}
}

if !isYamlFile(pipelineYamlFile.Name) {
return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Expecting a pipeline.yaml file inside the zip. Got: %v", pipelineYamlFile.Name)
}
rc, err := reader.File[0].Open()
rc, err := pipelineYamlFile.Open()
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Failed to read the content.")
}
Expand Down
18 changes: 18 additions & 0 deletions backend/src/apiserver/server/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ func TestReadPipelineFile_Zip_AnyExtension(t *testing.T) {
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

// TestReadPipelineFile_MultifileZip checks that ReadPipelineFile, given the
// pipeline_plus_component.zip fixture under an unrelated upload name, returns
// bytes equal to the pipeline.yaml fixture stored next to it.
// NOTE(review): the archive is presumably a multi-file package containing
// pipeline.yaml plus a component file — confirm against the fixture.
func TestReadPipelineFile_MultifileZip(t *testing.T) {
	file, err := os.Open("test/pipeline_plus_component/pipeline_plus_component.zip")
	if err != nil {
		// Fail here rather than passing a nil *os.File into ReadPipelineFile.
		t.Fatalf("opening test archive: %v", err)
	}
	defer file.Close()

	pipelineFile, err := ReadPipelineFile("pipeline_plus_component.ai-hub-package", file, MaxFileLength)
	assert.Nil(t, err)

	expectedPipelineFile, err := ioutil.ReadFile("test/pipeline_plus_component/pipeline.yaml")
	if err != nil {
		// Without this check a missing fixture would compare against nil bytes.
		t.Fatalf("reading expected pipeline file: %v", err)
	}
	assert.Equal(t, expectedPipelineFile, pipelineFile)
}

func TestReadPipelineFile_Tarball(t *testing.T) {
file, _ := os.Open("test/arguments_tarball/arguments.tar.gz")
pipelineFile, err := ReadPipelineFile("arguments.tar.gz", file, MaxFileLength)
Expand All @@ -166,6 +175,15 @@ func TestReadPipelineFile_Tarball_AnyExtension(t *testing.T) {
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

// TestReadPipelineFile_MultifileTarball checks that ReadPipelineFile, given
// the pipeline_plus_component.tar.gz fixture under an unrelated upload name,
// returns bytes equal to the pipeline.yaml fixture stored next to it.
// NOTE(review): the tarball is presumably a multi-file package containing
// pipeline.yaml plus a component file — confirm against the fixture.
func TestReadPipelineFile_MultifileTarball(t *testing.T) {
	file, err := os.Open("test/pipeline_plus_component/pipeline_plus_component.tar.gz")
	if err != nil {
		// Fail here rather than passing a nil *os.File into ReadPipelineFile.
		t.Fatalf("opening test archive: %v", err)
	}
	defer file.Close()

	pipelineFile, err := ReadPipelineFile("pipeline_plus_component.ai-hub-package", file, MaxFileLength)
	assert.Nil(t, err)

	expectedPipelineFile, err := ioutil.ReadFile("test/pipeline_plus_component/pipeline.yaml")
	if err != nil {
		// Without this check a missing fixture would compare against nil bytes.
		t.Fatalf("reading expected pipeline file: %v", err)
	}
	assert.Equal(t, expectedPipelineFile, pipelineFile)
}

func TestReadPipelineFile_UnknownFileFormat(t *testing.T) {
file, _ := os.Open("test/unknown_format.foo")
_, err := ReadPipelineFile("unknown_format.foo", file, MaxFileLength)
Expand Down
12 changes: 6 additions & 6 deletions backend/test/integration/pipeline_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ func (s *PipelineApiTest) TestPipelineAPI() {
/* ---------- Upload pipelines zip ---------- */
time.Sleep(1 * time.Second)
argumentUploadPipeline, err := s.pipelineUploadClient.UploadFile(
"../resources/zip-arguments.zip", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
"../resources/arguments.pipeline.zip", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
assert.Nil(t, err)
assert.Equal(t, "zip-arguments-parameters", argumentUploadPipeline.Name)

/* ---------- Import pipeline zip by URL ---------- */
time.Sleep(1 * time.Second)
argumentUrlPipeline, err := s.pipelineClient.Create(&params.CreatePipelineParams{
Body: &pipeline_model.APIPipeline{URL: &pipeline_model.APIURL{
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.zip"}}})
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.pipeline.zip"}}})
assert.Nil(t, err)
assert.Equal(t, "arguments.zip", argumentUrlPipeline.Name)
assert.Equal(t, "arguments.pipeline.zip", argumentUrlPipeline.Name)

/* ---------- Verify list pipeline works ---------- */
pipelines, totalSize, _, err := s.pipelineClient.List(params.NewListPipelinesParams())
Expand All @@ -111,7 +111,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listFirstPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments-parameters.yaml", listFirstPagePipelines[0].Name)
assert.Equal(t, "arguments.zip", listFirstPagePipelines[1].Name)
assert.Equal(t, "arguments.pipeline.zip", listFirstPagePipelines[1].Name)
assert.NotEmpty(t, nextPageToken)

listSecondPagePipelines, totalSize, nextPageToken, err := s.pipelineClient.List(
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "zip-arguments-parameters", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[1].Name)
assert.Equal(t, "arguments.pipeline.zip", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

/* ---------- List pipelines sort by unsupported description field. Should fail. ---------- */
Expand All @@ -162,7 +162,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Nil(t, err)
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.pipeline.zip", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments-parameters.yaml", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

Expand Down
Binary file added backend/test/resources/arguments.pipeline.zip
Binary file not shown.