Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support .zip pipeline package #874

Merged
merged 8 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Binary file not shown.
Empty file.
1 change: 1 addition & 0 deletions backend/src/apiserver/server/test/malformated_zip.zip
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I pretended to be a tarball
Empty file.
Binary file not shown.
53 changes: 44 additions & 9 deletions backend/src/apiserver/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"archive/tar"
"archive/zip"
"bufio"
"bytes"
"compress/gzip"
Expand Down Expand Up @@ -54,13 +55,21 @@ func loadFile(fileReader io.Reader, maxFileLength int) ([]byte, error) {
}

func isSupportedPipelineFormat(fileName string) bool {
return isYamlFile(fileName) || strings.HasSuffix(fileName, ".tar.gz")
return isYamlFile(fileName) || isCompressedTarballFile(fileName) || isZipFile(fileName)
}

func isYamlFile(fileName string) bool {
return strings.HasSuffix(fileName, ".yaml") || strings.HasSuffix(fileName, ".yml")
}

func isZipFile(fileName string) bool {
return strings.HasSuffix(fileName, ".zip")
}

func isCompressedTarballFile(fileName string) bool {
return strings.HasSuffix(fileName, ".tar.gz")
}

func DecompressPipelineTarball(compressedFile []byte) ([]byte, error) {
gzipReader, err := gzip.NewReader(bytes.NewReader(compressedFile))
if err != nil {
Expand All @@ -81,9 +90,31 @@ func DecompressPipelineTarball(compressedFile []byte) ([]byte, error) {
return decompressedFile, err
}

func DecompressPipelineZip(compressedFile []byte) ([]byte, error) {
reader, err := zip.NewReader(bytes.NewReader(compressedFile), int64(len(compressedFile)))
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Not a valid zip file.")
}
if len(reader.File) < 1 {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Empty zip file.")
}
if !isYamlFile(reader.File[0].Name) {
return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Expecting a YAML file inside the zip. Got: %v", reader.File[0].Name)
}
rc, err := reader.File[0].Open()
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Failed to read the content.")
}
decompressedFile, err := ioutil.ReadAll(rc)
if err != nil {
return nil, util.NewInvalidInputErrorWithDetails(err, "Error reading pipeline YAML from the zip file.")
}
return decompressedFile, err
}

func ReadPipelineFile(fileName string, fileReader io.Reader, maxFileLength int) ([]byte, error) {
if !isSupportedPipelineFormat(fileName) {
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support .tar.gz or YAML.")
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support formats are .zip, .tar.gz or YAML.")
}

// Read file into size limited byte array.
Expand All @@ -92,17 +123,21 @@ func ReadPipelineFile(fileName string, fileReader io.Reader, maxFileLength int)
return nil, util.Wrap(err, "Error read pipeline file.")
}

// Return if file is YAML
if isYamlFile(fileName) {
return pipelineFileBytes, nil
var processedFile []byte
switch {
case isYamlFile(fileName):
processedFile = pipelineFileBytes
case isZipFile(fileName):
processedFile, err = DecompressPipelineZip(pipelineFileBytes)
case isCompressedTarballFile(fileName):
processedFile, err = DecompressPipelineTarball(pipelineFileBytes)
default:
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support .zip, .tar.gz or YAML.")
}

// Decompress if file is tarball
decompressedFile, err := DecompressPipelineTarball(pipelineFileBytes)
if err != nil {
return nil, util.Wrap(err, "Error decompress the pipeline file")
}
return decompressedFile, nil
return processedFile, nil
}

func printParameters(params []*api.Parameter) string {
Expand Down
43 changes: 41 additions & 2 deletions backend/src/apiserver/server/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestDecompressPipelineTarball(t *testing.T) {
pipelineFile, err := DecompressPipelineTarball(tarballByte)
assert.Nil(t, err)

expectedPipelineFile, _ := ioutil.ReadFile("test/arguments_tarball/arguments-parameters.yaml")
expectedPipelineFile, _ := ioutil.ReadFile("test/arguments-parameters.yaml")
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

Expand All @@ -84,6 +84,36 @@ func TestDecompressPipelineTarball_EmptyTarball(t *testing.T) {
assert.Contains(t, err.Error(), "Not a valid tarball file")
}

func TestDecompressPipelineZip(t *testing.T) {
zipByte, _ := ioutil.ReadFile("test/arguments_zip/arguments-parameters.zip")
pipelineFile, err := DecompressPipelineZip(zipByte)
assert.Nil(t, err)

expectedPipelineFile, _ := ioutil.ReadFile("test/arguments-parameters.yaml")
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

func TestDecompressPipelineZip_MalformattedZip(t *testing.T) {
zipByte, _ := ioutil.ReadFile("test/malformatted_zip.zip")
_, err := DecompressPipelineZip(zipByte)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "Not a valid zip file")
}

func TestDecompressPipelineZip_NonYamlZip(t *testing.T) {
zipByte, _ := ioutil.ReadFile("test/non_yaml_zip/non_yaml_file.zip")
_, err := DecompressPipelineZip(zipByte)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "Expecting a YAML file inside the zip")
}

func TestDecompressPipelineZip_EmptyZip(t *testing.T) {
zipByte, _ := ioutil.ReadFile("test/empty_tarball/empty.zip")
_, err := DecompressPipelineZip(zipByte)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "Not a valid zip file")
}

func TestReadPipelineFile_YAML(t *testing.T) {
file, _ := os.Open("test/arguments-parameters.yaml")
fileBytes, err := ReadPipelineFile("arguments-parameters.yaml", file, MaxFileLength)
Expand All @@ -93,12 +123,21 @@ func TestReadPipelineFile_YAML(t *testing.T) {
assert.Equal(t, expectedFileBytes, fileBytes)
}

func TestReadPipelineFile_Zip(t *testing.T) {
file, _ := os.Open("test/arguments_zip/arguments-parameters.zip")
pipelineFile, err := ReadPipelineFile("arguments-parameters.zip", file, MaxFileLength)
assert.Nil(t, err)

expectedPipelineFile, _ := ioutil.ReadFile("test/arguments-parameters.yaml")
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

func TestReadPipelineFile_Tarball(t *testing.T) {
file, _ := os.Open("test/arguments_tarball/arguments.tar.gz")
pipelineFile, err := ReadPipelineFile("arguments.tar.gz", file, MaxFileLength)
assert.Nil(t, err)

expectedPipelineFile, _ := ioutil.ReadFile("test/arguments_tarball/arguments-parameters.yaml")
expectedPipelineFile, _ := ioutil.ReadFile("test/arguments-parameters.yaml")
assert.Equal(t, expectedPipelineFile, pipelineFile)
}

Expand Down
15 changes: 8 additions & 7 deletions backend/test/pipeline_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,20 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Nil(t, err)
assert.Equal(t, "sequential", sequentialPipeline.Name)

/* ---------- Upload pipelines tarball ---------- */
/* ---------- Upload pipelines zip ---------- */
time.Sleep(1 * time.Second)
argumentUploadPipeline, err := s.pipelineUploadClient.UploadFile(
"resources/zip-arguments.tar.gz", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
"resources/zip-arguments.zip", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
assert.Nil(t, err)
assert.Equal(t, "zip-arguments-parameters", argumentUploadPipeline.Name)

/* ---------- Import pipeline tarball by URL ---------- */
time.Sleep(1 * time.Second)
argumentUrlPipeline, err := s.pipelineClient.Create(&params.CreatePipelineParams{
Body: &pipeline_model.APIPipeline{URL: &pipeline_model.APIURL{
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.tar.gz"}}})
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.zip"}}})
assert.Nil(t, err)
assert.Equal(t, "arguments.tar.gz", argumentUrlPipeline.Name)
assert.Equal(t, "arguments.zip", argumentUrlPipeline.Name)

/* ---------- Verify list pipeline works ---------- */
pipelines, totalSize, _, err := s.pipelineClient.List(params.NewListPipelinesParams())
Expand All @@ -108,7 +109,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listFirstPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments-parameters.yaml", listFirstPagePipelines[0].Name)
assert.Equal(t, "arguments.tar.gz", listFirstPagePipelines[1].Name)
assert.Equal(t, "arguments.zip", listFirstPagePipelines[1].Name)
assert.NotEmpty(t, nextPageToken)

listSecondPagePipelines, totalSize, nextPageToken, err := s.pipelineClient.List(
Expand Down Expand Up @@ -136,7 +137,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "zip-arguments-parameters", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.tar.gz", listSecondPagePipelines[1].Name)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

/* ---------- List pipelines sort by unsupported description field. Should fail. ---------- */
Expand All @@ -159,7 +160,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Nil(t, err)
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments.tar.gz", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments-parameters.yaml", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

Expand Down
Binary file removed backend/test/resources/zip-arguments.tar.gz
Binary file not shown.
Binary file added backend/test/resources/zip-arguments.zip
Binary file not shown.