Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support .zip pipeline package #874

Merged
merged 8 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2018 Google LLC
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we need a new sample for a new format? It seems the same one could be used for all formats.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the source file and the zipped file side by side so that readers don't need to unzip the file to see what is inside, but I think that's not necessary. Removed.

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: arguments-parameters-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: param1
value: hello
- name: param2

templates:
- name: whalesay
inputs:
parameters:
- name: param1
- name: param2
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.param1}}-{{inputs.parameters.param2}}"]
Binary file not shown.
Empty file.
1 change: 1 addition & 0 deletions backend/src/apiserver/server/test/malformated_zip.zip
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I pretended to be a tarball
Empty file.
Binary file not shown.
53 changes: 44 additions & 9 deletions backend/src/apiserver/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"archive/tar"
"archive/zip"
"bufio"
"bytes"
"compress/gzip"
Expand Down Expand Up @@ -54,13 +55,21 @@ func loadFile(fileReader io.Reader, maxFileLength int) ([]byte, error) {
}

func isSupportedPipelineFormat(fileName string) bool {
return isYamlFile(fileName) || strings.HasSuffix(fileName, ".tar.gz")
return isYamlFile(fileName) || isCompressedTarballFile(fileName) || isZipFile(fileName)
}

// isYamlFile reports whether fileName ends with one of the YAML
// extensions, ".yaml" or ".yml". The comparison is case-sensitive.
func isYamlFile(fileName string) bool {
	for _, extension := range []string{".yaml", ".yml"} {
		if strings.HasSuffix(fileName, extension) {
			return true
		}
	}
	return false
}

// isZipFile reports whether fileName ends with the ".zip" extension.
// The comparison is case-sensitive.
func isZipFile(fileName string) bool {
	const zipExtension = ".zip"
	hasZipExtension := strings.HasSuffix(fileName, zipExtension)
	return hasZipExtension
}

// isCompressedTarballFile reports whether fileName ends with the
// ".tar.gz" extension. The comparison is case-sensitive.
func isCompressedTarballFile(fileName string) bool {
	const tarballExtension = ".tar.gz"
	hasTarballExtension := strings.HasSuffix(fileName, tarballExtension)
	return hasTarballExtension
}

func DecompressPipelineTarball(compressedFile []byte) ([]byte, error) {
gzipReader, err := gzip.NewReader(bytes.NewReader(compressedFile))
if err != nil {
Expand All @@ -81,9 +90,31 @@ func DecompressPipelineTarball(compressedFile []byte) ([]byte, error) {
return decompressedFile, err
}

// DecompressPipelineZip extracts the pipeline YAML from an in-memory zip
// archive.
//
// Only the first entry of the archive is considered, and its name must have
// a YAML extension (see isYamlFile). On success the uncompressed content of
// that entry is returned. An invalid-input error is returned when the bytes
// are not a zip archive, when the archive has no entries, when the first
// entry is not a YAML file, or when the entry cannot be opened or read.
func DecompressPipelineZip(compressedFile []byte) ([]byte, error) {
	reader, err := zip.NewReader(bytes.NewReader(compressedFile), int64(len(compressedFile)))
	if err != nil {
		return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Not a valid zip file.")
	}
	if len(reader.File) < 1 {
		// err is nil here, so there is no underlying cause to attach; use the
		// plain constructor instead of wrapping a nil error.
		return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Empty zip file.")
	}

	firstEntry := reader.File[0]
	if !isYamlFile(firstEntry.Name) {
		return nil, util.NewInvalidInputError("Error extracting pipeline from the zip file. Expecting a YAML file inside the zip. Got: %v", firstEntry.Name)
	}

	rc, err := firstEntry.Open()
	if err != nil {
		return nil, util.NewInvalidInputErrorWithDetails(err, "Error extracting pipeline from the zip file. Failed to read the content.")
	}
	// The entry reader must be closed once its content has been consumed.
	defer rc.Close()

	decompressedFile, err := ioutil.ReadAll(rc)
	if err != nil {
		return nil, util.NewInvalidInputErrorWithDetails(err, "Error reading pipeline YAML from the zip file.")
	}
	return decompressedFile, nil
}

func ReadPipelineFile(fileName string, fileReader io.Reader, maxFileLength int) ([]byte, error) {
if !isSupportedPipelineFormat(fileName) {
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support .tar.gz or YAML.")
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support formats are .zip, .tar.gz or YAML.")
}

// Read file into size limited byte array.
Expand All @@ -92,17 +123,21 @@ func ReadPipelineFile(fileName string, fileReader io.Reader, maxFileLength int)
return nil, util.Wrap(err, "Error read pipeline file.")
}

// Return if file is YAML
if isYamlFile(fileName) {
return pipelineFileBytes, nil
var processedFile []byte
switch {
case isYamlFile(fileName):
processedFile = pipelineFileBytes
case isZipFile(fileName):
processedFile, err = DecompressPipelineZip(pipelineFileBytes)
case isCompressedTarballFile(fileName):
processedFile, err = DecompressPipelineTarball(pipelineFileBytes)
default:
return nil, util.NewInvalidInputError("Unexpected pipeline file format. Support .zip, .tar.gz or YAML.")
}

// Decompress if file is tarball
decompressedFile, err := DecompressPipelineTarball(pipelineFileBytes)
if err != nil {
return nil, util.Wrap(err, "Error decompress the pipeline file")
}
return decompressedFile, nil
return processedFile, nil
}

func printParameters(params []*api.Parameter) string {
Expand Down
39 changes: 39 additions & 0 deletions backend/src/apiserver/server/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,36 @@ func TestDecompressPipelineTarball_EmptyTarball(t *testing.T) {
assert.Contains(t, err.Error(), "Not a valid tarball file")
}

// TestDecompressPipelineZip checks that decompressing the zip fixture yields
// exactly the bytes of the reference YAML fixture stored next to it.
func TestDecompressPipelineZip(t *testing.T) {
	want, _ := ioutil.ReadFile("test/arguments_zip/arguments-parameters.yaml")
	archive, _ := ioutil.ReadFile("test/arguments_zip/arguments-parameters.zip")

	got, err := DecompressPipelineZip(archive)

	assert.Nil(t, err)
	assert.Equal(t, want, got)
}

func TestDecompressPipelineZip_MalformattedZip(t *testing.T) {
zipByte, _ := ioutil.ReadFile("test/malformatted_zip.zip")
_, err := DecompressPipelineZip(zipByte)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "Not a valid zip file")
}

// TestDecompressPipelineZip_NonYamlZip checks that the non-YAML zip fixture is
// rejected with the "Expecting a YAML file inside the zip" message.
func TestDecompressPipelineZip_NonYamlZip(t *testing.T) {
	archive, _ := ioutil.ReadFile("test/non_yaml_zip/non_yaml_file.zip")

	_, decompressErr := DecompressPipelineZip(archive)

	assert.NotNil(t, decompressErr)
	assert.Contains(t, decompressErr.Error(), "Expecting a YAML file inside the zip")
}

// TestDecompressPipelineZip_EmptyZip checks that the empty zip fixture is
// rejected. The expected message is "Not a valid zip file", i.e. the input is
// expected to fail zip parsing itself.
func TestDecompressPipelineZip_EmptyZip(t *testing.T) {
	archive, _ := ioutil.ReadFile("test/empty_tarball/empty.zip")

	_, decompressErr := DecompressPipelineZip(archive)

	assert.NotNil(t, decompressErr)
	assert.Contains(t, decompressErr.Error(), "Not a valid zip file")
}

func TestReadPipelineFile_YAML(t *testing.T) {
file, _ := os.Open("test/arguments-parameters.yaml")
fileBytes, err := ReadPipelineFile("arguments-parameters.yaml", file, MaxFileLength)
Expand All @@ -93,6 +123,15 @@ func TestReadPipelineFile_YAML(t *testing.T) {
assert.Equal(t, expectedFileBytes, fileBytes)
}

// TestReadPipelineFile_Zip checks that ReadPipelineFile, given a ".zip" file
// name and the zip fixture as its reader, returns exactly the bytes of the
// reference YAML fixture.
func TestReadPipelineFile_Zip(t *testing.T) {
	want, _ := ioutil.ReadFile("test/arguments_zip/arguments-parameters.yaml")
	zipFile, _ := os.Open("test/arguments_zip/arguments-parameters.zip")

	got, err := ReadPipelineFile("arguments-parameters.zip", zipFile, MaxFileLength)

	assert.Nil(t, err)
	assert.Equal(t, want, got)
}

func TestReadPipelineFile_Tarball(t *testing.T) {
file, _ := os.Open("test/arguments_tarball/arguments.tar.gz")
pipelineFile, err := ReadPipelineFile("arguments.tar.gz", file, MaxFileLength)
Expand Down
15 changes: 8 additions & 7 deletions backend/test/pipeline_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,20 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Nil(t, err)
assert.Equal(t, "sequential", sequentialPipeline.Name)

/* ---------- Upload pipelines tarball ---------- */
/* ---------- Upload pipelines zip ---------- */
time.Sleep(1 * time.Second)
argumentUploadPipeline, err := s.pipelineUploadClient.UploadFile(
"resources/zip-arguments.tar.gz", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
"resources/zip-arguments.zip", &uploadParams.UploadPipelineParams{Name: util.StringPointer("zip-arguments-parameters")})
assert.Nil(t, err)
assert.Equal(t, "zip-arguments-parameters", argumentUploadPipeline.Name)

/* ---------- Import pipeline tarball by URL ---------- */
time.Sleep(1 * time.Second)
argumentUrlPipeline, err := s.pipelineClient.Create(&params.CreatePipelineParams{
Body: &pipeline_model.APIPipeline{URL: &pipeline_model.APIURL{
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.tar.gz"}}})
PipelineURL: "https://storage.googleapis.com/ml-pipeline-dataset/arguments.zip"}}})
assert.Nil(t, err)
assert.Equal(t, "arguments.tar.gz", argumentUrlPipeline.Name)
assert.Equal(t, "arguments.zip", argumentUrlPipeline.Name)

/* ---------- Verify list pipeline works ---------- */
pipelines, totalSize, _, err := s.pipelineClient.List(params.NewListPipelinesParams())
Expand All @@ -108,7 +109,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listFirstPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments-parameters.yaml", listFirstPagePipelines[0].Name)
assert.Equal(t, "arguments.tar.gz", listFirstPagePipelines[1].Name)
assert.Equal(t, "arguments.zip", listFirstPagePipelines[1].Name)
assert.NotEmpty(t, nextPageToken)

listSecondPagePipelines, totalSize, nextPageToken, err := s.pipelineClient.List(
Expand Down Expand Up @@ -136,7 +137,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "zip-arguments-parameters", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.tar.gz", listSecondPagePipelines[1].Name)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

/* ---------- List pipelines sort by unsupported description field. Should fail. ---------- */
Expand All @@ -159,7 +160,7 @@ func (s *PipelineApiTest) TestPipelineAPI() {
assert.Nil(t, err)
assert.Equal(t, 2, len(listSecondPagePipelines))
assert.Equal(t, 4, totalSize)
assert.Equal(t, "arguments.tar.gz", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments.zip", listSecondPagePipelines[0].Name)
assert.Equal(t, "arguments-parameters.yaml", listSecondPagePipelines[1].Name)
assert.Empty(t, nextPageToken)

Expand Down
Binary file removed backend/test/resources/zip-arguments.tar.gz
Binary file not shown.
Binary file added backend/test/resources/zip-arguments.zip
Binary file not shown.