Skip to content

Commit

Permalink
support .zip pipeline package (kubeflow#874)
Browse files Browse the repository at this point in the history
* support zip

* update integration test

* debug

* comments

* fix

* Update .cloudbuild.yaml

* Update .release.cloudbuild.yaml

* Update .travis.yml
  • Loading branch information
IronPan authored and k8s-ci-robot committed Mar 1, 2019
1 parent cec04bf commit 423c477
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 54 deletions.

This file was deleted.

Binary file not shown.
Empty file.
1 change: 1 addition & 0 deletions backend/src/apiserver/server/test/malformated_zip.zip
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I pretended to be a tarball
Empty file.
Binary file not shown.
53 changes: 44 additions & 9 deletions backend/src/apiserver/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"archive/tar"
"archive/zip"
"bufio"
"bytes"
"compress/gzip"
Expand Down Expand Up @@ -54,13 +55,21 @@ func loadFile(fileReader io.Reader, maxFileLength int) ([]byte, error) {
}

// isSupportedPipelineFormat reports whether fileName carries one of the
// accepted pipeline package extensions: YAML (.yaml/.yml), gzip-compressed
// tarball (.tar.gz) or zip (.zip).
//
// Only the file name suffix is inspected; the content is not validated here.
func isSupportedPipelineFormat(fileName string) bool {
	// A single return: the earlier YAML/.tar.gz-only return preceded this one
	// and made the zip check unreachable.
	return isYamlFile(fileName) || isCompressedTarballFile(fileName) || isZipFile(fileName)
}

// isYamlFile reports whether fileName ends with a YAML extension
// (".yaml" or ".yml"). The comparison is case-sensitive.
func isYamlFile(fileName string) bool {
	for _, ext := range []string{".yaml", ".yml"} {
		if strings.HasSuffix(fileName, ext) {
			return true
		}
	}
	return false
}

// isZipFile reports whether fileName ends with the ".zip" extension.
// The comparison is case-sensitive.
func isZipFile(fileName string) bool {
	const zipExt = ".zip"
	hasZipExt := strings.HasSuffix(fileName, zipExt)
	return hasZipExt
}

// isCompressedTarballFile reports whether fileName ends with the ".tar.gz"
// extension. The comparison is case-sensitive.
func isCompressedTarballFile(fileName string) bool {
	const tarballExt = ".tar.gz"
	hasTarballExt := strings.HasSuffix(fileName, tarballExt)
	return hasTarballExt
}

func DecompressPipelineTarball(compressedFile []byte) ([]byte, error) {
gzipReader, err := gzip.NewReader(bytes.NewReader(compressedFile))
if err != nil {
Expand All @@ -81,9 +90,31 @@ func DecompressPipelineTarball(compressedFile []byte) ([]byte, error) {
return decompressedFile, err
}

// DecompressPipelineZip extracts the pipeline YAML from an in-memory zip
// archive.
//
// Only the first entry of the archive is considered. Its name must end in
// ".yaml" or ".yml"; any further entries are ignored. The entry's full
// content is returned.
//
// An invalid-input error is returned when compressedFile is not a valid zip
// archive, when the archive has no entries, when the first entry is not a
// YAML file, or when the entry cannot be opened or read.
func DecompressPipelineZip(compressedFile []byte) ([]byte, error) {
	reader, err := zip.NewReader(bytes.NewReader(compressedFile), int64(len(compressedFile)))
	if err != nil {
		return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Not a valid zip file.")
	}
	if len(reader.File) < 1 {
		// err is nil on this path, so there is no underlying error to attach
		// as details; report a plain invalid-input error instead.
		return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Empty zip file.")
	}
	firstEntry := reader.File[0]
	if !isYamlFile(firstEntry.Name) {
		return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Expecting a YAML file inside the zip. Got: %v", firstEntry.Name)
	}
	rc, err := firstEntry.Open()
	if err != nil {
		return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Failed to read the content.")
	}
	// The entry reader must be closed once the content has been consumed.
	defer rc.Close()
	decompressedFile, err := ioutil.ReadAll(rc)
	if err != nil {
		return nil, util.NewInvalidInputErrorWithDetails(err, "Error reading pipeline YAML from the zip file.")
	}
	return decompressedFile, nil
}

func ReadPipelineFile(fileName string, fileReader io.Reader, maxFileLength int) ([]byte, error) {
if !isSupportedPipelineFormat(fileName) {
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support .tar.gz or YAML.")
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support formats are .zip, .tar.gz or YAML.")
}

// Read file into size limited byte array.
Expand All @@ -92,17 +123,21 @@ func ReadPipelineFile(fileName string, fileReader io.Reader, maxFileLength int)
return nil, util.Wrap(err, "Error read pipeline file.")
}

// Return if file is YAML
if isYamlFile(fileName) {
return pipelineFileBytes, nil
var processedFile []byte
switch {
case isYamlFile(fileName):
processedFile = pipelineFileBytes
case isZipFile(fileName):
processedFile, err = DecompressPipelineZip(pipelineFileBytes)
case isCompressedTarballFile(fileName):
processedFile, err = DecompressPipelineTarball(pipelineFileBytes)
default:
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support .zip, .tar.gz or YAML.")
}

// Decompress if file is tarball
decompressedFile, err := DecompressPipelineTarball(pipelineFileBytes)
if err != nil {
return nil, util.Wrap(err, "Error decompress the pipeline file")
}
return decompressedFile, nil
return processedFile, nil
}

func printParameters(params []*api.Parameter) string {
Expand Down
43 changes: 41 additions & 2 deletions backend/src/apiserver/server/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestDecompressPipelineTarball(t *testing.T) {
pipelineFile, err := DecompressPipelineTarball(tarballByte)
assert.Nil(t, err)

expectedPipelineFile, _ := ioutil.ReadFile("test/arguments_tarball/arguments-parameters.yaml")
expectedPipelineFile, _ := ioutil.ReadFile("test/arguments-parameters.yaml")
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

Expand All @@ -84,6 +84,36 @@ func TestDecompressPipelineTarball_EmptyTarball(t *testing.T) {
assert.Contains(t, err.Error(), "Not a valid tarball file")
}

// TestDecompressPipelineZip checks that a zip archive holding a pipeline YAML
// decompresses to exactly the bytes of the reference YAML fixture.
func TestDecompressPipelineZip(t *testing.T) {
	archive, _ := ioutil.ReadFile("test/arguments_zip/arguments-parameters.zip")
	got, err := DecompressPipelineZip(archive)
	assert.Nil(t, err)

	// Compare against the uncompressed fixture byte-for-byte.
	want, _ := ioutil.ReadFile("test/arguments-parameters.yaml")
	assert.Equal(t, want, got)
}

func TestDecompressPipelineZip_MalformattedZip(t *testing.T) {
zipByte, _ := ioutil.ReadFile("test/malformatted_zip.zip")
_, err := DecompressPipelineZip(zipByte)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "Not a valid zip file")
}

// TestDecompressPipelineZip_NonYamlZip checks that a zip archive whose first
// entry is not a YAML file is rejected with the matching error message.
func TestDecompressPipelineZip_NonYamlZip(t *testing.T) {
	archive, _ := ioutil.ReadFile("test/non_yaml_zip/non_yaml_file.zip")
	_, decompressErr := DecompressPipelineZip(archive)
	assert.NotNil(t, decompressErr)
	assert.Contains(t, decompressErr.Error(), "Expecting a YAML file inside the zip")
}

func TestDecompressPipelineZip_EmptyZip(t *testing.T) {
zipByte, _ := ioutil.ReadFile("test/empty_tarball/empty.zip")
_, err := DecompressPipelineZip(zipByte)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "Not a valid zip file")
}

func TestReadPipelineFile_YAML(t *testing.T) {
file, _ := os.Open("test/arguments-parameters.yaml")
fileBytes, err := ReadPipelineFile("arguments-parameters.yaml", file, MaxFileLength)
Expand All @@ -93,12 +123,21 @@ func TestReadPipelineFile_YAML(t *testing.T) {
assert.Equal(t, expectedFileBytes, fileBytes)
}

func TestReadPipelineFile_Zip(t *testing.T) {
file, _ := os.Open("test/arguments_zip/arguments-parameters.zip")
pipelineFile, err := ReadPipelineFile("arguments-parameters.zip", file, MaxFileLength)
assert.Nil(t, err)

expectedPipelineFile, _ := ioutil.ReadFile("test/arguments-parameters.yaml")
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

func TestReadPipelineFile_Tarball(t *testing.T) {
file, _ := os.Open("test/arguments_tarball/arguments.tar.gz")
pipelineFile, err := ReadPipelineFile("arguments.tar.gz", file, MaxFileLength)
assert.Nil(t, err)

expectedPipelineFile, _ := ioutil.ReadFile("test/arguments_tarball/arguments-parameters.yaml")
expectedPipelineFile, _ := ioutil.ReadFile("test/arguments-parameters.yaml")
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

Expand Down
15 changes: 8 additions & 7 deletions backend/test/pipeline_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,20 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Nil(t, err)
assert.Equal(t, "sequential", sequentialPipeline.Name)

/* ---------- Upload pipelines tarball ---------- */
/* ---------- Upload pipelines zip ---------- */
time.Sleep(1 * time.Second)
argumentUploadPipeline, err := s.pipelineUploadClient.UploadFile(
"resources/zip-arguments.tar.gz", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
"resources/zip-arguments.zip", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
assert.Nil(t, err)
assert.Equal(t, "zip-arguments-parameters", argumentUploadPipeline.Name)

/* ---------- Import pipeline zip by URL ---------- */
time.Sleep(1 * time.Second)
argumentUrlPipeline, err := s.pipelineClient.Create(&params.CreatePipelineParams{
Body: &pipeline_model.APIPipeline{URL: &pipeline_model.APIURL{
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.tar.gz"}}})
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.zip"}}})
assert.Nil(t, err)
assert.Equal(t, "arguments.tar.gz", argumentUrlPipeline.Name)
assert.Equal(t, "arguments.zip", argumentUrlPipeline.Name)

/* ---------- Verify list pipeline works ---------- */
pipelines, totalSize, _, err := s.pipelineClient.List(params.NewListPipelinesParams())
Expand All @@ -108,7 +109,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listFirstPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments-parameters.yaml", listFirstPagePipelines[0].Name)
assert.Equal(t, "arguments.tar.gz", listFirstPagePipelines[1].Name)
assert.Equal(t, "arguments.zip", listFirstPagePipelines[1].Name)
assert.NotEmpty(t, nextPageToken)

listSecondPagePipelines, totalSize, nextPageToken, err := s.pipelineClient.List(
Expand Down Expand Up @@ -136,7 +137,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "zip-arguments-parameters", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.tar.gz", listSecondPagePipelines[1].Name)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

/* ---------- List pipelines sort by unsupported description field. Should fail. ---------- */
Expand All @@ -159,7 +160,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Nil(t, err)
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments.tar.gz", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments-parameters.yaml", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

Expand Down
Binary file removed backend/test/resources/zip-arguments.tar.gz
Binary file not shown.
Binary file added backend/test/resources/zip-arguments.zip
Binary file not shown.

0 comments on commit 423c477

Please sign in to comment.