From b71fa3d2b93f68ab55535643be5b18796c0ea178 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Wed, 27 Jan 2021 10:10:33 -0800 Subject: [PATCH 01/49] Testing expression work flow. Null inputs not working. --- testdata/no_input_test/request_body.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/testdata/no_input_test/request_body.json b/testdata/no_input_test/request_body.json index ca632f33..9ee5021a 100644 --- a/testdata/no_input_test/request_body.json +++ b/testdata/no_input_test/request_body.json @@ -3,7 +3,8 @@ "input_bam": { "class": "File", "location": "NIST7035.1.chrM.bam" - } + }, + "file_array": null }, "manifest": [ { @@ -281,4 +282,4 @@ } ] } -} \ No newline at end of file +} From c70fb7becfe9168c4b2a440ca34088dc2a25fe02 Mon Sep 17 00:00:00 2001 From: Andrew Prokhorenkov Date: Wed, 27 Jan 2021 12:22:54 -0600 Subject: [PATCH 02/49] debug: some extra debug logs --- mariner/input.go | 3 +++ testdata/no_input_test/request_body.json | 3 +-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/mariner/input.go b/mariner/input.go index facfb84f..26794451 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -256,6 +256,7 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter // Question: how to handle non-array/struct data types? // --------- no preprocessing should have to happen in this case. self, err := tool.loadInputValue(input) + tool.Task.warnf("!!! fails here 259: %v", input) if err != nil { return nil, tool.Task.errorf("failed to load value: %v", err) } @@ -306,6 +307,7 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter // if this tool is a step of a parent workflow but the valueFrom is empty if out == nil { out, err = tool.loadInputValue(input) + tool.Task.warnf("!!! or fails here: 309, %v", input) if err != nil { return nil, tool.Task.errorf("failed to load input value: %v", err) } @@ -475,6 +477,7 @@ func (tool *Tool) loadInputValue(input *cwl.Input) (out interface{}, err error) required = true for _, t := range input.Types { if t.Type == CWLNullType { + tool.Task.warnf("!!! debug, value of type: %v", t.Type) required = false } } diff --git a/testdata/no_input_test/request_body.json b/testdata/no_input_test/request_body.json index 9ee5021a..f5c2f4a4 100644 --- a/testdata/no_input_test/request_body.json +++ b/testdata/no_input_test/request_body.json @@ -3,8 +3,7 @@ "input_bam": { "class": "File", "location": "NIST7035.1.chrM.bam" - }, - "file_array": null + } }, "manifest": [ { From 1970619c2fd51106b95718f8c9ffa81dec5ba241 Mon Sep 17 00:00:00 2001 From: Andrew Prokhorenkov Date: Wed, 27 Jan 2021 16:17:14 -0600 Subject: [PATCH 03/49] debug: see the types --- mariner/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mariner/input.go b/mariner/input.go index 26794451..b28866fe 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -476,8 +476,8 @@ func (tool *Tool) loadInputValue(input *cwl.Input) (out interface{}, err error) // determine if this param is required or optional required = true for _, t := range input.Types { + tool.Task.warnf("!!! debug, value of type: %v", t.Type) if t.Type == CWLNullType { - tool.Task.warnf("!!! debug, value of type: %v", t.Type) required = false } } From 3a639a2bd15171d3b527bb3bd6ac21d38620e99e Mon Sep 17 00:00:00 2001 From: Andrew Prokhorenkov Date: Fri, 29 Jan 2021 16:00:38 -0600 Subject: [PATCH 04/49] debug: test --- mariner/input.go | 32 +++++++++++------------- testdata/no_input_test/request_body.json | 16 ++++++------ 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/mariner/input.go b/mariner/input.go index b28866fe..44d21580 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -270,26 +270,22 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter return nil, tool.Task.errorf("failed to set 'self' value in js vm: %v", err) } - /* - // Troubleshooting js - // note: when accessing object fields using keys must use otto.Run("obj.key"), NOT otto.Get("obj.key") + // Troubleshooting js + // note: when accessing object fields using keys must use otto.Run("obj.key"), NOT otto.Get("obj.key") - fmt.Println("self in js:") - jsSelf, err := vm.Get("self") - jsSelfVal, err := jsSelf.Export() - PrintJSON(jsSelfVal) + fmt.Println("self in js:") + jsSelf, err := vm.Get("self") + jsSelfVal, err := jsSelf.Export() + tool.Task.infof("self value: %v", jsSelfVal) + tool.Task.infof("expression: %v", valueFrom) - fmt.Println("Expression:") - PrintJSON(valueFrom) - - fmt.Println("Object.keys(self)") - keys, err := vm.Run("Object.keys(self)") - if err != nil { - fmt.Printf("Error evaluating Object.keys(self): %v\n", err) - } - keysVal, err := keys.Export() - PrintJSON(keysVal) - */ + fmt.Println("Object.keys(self)") + keys, err := vm.Run("Object.keys(self)") + if err != nil { + fmt.Printf("Error evaluating Object.keys(self): %v\n", err) + } + keysVal, err := keys.Export() + tool.Task.infof("keysVal value: %v", keysVal) // eval the expression in the vm, capture result in `out` if out, err = evalExpression(valueFrom, vm); err != nil { diff --git a/testdata/no_input_test/request_body.json b/testdata/no_input_test/request_body.json index f5c2f4a4..1287e41d 100644 --- a/testdata/no_input_test/request_body.json +++ b/testdata/no_input_test/request_body.json @@ -85,13 +85,15 @@ { "inputs": [ { - "type": { - "items": [ - "null", - "File" - ], - "type": "array" - }, + "type": [ + { + "items": "File", + "type": "array" + }, + { + "type": "null" + } + ], "id": "#expressiontool_test.cwl/file_array" } ], From c22603143ac936dd8fe0cf2be4bdb2e5485dac26 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 29 Jan 2021 16:18:40 -0800 Subject: [PATCH 05/49] get to expression tool js vm parsing and make tool dir --- mariner/engine.go | 5 +++++ testdata/no_input_test/request_body.json | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/mariner/engine.go b/mariner/engine.go index 5b87e14d..0ec3c830 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -462,6 +462,11 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) { // so I think the expressiontool should run as a job, just like commandlinetools func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID) + + if err = os.Mkdir(tool.WorkingDir); err != nil { + return engine.errorf("failed to make tool working dir: %v; error: %v", tool.Task.Root.ID, err) + } + // note: context has already been loaded if err = os.Chdir(tool.WorkingDir); err != nil { return engine.errorf("failed to move to tool working dir: %v; error: %v", tool.Task.Root.ID, err) diff --git a/testdata/no_input_test/request_body.json b/testdata/no_input_test/request_body.json index 1287e41d..42ee1c3b 100644 --- a/testdata/no_input_test/request_body.json +++ b/testdata/no_input_test/request_body.json @@ -255,7 +255,7 @@ "in": [ { "source": "#subworkflow_test.cwl/test_initworkdir/bam_with_index", - "valueFrom": "$([self, self.secondaryFiles[0]])", + "valueFrom": "NIST7035.1.chrM.bam.bai", "id": "#subworkflow_test.cwl/test_expr/file_array" } ], From fdbe51656ae7e997ebcead8e19a0958ed7650d3f Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 1 Feb 2021 08:56:14 -0800 Subject: [PATCH 06/49] add FileMode to make tool working dir --- mariner/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mariner/engine.go b/mariner/engine.go index 0ec3c830..03fa652d 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -463,7 +463,7 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) { func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID) - if err = os.Mkdir(tool.WorkingDir); err != nil { + if err = os.Mkdir(tool.WorkingDir, os.ModeDir); err != nil { return engine.errorf("failed to make tool working dir: %v; error: %v", tool.Task.Root.ID, err) } From 439c2ee9db4b82e62bfb7e55384471db8d61c69a Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 1 Feb 2021 09:40:32 -0800 Subject: [PATCH 07/49] change to MkdirAll for any necessary parent paths --- mariner/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mariner/engine.go b/mariner/engine.go index 03fa652d..a2e9706f 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -463,7 +463,7 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) { func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID) - if err = os.Mkdir(tool.WorkingDir, os.ModeDir); err != nil { + if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { return engine.errorf("failed to make tool working dir: %v; error: %v", tool.Task.Root.ID, err) } From 9fff9fcb3a81bb8e39f65ee440dbbd53f622f24a Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Tue, 2 Feb 2021 15:50:23 -0800 Subject: [PATCH 08/49] some debugging for initworkdir --- mariner/engine.go | 6 +++--- mariner/tool.go | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/mariner/engine.go b/mariner/engine.go index a2e9706f..6fcc6eaf 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -463,9 +463,9 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) { func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID) - if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { - return engine.errorf("failed to make tool working dir: %v; error: %v", tool.Task.Root.ID, err) - } +// if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { +// return engine.errorf("failed to make tool working dir: %v; error: %v", tool.Task.Root.ID, err) +// } // note: context has already been loaded if err = os.Chdir(tool.WorkingDir); err != nil { diff --git a/mariner/tool.go b/mariner/tool.go index a664b976..d8c9bb99 100644 --- a/mariner/tool.go +++ b/mariner/tool.go @@ -84,6 +84,7 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) { if err != nil { return tool.Task.errorf("error marshalling contents to file: %v", err) } + tool.Task.infof("Converted file to json: %v", b) } result, err := uploader.Upload(&s3manager.UploadInput{ @@ -94,6 +95,7 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) { if err != nil { return fmt.Errorf("upload to s3 failed: %v", err) } + tool.Task.infof("wrote initdir bytes to s3 object: %v", result.Location) fmt.Println("wrote initdir bytes to s3 object:", result.Location) // log } From d788fa08a770baac8be1a942f4b0fb836e851200 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Tue, 2 Feb 2021 16:19:00 -0800 Subject: [PATCH 09/49] js debugging --- mariner/js.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mariner/js.go b/mariner/js.go index 4b3b6cc5..06d7f3e8 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -112,12 +112,17 @@ func (tool *Tool) resolveExpressions(inText string) (outText string, outFile *Fi // get full $(...) expression expression = c1 + c2 + expression + tool.Task.infof("Full $(...) expression: %v", expression) + // eval that thing result, err := evalExpression(expression, tool.InputsVM) if err != nil { return "", outFile, tool.Task.errorf("%v", err) } + tool.Task.infof("expression result: %v", result) + tool.Task.infof("expression result type: %v", result.(type)) + // result ought to be a string (edit: OR a file) switch result.(type) { case string: From 0bb53b89d3f44c347a3fe8afb3c9820f2606f91d Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Tue, 2 Feb 2021 16:30:01 -0800 Subject: [PATCH 10/49] js debugging --- mariner/js.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mariner/js.go b/mariner/js.go index 06d7f3e8..7129d760 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "strings" + "reflect" "github.com/robertkrimen/otto" ) @@ -121,7 +122,7 @@ func (tool *Tool) resolveExpressions(inText string) (outText string, outFile *Fi } tool.Task.infof("expression result: %v", result) - tool.Task.infof("expression result type: %v", result.(type)) + tool.Task.infof("expression result type: %v", reflect.TypeOf(result)) // result ought to be a string (edit: OR a file) switch result.(type) { From 9c2579b3d3b3f99e84196602204c5521e5409b67 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Wed, 3 Feb 2021 13:59:47 -0800 Subject: [PATCH 11/49] output and tool debugging --- mariner/output.go | 3 +++ mariner/tool.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/mariner/output.go b/mariner/output.go index 34f7550e..0fc0bb69 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -47,6 +47,8 @@ func (engine *K8sEngine) handleCLTOutput(tool *Tool) (err error) { } } + tool.Task.infof("results from engine glob: %v", results) + // 2. Load Contents // no need to handle prefixes here, since the full paths // are already in the File objects stored in `results` @@ -138,6 +140,7 @@ func (engine *K8sEngine) handleCLTOutput(tool *Tool) (err error) { } //// end of 4 step processing pipeline for collecting/handling output files //// + tool.Task.infof("output CWL type: %v", output.Types[0].Type) // at this point we have file results captured in `results` // output should be a CWLFileType or "array of Files" // fixme - make this case handling more specific in the else condition - don't just catch anything diff --git a/mariner/tool.go b/mariner/tool.go index d8c9bb99..ec5d6ed3 100644 --- a/mariner/tool.go +++ b/mariner/tool.go @@ -30,6 +30,8 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) { // logic: exactly one of resultString or resultFile should be returned resultText, resultFile, err := tool.resolveExpressions(listing.Entry) + tool.Task.infof("resultText: %v", resultText) + tool.Task.infof("resultFile: %v", resultFile) switch { case err != nil: return tool.Task.errorf("failed to resolve expressions in entry: %v; error: %v", listing.Entry, err) From 273454a3ea3f9c0888330cbc4928e72323a549e1 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Wed, 3 Feb 2021 16:24:06 -0800 Subject: [PATCH 12/49] Add inputs to js vm --- mariner/input.go | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/mariner/input.go b/mariner/input.go index 44d21580..d7033e3a 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -302,11 +302,37 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter // OR // if this tool is a step of a parent workflow but the valueFrom is empty if out == nil { + // get a js vm + vm := tool.JSVM.Copy() // #js-runtime + out, err = tool.loadInputValue(input) - tool.Task.warnf("!!! or fails here: 309, %v", input) + tool.Task.warnf("!!! [Inputs] or fails here: %v", out) if err != nil { return nil, tool.Task.errorf("failed to load input value: %v", err) } + + out, err = preProcessContext(out) + if err != nil { + return nil, tool.Task.errorf("failed to preprocess context for tool input: %v", err) + } + + // set `inputs` variable in vm + if err = vm.Set("inputs", out); err != nil { + return nil, tool.Task.errorf("failed to set 'inputs' value in js vm: %v", err) + } + + jsInputs, err := vm.Get("inputs") + jsInputsVal, err := jsInputs.Export() + tool.Task.infof("inputs value: %v", jsInputsVal) + + fmt.Println("Object.keys(inputs)") + keys, err := vm.Run("Object.keys(inputs)") + if err != nil { + fmt.Printf("Error evaluating Object.keys(inputs): %v\n", err) + } + keysVal, err := keys.Export() + tool.Task.infof("input keysVal value: %v", keysVal) + if out == nil { // implies an optional parameter with no value provided and no default value specified // this input parameter is not used by the tool From d3ed910f8235f0d84580373c6b8442410fdfadfa Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 5 Feb 2021 12:42:05 -0800 Subject: [PATCH 13/49] input and output debugging --- mariner/input.go | 29 +---------------------------- mariner/output.go | 3 +++ 2 files changed, 4 insertions(+), 28 deletions(-) diff --git a/mariner/input.go b/mariner/input.go index d7033e3a..246f18c2 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -302,36 +302,9 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter // OR // if this tool is a step of a parent workflow but the valueFrom is empty if out == nil { - // get a js vm - vm := tool.JSVM.Copy() // #js-runtime out, err = tool.loadInputValue(input) - tool.Task.warnf("!!! [Inputs] or fails here: %v", out) - if err != nil { - return nil, tool.Task.errorf("failed to load input value: %v", err) - } - - out, err = preProcessContext(out) - if err != nil { - return nil, tool.Task.errorf("failed to preprocess context for tool input: %v", err) - } - - // set `inputs` variable in vm - if err = vm.Set("inputs", out); err != nil { - return nil, tool.Task.errorf("failed to set 'inputs' value in js vm: %v", err) - } - - jsInputs, err := vm.Get("inputs") - jsInputsVal, err := jsInputs.Export() - tool.Task.infof("inputs value: %v", jsInputsVal) - - fmt.Println("Object.keys(inputs)") - keys, err := vm.Run("Object.keys(inputs)") - if err != nil { - fmt.Printf("Error evaluating Object.keys(inputs): %v\n", err) - } - keysVal, err := keys.Export() - tool.Task.infof("input keysVal value: %v", keysVal) + tool.Task.warnf("!!! or fails here: 309, %v", out) if out == nil { // implies an optional parameter with no value provided and no default value specified diff --git a/mariner/output.go b/mariner/output.go index 0fc0bb69..aea85de8 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -178,7 +178,9 @@ func (engine *K8sEngine) glob(tool *Tool, output *cwl.Output) (results []*File, } patterns = append(patterns, pattern) } + tool.Task.infof("glob: patterns value: %v", patterns) paths, err := engine.globS3(tool, patterns) + tool.Task.infof("glob: paths value: %v", paths) if err != nil { return results, tool.Task.errorf("%v", err) } @@ -275,6 +277,7 @@ func (tool *Tool) pattern(glob string) (pattern string, err error) { if !ok { return "", tool.Task.errorf("glob expression doesn't return a string pattern") } + tool.Task.infof("resulting pattern: %v", pattern) return pattern, nil } // not an expression, so no eval necessary From 926b49aed05cb344a1c874377f70e513eb90d11c Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 5 Feb 2021 14:55:36 -0800 Subject: [PATCH 14/49] log s3 glob --- mariner/output.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index aea85de8..d4c9d934 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -229,6 +229,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) var collectFile bool var path string globResults := []string{} + tool.Task.infof("globS3: objectList: %v", objectList) for _, obj := range objectList.Contents { // match key against pattern key = *obj.Key @@ -236,7 +237,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) collectFile = false for _, pattern := range patterns { s3Pattern := strings.TrimPrefix(engine.localPathToS3Key(pattern), "/") - + tool.Task.infof("globS3: s3Pattern: %v", s3Pattern) // handle case of glob pattern not resolving to absolute path // fixme: this is not pretty if !strings.HasPrefix(s3Pattern, engine.UserID) { @@ -245,17 +246,20 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) } match, err = filepath.Match(s3Pattern, key) + tool.Task.infof("globS3: match: %v", match) if err != nil { return nil, fmt.Errorf("glob pattern matching failed: %v", err) } else if match { collectFile = true } } + tool.Task.infof("globS3: collectFile: %v", collectFile) if collectFile { // this needs to be represented as a filepath, not a "key" // i.e., it needs a slash at the beginning path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", key)) globResults = append(globResults, path) + tool.Task.infof("globS3: path: %v", path) } } return globResults, nil From 87d0061d6ca2ba7f63bb8323f0b7b31d9148880b Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 5 Feb 2021 15:06:50 -0800 Subject: [PATCH 15/49] log s3 glob --- mariner/output.go | 1 - 1 file changed, 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index d4c9d934..8db3ecb7 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -229,7 +229,6 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) var collectFile bool var path string globResults := []string{} - tool.Task.infof("globS3: objectList: %v", objectList) for _, obj := range objectList.Contents { // match key against pattern key = *obj.Key From 81eaa73977e273711ccccf2deb59cf00f5fac8a3 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 5 Feb 2021 15:14:07 -0800 Subject: [PATCH 16/49] log s3 glob --- mariner/output.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index 8db3ecb7..08151a33 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -252,7 +252,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) collectFile = true } } - tool.Task.infof("globS3: collectFile: %v", collectFile) + if collectFile { // this needs to be represented as a filepath, not a "key" // i.e., it needs a slash at the beginning From d1537be3d976fd2d3871ff998e3398ab67400968 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 5 Feb 2021 15:23:34 -0800 Subject: [PATCH 17/49] just log s3 path --- mariner/output.go | 1 - 1 file changed, 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index 08151a33..8eb4195f 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -245,7 +245,6 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) } match, err = filepath.Match(s3Pattern, key) - tool.Task.infof("globS3: match: %v", match) if err != nil { return nil, fmt.Errorf("glob pattern matching failed: %v", err) } else if match { From 289b28b73342c8f4fa6214a765f66a884e4b1ecb Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 5 Feb 2021 15:40:44 -0800 Subject: [PATCH 18/49] add error handling to laodInput and s3 bucket logs --- mariner/input.go | 4 ++++ mariner/output.go | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/mariner/input.go b/mariner/input.go index 246f18c2..89274135 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -306,6 +306,10 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter out, err = tool.loadInputValue(input) tool.Task.warnf("!!! or fails here: 309, %v", out) + if err != nil { + return nil, tool.Task.errorf("failed to load input value: %v", err) + } + if out == nil { // implies an optional parameter with no value provided and no default value specified // this input parameter is not used by the tool diff --git a/mariner/output.go b/mariner/output.go index 8eb4195f..8d236f24 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -229,6 +229,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) var collectFile bool var path string globResults := []string{} + tool.Task.infof("globS3: objectList: %v", objectList) for _, obj := range objectList.Contents { // match key against pattern key = *obj.Key @@ -245,19 +246,19 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) } match, err = filepath.Match(s3Pattern, key) + tool.Task.infof("globS3: match: %v", match) if err != nil { return nil, fmt.Errorf("glob pattern matching failed: %v", err) } else if match { collectFile = true } } - + tool.Task.infof("globS3: collectFile: %v", collectFile) if collectFile { // this needs to be represented as a filepath, not a "key" // i.e., it needs a slash at the beginning path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", key)) globResults = append(globResults, path) - tool.Task.infof("globS3: path: %v", path) } } return globResults, nil From 98d17dea28361bb713c33a7efcc7f3fd3e4f94bf Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 5 Feb 2021 16:24:37 -0800 Subject: [PATCH 19/49] output pattern log for s3 --- mariner/output.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index 8d236f24..8dfc664a 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -243,10 +243,13 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) if !strings.HasPrefix(s3Pattern, engine.UserID) { s3wkdir := strings.TrimPrefix(engine.localPathToS3Key(tool.WorkingDir), "/") s3Pattern = fmt.Sprintf("%s/%s", strings.TrimSuffix(s3wkdir, "/"), strings.TrimPrefix(s3Pattern, "/")) + tool.Task.infof("globS3: s3wkdir: %v", s3wkdir) + tool.Task.infof("globS3: s3Pattern: %v", s3Pattern) } - + tool.Task.infof("globS3: key: %v", key) match, err = filepath.Match(s3Pattern, key) tool.Task.infof("globS3: match: %v", match) + if err != nil { return nil, fmt.Errorf("glob pattern matching failed: %v", err) } else if match { From 55540ea13a50031cefe6a026ab75151d1e30fd64 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 8 Feb 2021 12:01:54 -0800 Subject: [PATCH 20/49] match any sequence after s3Pattern to key --- mariner/output.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index 8dfc664a..ead41441 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -242,7 +242,8 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) // fixme: this is not pretty if !strings.HasPrefix(s3Pattern, engine.UserID) { s3wkdir := strings.TrimPrefix(engine.localPathToS3Key(tool.WorkingDir), "/") - s3Pattern = fmt.Sprintf("%s/%s", strings.TrimSuffix(s3wkdir, "/"), strings.TrimPrefix(s3Pattern, "/")) + // add a "*" at the end to capture substrings of s3Pattern in key + s3Pattern = fmt.Sprintf("%s/%s%s", strings.TrimSuffix(s3wkdir, "/"), strings.TrimPrefix(s3Pattern, "/"), "*") tool.Task.infof("globS3: s3wkdir: %v", s3wkdir) tool.Task.infof("globS3: s3Pattern: %v", s3Pattern) } From add05343382c6e9a633b99fae606b4ed75b3768c Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 8 Feb 2021 16:24:26 -0800 Subject: [PATCH 21/49] modify default check for inputs for tool --- mariner/output.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/mariner/output.go b/mariner/output.go index ead41441..7d2bb1cc 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -242,8 +242,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) // fixme: this is not pretty if !strings.HasPrefix(s3Pattern, engine.UserID) { s3wkdir := strings.TrimPrefix(engine.localPathToS3Key(tool.WorkingDir), "/") - // add a "*" at the end to capture substrings of s3Pattern in key - s3Pattern = fmt.Sprintf("%s/%s%s", strings.TrimSuffix(s3wkdir, "/"), strings.TrimPrefix(s3Pattern, "/"), "*") + s3Pattern = fmt.Sprintf("%s/%s", strings.TrimSuffix(s3wkdir, "/"), strings.TrimPrefix(s3Pattern, "/")) tool.Task.infof("globS3: s3wkdir: %v", s3wkdir) tool.Task.infof("globS3: s3Pattern: %v", s3Pattern) } @@ -257,6 +256,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) collectFile = true } } + tool.Task.infof("globS3: collectFile: %v", collectFile) if collectFile { // this needs to be represented as a filepath, not a "key" @@ -264,6 +264,15 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", key)) globResults = append(globResults, path) } + + // If no matching file, last attempt tp check the initial workflow S3 input paths for a match + if !collectFile and len(tool.S3Input.Paths) > 0 { + tool.Task.infof("globS3: No match, begin check input default.") + if strings.Compare(s3Pattern, tool.S3Input.Paths[0]) == 0 { + path = tool.S3Input.Paths[0] + globResults = append(globResults, path) + } + } } return globResults, nil } From 3d67c9779923e5c0cce841c5c90236dd5c9e0960 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 8 Feb 2021 16:30:19 -0800 Subject: [PATCH 22/49] fix go error --- mariner/output.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index 7d2bb1cc..62eab6af 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -266,7 +266,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) } // If no matching file, last attempt tp check the initial workflow S3 input paths for a match - if !collectFile and len(tool.S3Input.Paths) > 0 { + if !collectFile && len(tool.S3Input.Paths) > 0 { tool.Task.infof("globS3: No match, begin check input default.") if strings.Compare(s3Pattern, tool.S3Input.Paths[0]) == 0 { path = tool.S3Input.Paths[0] From cb1a69db86c78027bee04fc8d1c2db722a358269 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 8 Feb 2021 16:38:04 -0800 Subject: [PATCH 23/49] move s3Pattern declaration out of loop --- mariner/output.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index 62eab6af..63885d98 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -228,6 +228,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) var match bool var collectFile bool var path string + var s3Pattern string globResults := []string{} tool.Task.infof("globS3: objectList: %v", objectList) for _, obj := range objectList.Contents { @@ -236,7 +237,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) collectFile = false for _, pattern := range patterns { - s3Pattern := strings.TrimPrefix(engine.localPathToS3Key(pattern), "/") + s3Pattern = strings.TrimPrefix(engine.localPathToS3Key(pattern), "/") tool.Task.infof("globS3: s3Pattern: %v", s3Pattern) // handle case of glob pattern not resolving to absolute path // fixme: this is not pretty From ce5bfabf359a9b8548a0a077807d7d502c3a7059 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 8 Feb 2021 17:17:21 -0800 Subject: [PATCH 24/49] debugging s3 input paths --- mariner/output.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mariner/output.go b/mariner/output.go index 63885d98..ac34993f 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -230,6 +230,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) var path string var s3Pattern string globResults := []string{} + tool.Task.infof("globS3: Tool Input Paths: %v", tool.S3Input.Paths) tool.Task.infof("globS3: objectList: %v", objectList) for _, obj := range objectList.Contents { // match key against pattern @@ -266,12 +267,13 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) globResults = append(globResults, path) } - // If no matching file, last attempt tp check the initial workflow S3 input paths for a match + // If no matching file, last attempt to check the initial workflow S3 input paths for a match if !collectFile && len(tool.S3Input.Paths) > 0 { tool.Task.infof("globS3: No match, begin check input default.") if strings.Compare(s3Pattern, tool.S3Input.Paths[0]) == 0 { - path = tool.S3Input.Paths[0] - globResults = append(globResults, path) + path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", s3Pattern)) + tool.Task.infof("globS3: default path: %v", path) + globResults = append(globResults, s3Pattern) } } } From 72ca71c6c26a082f2d0c30dc59d01e2db72b1528 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 8 Feb 2021 17:34:24 -0800 Subject: [PATCH 25/49] strip s3 path --- mariner/output.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index ac34993f..f6232813 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -270,7 +270,8 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) // If no matching file, last attempt to check the initial workflow S3 input paths for a match if !collectFile && len(tool.S3Input.Paths) > 0 { tool.Task.infof("globS3: No match, begin check input default.") - if strings.Compare(s3Pattern, tool.S3Input.Paths[0]) == 0 { + stripS3Pattern := strings.Split(s3Pattern, "/") + if strings.Compare(stripS3Pattern[len(stripS3Pattern)-1], tool.S3Input.Paths[0]) == 0 { path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", s3Pattern)) tool.Task.infof("globS3: default path: %v", path) globResults = append(globResults, s3Pattern) From 0fe99642836dd15e0eb5038721aa32521fb6cd05 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Tue, 9 Feb 2021 11:44:09 -0800 Subject: [PATCH 26/49] fix s3 path and create tool dir if not specified in cwl --- mariner/engine.go | 7 ++++--- mariner/output.go | 4 +--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/mariner/engine.go b/mariner/engine.go index 6fcc6eaf..38db19e1 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -463,9 +463,10 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) { func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID) -// if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { -// return engine.errorf("failed to make tool working dir: %v; error: %v", tool.Task.Root.ID, err) -// } + // initial tool directories supported only if InitialWorkDirRequirement specified + if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { + return engine.errorf("failed to make tool working dir: %v; error: %v", tool.Task.Root.ID, err) + } // note: context has already been loaded if err = os.Chdir(tool.WorkingDir); err != nil { diff --git a/mariner/output.go b/mariner/output.go index f6232813..cea6cdde 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -272,9 +272,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) tool.Task.infof("globS3: No match, begin check input default.") stripS3Pattern := strings.Split(s3Pattern, "/") if strings.Compare(stripS3Pattern[len(stripS3Pattern)-1], tool.S3Input.Paths[0]) == 0 { - path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", s3Pattern)) - tool.Task.infof("globS3: default path: %v", path) - globResults = append(globResults, s3Pattern) + globResults = append(globResults, stripS3Pattern) } } } From d77d8e72460cefa4829bafb9ad78376405a76a3f Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Tue, 9 Feb 2021 11:49:43 -0800 Subject: [PATCH 27/49] fix s3 strip path --- mariner/output.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index cea6cdde..bd5339c9 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -272,7 +272,8 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) tool.Task.infof("globS3: No match, begin check input default.") stripS3Pattern := strings.Split(s3Pattern, "/") if strings.Compare(stripS3Pattern[len(stripS3Pattern)-1], tool.S3Input.Paths[0]) == 0 { - globResults = append(globResults, stripS3Pattern) + path = stripS3Pattern[len(stripS3Pattern)-1] + globResults = append(globResults, path) } } } From f4a14324e153117e23412db4cfcf245c760c07da Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Tue, 9 Feb 2021 12:06:08 -0800 Subject: [PATCH 28/49] test local path for input --- mariner/output.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mariner/output.go b/mariner/output.go index bd5339c9..9dd972fa 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -272,7 +272,8 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) tool.Task.infof("globS3: No match, begin check input default.") stripS3Pattern := strings.Split(s3Pattern, "/") if strings.Compare(stripS3Pattern[len(stripS3Pattern)-1], tool.S3Input.Paths[0]) == 0 { - path = stripS3Pattern[len(stripS3Pattern)-1] + path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", s3Pattern)) + tool.Task.infof("globS3: default path: %v", path) globResults = append(globResults, path) } } From 336a8a3dbe80a041740e0bf45bb9ce23d0dd1572 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 12 Feb 2021 13:47:02 -0800 Subject: [PATCH 29/49] troubleshoot sidecar for s3 fetches --- mariner/output.go | 20 ++++++++++---------- sidecar/sidecar.go | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/mariner/output.go b/mariner/output.go index 9dd972fa..a8595d64 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -267,16 +267,16 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) globResults = append(globResults, path) } - // If no matching file, last attempt to check the initial workflow S3 input paths for a match - if !collectFile && len(tool.S3Input.Paths) > 0 { - tool.Task.infof("globS3: No match, begin check input default.") - stripS3Pattern := strings.Split(s3Pattern, "/") - if strings.Compare(stripS3Pattern[len(stripS3Pattern)-1], tool.S3Input.Paths[0]) == 0 { - path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", s3Pattern)) - tool.Task.infof("globS3: default path: %v", path) - globResults = append(globResults, path) - } - } +// // This is a dev testing workaround if sidecar is not working, do not use this in production! +// if !collectFile && len(tool.S3Input.Paths) > 0 { +// tool.Task.infof("globS3: No match, begin check input default.") +// stripS3Pattern := strings.Split(s3Pattern, "/") +// if strings.Compare(stripS3Pattern[len(stripS3Pattern)-1], tool.S3Input.Paths[0]) == 0 { +// path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", s3Pattern)) +// tool.Task.infof("globS3: default path: %v", path) +// globResults = append(globResults, path) +// } +// } } return globResults, nil } diff --git a/sidecar/sidecar.go b/sidecar/sidecar.go index 4c0b5596..d9ad7c92 100644 --- a/sidecar/sidecar.go +++ b/sidecar/sidecar.go @@ -247,7 +247,7 @@ func (fm *S3FileManager) uploadOutputFiles() (err error) { // upload the file contents result, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(fm.S3BucketName), - Key: aws.String(fm.s3Key(path)), // HERE! - probably will fail, gotta take away the leading slash + Key: aws.String(strings.TrimPrefix(fm.s3Key(path), "/")), Body: f, }) if err != nil { From 169dea493f9264727f8d50c7c225014c072cb71a Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 15 Feb 2021 13:38:19 -0800 Subject: [PATCH 30/49] test s3 downloader path --- mariner/file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mariner/file.go b/mariner/file.go index f1a19bcb..e250aaaa 100644 --- a/mariner/file.go +++ b/mariner/file.go @@ -176,7 +176,7 @@ func (engine *K8sEngine) loadContents(f *File) (err error) { downloader := s3manager.NewDownloader(sess) // Location field stores full path, no need to handle prefix here - s3Key := strings.TrimPrefix(engine.localPathToS3Key(f.Location), "/") + s3Key := engine.localPathToS3Key(f.Location) // Create a buffer to write the S3 Object contents to. // see: https://stackoverflow.com/questions/41645377/golang-s3-download-to-buffer-using-s3manager-downloader From e0b8a1182bc032d4595191404b670c2bfa829061 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 15 Feb 2021 16:52:34 -0800 Subject: [PATCH 31/49] debug k8s cltArgs for commons data --- mariner/k8s.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mariner/k8s.go b/mariner/k8s.go index 58824e08..3952062e 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -238,6 +238,7 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { // fixme func (tool *Tool) cltArgs() []string { tool.Task.infof("begin load CommandLineTool container args") + /* args := []string{ "-c", fmt.Sprintf(` @@ -252,9 +253,10 @@ func (tool *Tool) cltArgs() []string { touch %vdone `, tool.WorkingDir, tool.WorkingDir, tool.WorkingDir, tool.cltBash(), tool.WorkingDir, tool.WorkingDir), } + */ // for debugging - /* + args := []string{ "-c", fmt.Sprintf(` @@ -269,7 +271,7 @@ func (tool *Tool) cltArgs() []string { done `, tool.WorkingDir), } - */ + tool.Task.infof("end load CommandLineTool container args") return args From c09e6714c98790736e12022a5028133d75da0bc1 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Tue, 16 Feb 2021 08:43:21 -0800 Subject: [PATCH 32/49] turn off cltargs debugger --- mariner/k8s.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/mariner/k8s.go b/mariner/k8s.go index 3952062e..3a255175 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -238,7 +238,7 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { // fixme func (tool *Tool) cltArgs() []string { tool.Task.infof("begin load CommandLineTool container args") - /* + args := []string{ "-c", fmt.Sprintf(` @@ -253,24 +253,23 @@ func (tool *Tool) cltArgs() []string { touch %vdone `, tool.WorkingDir, tool.WorkingDir, tool.WorkingDir, tool.cltBash(), tool.WorkingDir, tool.WorkingDir), } - */ // for debugging - args := []string{ - "-c", - fmt.Sprintf(` - while [[ ! -f %vrun.sh ]]; do - echo "Waiting for sidecar to finish setting up.."; - sleep 5 - done - echo "side done setting up" - echo "staying alive" - while true; do - : - done - `, tool.WorkingDir), - } +// args := []string{ +// "-c", +// fmt.Sprintf(` +// while [[ ! -f %vrun.sh ]]; do +// echo "Waiting for sidecar to finish setting up.."; +// sleep 5 +// done +// echo "side done setting up" +// echo "staying alive" +// while true; do +// : +// done +// `, tool.WorkingDir), +// } tool.Task.infof("end load CommandLineTool container args") From e8673b0048ea0931d130210d3a565a1bb8c5506f Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Wed, 17 Feb 2021 16:06:07 -0800 Subject: [PATCH 33/49] move expressiontools outside of engine --- mariner/engine.go | 36 ++++++++---------------------------- mariner/js.go | 33 +++++++++++++++++++++++++++++++++ mariner/k8s.go | 13 ++++++++----- 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/mariner/engine.go b/mariner/engine.go index 38db19e1..f6117a9c 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "os" "os/exec" "path/filepath" "strings" @@ -453,38 +452,19 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) { return nil } -// #no-fuse - this has to change! -// currently, regrettably, expressiontools run "in the engine", not in separate containers -// need to revisit this in detail -// figure out if expression tools should be dispatched as jobs -// or if it's okay that they run "in the engine" -// probably no actual computation of any kind should run "in the engine" -// so I think the expressiontool should run as a job, just like commandlinetools +// runExpressionTool.. +// 1. Evaluates the tool expression. +// 2. Makes call to RunK8sJob to dispatch job to run the ExpressionTool. func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID) - - // initial tool directories supported only if InitialWorkDirRequirement specified - if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { - return engine.errorf("failed to make tool working dir: %v; error: %v", tool.Task.Root.ID, err) + err = tool.evaluateExpression() + if err != nil { + return engine.errorf("failed to evaluate expression for tool: %v; error: %v", tool.Task.Root.ID, err) } - // note: context has already been loaded - if err = os.Chdir(tool.WorkingDir); err != nil { - return engine.errorf("failed to move to tool working dir: %v; error: %v", tool.Task.Root.ID, err) - } - result, err := evalExpression(tool.Task.Root.Expression, tool.InputsVM) + err = engine.dispatchTaskJob(tool) if err != nil { - return engine.errorf("failed to eval expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) - } - os.Chdir("/") // move back (?) to root after tool finishes execution -> or, where should the default directory position be? - - // expression must return a JSON object where the keys are the IDs of the ExpressionTool outputs - // see description of `expression` field here: - // https://www.commonwl.org/v1.0/Workflow.html#ExpressionTool - var ok bool - tool.ExpressionResult, ok = result.(map[string]interface{}) - if !ok { - return engine.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID) + return engine.errorf("failed to dispatch task job: %v; error: %v", tool.Task.Root.ID, err) } engine.infof("end run ExpressionTool: %v", tool.Task.Root.ID) return nil diff --git a/mariner/js.go b/mariner/js.go index 7129d760..c1738abf 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "os" "strings" "reflect" @@ -15,6 +16,38 @@ import ( // EvalExpression evals a single expression (something like $(...) or ${...}) // resolveExpressions processes a string which may contain several embedded expressions, each wrapped in their own $()/${} wrapper +// EvaluateExpression +func (tool *Tool) evaluateExpression() (err error) { + tool.Task.infof("begin evaluate expression") + + // initial tool directory should exist, but create it if it does not. + // not sure if this will work, needs testing. + if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { + return engine.errorf("failed to make ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) + } + + // note: context has already been loaded + if err = os.Chdir(tool.WorkingDir); err != nil { + return engine.errorf("failed to move to ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) + } + result, err := evalExpression(tool.Task.Root.Expression, tool.InputsVM) + if err != nil { + return engine.errorf("failed to evaluate expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) + } + os.Chdir("/") // move back (?) to root after tool finishes execution -> or, where should the default directory position be? + + // expression must return a JSON object where the keys are the IDs of the ExpressionTool outputs + // see description of `expression` field here: + // https://www.commonwl.org/v1.0/Workflow.html#ExpressionTool + var ok bool + tool.ExpressionResult, ok = result.(map[string]interface{}) + if !ok { + return engine.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID) + } + tool.Task.infof("end evaluate expression") + return nil +} + // NOTE: make uniform either UpperCase, or camelCase for naming functions // ----- none of these names really need to be exported, since they get called within the `mariner` package diff --git a/mariner/k8s.go b/mariner/k8s.go index 3a255175..fc883c2d 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -213,13 +213,16 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { return nil, tool.Task.errorf("failed to load cpu/mem info: %v", err) } - // if not specified use config - container.Command = []string{tool.cltBash()} // fixme - please + // Only add commands if this is a CLT + if tool.Task.Root.Class == CWLCommandLineTool { + // if not specified use config + container.Command = []string{tool.cltBash()} // fixme - please - container.Args = tool.cltArgs() // fixme - make string constant or something + container.Args = tool.cltArgs() // fixme - make string constant or something - if container.Env, err = tool.env(); err != nil { - return nil, tool.Task.errorf("failed to load env info: %v", err) + if container.Env, err = tool.env(); err != nil { + return nil, tool.Task.errorf("failed to load env info: %v", err) + } } tool.Task.infof("end load main container spec") From d9467cd70050dd3ce50a6d1d636df318ea159605 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Wed, 17 Feb 2021 16:14:07 -0800 Subject: [PATCH 34/49] fix errors to tool task instead of engine --- mariner/js.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mariner/js.go b/mariner/js.go index c1738abf..9bfb7d6e 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -23,16 +23,16 @@ func (tool *Tool) evaluateExpression() (err error) { // initial tool directory should exist, but create it if it does not. // not sure if this will work, needs testing. if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { - return engine.errorf("failed to make ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) + return tool.Task.errorf("failed to make ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) } // note: context has already been loaded if err = os.Chdir(tool.WorkingDir); err != nil { - return engine.errorf("failed to move to ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) + return tool.Task.errorf("failed to move to ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) } result, err := evalExpression(tool.Task.Root.Expression, tool.InputsVM) if err != nil { - return engine.errorf("failed to evaluate expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) + return tool.Task.errorf("failed to evaluate expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) } os.Chdir("/") // move back (?) to root after tool finishes execution -> or, where should the default directory position be? @@ -42,7 +42,7 @@ func (tool *Tool) evaluateExpression() (err error) { var ok bool tool.ExpressionResult, ok = result.(map[string]interface{}) if !ok { - return engine.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID) + return tool.Task.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID) } tool.Task.infof("end evaluate expression") return nil From 6e9c7417d63f227d8b59786c8c8bae3e4bff0bae Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Wed, 17 Feb 2021 16:18:20 -0800 Subject: [PATCH 35/49] add os packaage back to engine --- mariner/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mariner/engine.go b/mariner/engine.go index f6117a9c..ce78823a 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -5,11 +5,11 @@ import ( "context" "encoding/json" "fmt" + "os" "os/exec" "path/filepath" "strings" "sync" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" From beb2ba7287a8e67c8081ca7c0d8f48c779e7a1dc Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Thu, 18 Feb 2021 10:57:15 -0800 Subject: [PATCH 36/49] remove sidecar from expressiontools, only add fuse --- mariner/k8s.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/mariner/k8s.go b/mariner/k8s.go index fc883c2d..af8e0009 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -173,7 +173,6 @@ func (engine *K8sEngine) taskContainers(tool *Tool) (containers []k8sv1.Containe if err != nil { return nil, engine.errorf("failed to load task main container: %v; error: %v", tool.Task.Root.ID, err) } - s3sidecar := engine.s3SidecarContainer(tool) gen3fuse := gen3fuseContainer(engine.Manifest, marinerTask, engine.RunID) workingDir := k8sv1.EnvVar{ Name: "TOOL_WORKING_DIR", @@ -181,7 +180,15 @@ func (engine *K8sEngine) taskContainers(tool *Tool) (containers []k8sv1.Containe } gen3fuse.Env = append(gen3fuse.Env, workingDir) task.Env = append(task.Env, workingDir) - containers = []k8sv1.Container{*task, *s3sidecar, *gen3fuse} + + if tool.Task.Root.Class == CWLCommandLineTool { + s3sidecar := engine.s3SidecarContainer(tool) + containers = []k8sv1.Container{*task, *s3sidecar, *gen3fuse} + } else { + // I don't think ExpressionTools needs sidecar, should be only CLT, to fetch commons data. DOUBLE-CHECK this. + containers = []k8sv1.Container{*task, *gen3fuse} + } + engine.infof("end load container spec for tool: %v", tool.Task.Root.ID) return containers, nil } @@ -217,12 +224,14 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { if tool.Task.Root.Class == CWLCommandLineTool { // if not specified use config container.Command = []string{tool.cltBash()} // fixme - please - container.Args = tool.cltArgs() // fixme - make string constant or something + } else { + container.Command = []string{tool.cltBash()} + container.Args = []string{} + } - if container.Env, err = tool.env(); err != nil { - return nil, tool.Task.errorf("failed to load env info: %v", err) - } + if container.Env, err = tool.env(); err != nil { + return nil, tool.Task.errorf("failed to load env info: %v", err) } tool.Task.infof("end load main container spec") From 17903c1d5fc1abee73962ee5465eac7922c8dcd5 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Thu, 18 Feb 2021 13:43:01 -0800 Subject: [PATCH 37/49] add sidecar back to expressiontools --- mariner/k8s.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/mariner/k8s.go b/mariner/k8s.go index af8e0009..9d3afad0 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -173,6 +173,8 @@ func (engine *K8sEngine) taskContainers(tool *Tool) (containers []k8sv1.Containe if err != nil { return nil, engine.errorf("failed to load task main container: %v; error: %v", tool.Task.Root.ID, err) } + + s3sidecar := engine.s3SidecarContainer(tool) gen3fuse := gen3fuseContainer(engine.Manifest, marinerTask, engine.RunID) workingDir := k8sv1.EnvVar{ Name: "TOOL_WORKING_DIR", @@ -180,14 +182,15 @@ func (engine *K8sEngine) taskContainers(tool *Tool) (containers []k8sv1.Containe } gen3fuse.Env = append(gen3fuse.Env, workingDir) task.Env = append(task.Env, workingDir) + containers = []k8sv1.Container{*task, *s3sidecar, *gen3fuse} - if tool.Task.Root.Class == CWLCommandLineTool { - s3sidecar := engine.s3SidecarContainer(tool) - containers = []k8sv1.Container{*task, *s3sidecar, *gen3fuse} - } else { - // I don't think ExpressionTools needs sidecar, should be only CLT, to fetch commons data. DOUBLE-CHECK this. - containers = []k8sv1.Container{*task, *gen3fuse} - } +// if tool.Task.Root.Class == CWLCommandLineTool { +// s3sidecar := engine.s3SidecarContainer(tool) +// containers = []k8sv1.Container{*task, *s3sidecar, *gen3fuse} +// } else { +// // I don't think ExpressionTools needs sidecar, should be only CLT, to fetch commons data. DOUBLE-CHECK this. +// containers = []k8sv1.Container{*task, *gen3fuse} +// } engine.infof("end load container spec for tool: %v", tool.Task.Root.ID) return containers, nil @@ -222,13 +225,12 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { // Only add commands if this is a CLT if tool.Task.Root.Class == CWLCommandLineTool { - // if not specified use config - container.Command = []string{tool.cltBash()} // fixme - please container.Args = tool.cltArgs() // fixme - make string constant or something } else { - container.Command = []string{tool.cltBash()} container.Args = []string{} } + // if not specified use config + container.Command = []string{tool.cltBash()} // fixme - please if container.Env, err = tool.env(); err != nil { return nil, tool.Task.errorf("failed to load env info: %v", err) From ea2e78768138caade3af8e1e002f7bae2a272e6e Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Thu, 18 Feb 2021 16:11:17 -0800 Subject: [PATCH 38/49] add sidecar to expression tools and handle command args --- mariner/k8s.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/mariner/k8s.go b/mariner/k8s.go index 9d3afad0..9cae7c54 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -184,14 +184,6 @@ func (engine *K8sEngine) taskContainers(tool *Tool) (containers []k8sv1.Containe task.Env = append(task.Env, workingDir) containers = []k8sv1.Container{*task, *s3sidecar, *gen3fuse} -// if tool.Task.Root.Class == CWLCommandLineTool { -// s3sidecar := engine.s3SidecarContainer(tool) -// containers = []k8sv1.Container{*task, *s3sidecar, *gen3fuse} -// } else { -// // I don't think ExpressionTools needs sidecar, should be only CLT, to fetch commons data. DOUBLE-CHECK this. -// containers = []k8sv1.Container{*task, *gen3fuse} -// } - engine.infof("end load container spec for tool: %v", tool.Task.Root.ID) return containers, nil } @@ -223,12 +215,10 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { return nil, tool.Task.errorf("failed to load cpu/mem info: %v", err) } - // Only add commands if this is a CLT - if tool.Task.Root.Class == CWLCommandLineTool { - container.Args = tool.cltArgs() // fixme - make string constant or something - } else { - container.Args = []string{} - } + // fixme - make string constant or something + // We need some standard args to pass to sidecar for ExpressionTools (probably two constants for CLT and EXPTOOL) + container.Args = tool.cltArgs() + // if not specified use config container.Command = []string{tool.cltBash()} // fixme - please @@ -325,6 +315,13 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) { // for marinerTask job func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID) + + // create a config command for ExpressionTools to place here instead of this + commandArg := strings.Join(tool.Command.Args, " ") + // then it's an ExpressionTool task + if len(commandArg) < 1: + commandArg = "touch " + tool.WorkingDir + "expression.txt" + env = []k8sv1.EnvVar{ { Name: "AWSCREDS", @@ -344,7 +341,7 @@ func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { }, { Name: "TOOL_COMMAND", // the command from the commandlinetool to actually execute - Value: strings.Join(tool.Command.Args, " "), + Value: commandArg, }, { Name: "TOOL_WORKING_DIR", // the tool's working directory - e.g., '/engine-workspace/workflowRuns/{runID}/{taskID}/' From c897f9f710f554793865dd71e9847b667c8351d0 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Thu, 18 Feb 2021 16:17:12 -0800 Subject: [PATCH 39/49] debugging --- mariner/k8s.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mariner/k8s.go b/mariner/k8s.go index 9cae7c54..a4698b52 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -319,8 +319,9 @@ func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { // create a config command for ExpressionTools to place here instead of this commandArg := strings.Join(tool.Command.Args, " ") // then it's an ExpressionTool task - if len(commandArg) < 1: + if len(commandArg) < 1 { commandArg = "touch " + tool.WorkingDir + "expression.txt" + } env = []k8sv1.EnvVar{ { From 294030404dc64a6aa1179e2eba4be90d7dc521c9 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Thu, 18 Feb 2021 16:47:45 -0800 Subject: [PATCH 40/49] updated command for expressiontools and add engine listen for done --- mariner/engine.go | 4 ++++ mariner/k8s.go | 10 +++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/mariner/engine.go b/mariner/engine.go index ce78823a..1b01fcab 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -397,6 +397,10 @@ func (engine *K8sEngine) runTool(tool *Tool) (err error) { if err = engine.runExpressionTool(tool); err != nil { return engine.errorf("failed to run ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) } + + if err = engine.listenForDone(tool); err != nil { + return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err) + } case "CommandLineTool": if err = engine.runCommandLineTool(tool); err != nil { return engine.errorf("failed to run CommandLineTool: %v; error: %v", tool.Task.Root.ID, err) diff --git a/mariner/k8s.go b/mariner/k8s.go index a4698b52..93431555 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -316,11 +316,11 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) { func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID) - // create a config command for ExpressionTools to place here instead of this - commandArg := strings.Join(tool.Command.Args, " ") - // then it's an ExpressionTool task - if len(commandArg) < 1 { - commandArg = "touch " + tool.WorkingDir + "expression.txt" + // create a config constant for ExpressionTools to place here instead of this + if tool.Task.Root.Class == CWLCommandLineTool { + commandArg := strings.Join(tool.Command.Args, " ") + } else { + commandArg := "touch " + tool.WorkingDir + "expression.txt" } env = []k8sv1.EnvVar{ From 20f579080ab318423cd62e803b46575cbd69a4f8 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Thu, 18 Feb 2021 16:52:47 -0800 Subject: [PATCH 41/49] update exptool command arg --- mariner/k8s.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/mariner/k8s.go b/mariner/k8s.go index 93431555..afd81b1b 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -315,12 +315,10 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) { // for marinerTask job func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID) - + commandArg := strings.Join(tool.Command.Args, " ") // create a config constant for ExpressionTools to place here instead of this - if tool.Task.Root.Class == CWLCommandLineTool { - commandArg := strings.Join(tool.Command.Args, " ") - } else { - commandArg := "touch " + tool.WorkingDir + "expression.txt" + if tool.Task.Root.Class == CWLExpressionTool { + commandArg = "touch " + tool.WorkingDir + "expression.txt" } env = []k8sv1.EnvVar{ From 747f432f248eb88099b8dcfb883463e24b164bbc Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 19 Feb 2021 09:09:23 -0800 Subject: [PATCH 42/49] fix runtime panic --- mariner/k8s.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/mariner/k8s.go b/mariner/k8s.go index afd81b1b..f4507ccf 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -315,10 +315,12 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) { // for marinerTask job func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID) - commandArg := strings.Join(tool.Command.Args, " ") - // create a config constant for ExpressionTools to place here instead of this - if tool.Task.Root.Class == CWLExpressionTool { - commandArg = "touch " + tool.WorkingDir + "expression.txt" + + // assume this is an ExpressionTool, place this string in config instead of here + commandArg := "touch " + tool.WorkingDir + "expression.txt" + // if it's not, update it to the CommandLineTool commands + if tool.Command != nil { + commandArg = strings.Join(tool.Command.Args, " ") } env = []k8sv1.EnvVar{ From d8cd6c97cb2f7fe051dd19e663594b646e0268dc Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 19 Feb 2021 10:54:28 -0800 Subject: [PATCH 43/49] clean up --- mariner/engine.go | 4 ++-- mariner/input.go | 8 +++----- mariner/js.go | 8 +------- mariner/k8s.go | 18 ++++++++---------- mariner/output.go | 30 ++++++------------------------ mariner/tool.go | 4 +--- 6 files changed, 21 insertions(+), 51 deletions(-) diff --git a/mariner/engine.go b/mariner/engine.go index 1b01fcab..c0f9038c 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -388,7 +388,7 @@ func (engine *K8sEngine) setupTool(tool *Tool) (err error) { } // RunTool runs the tool -// If ExpressionTool, passes to appropriate handler to eval the expression +// If ExpressionTool, passes to appropriate handler to create k8s job and eval the expression // If CommandLineTool, passes to appropriate handler to create k8s job func (engine *K8sEngine) runTool(tool *Tool) (err error) { engine.infof("begin run tool: %v", tool.Task.Root.ID) @@ -398,7 +398,7 @@ func (engine *K8sEngine) runTool(tool *Tool) (err error) { return engine.errorf("failed to run ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) } - if err = engine.listenForDone(tool); err != nil { + if err = engine.listenForDone(tool); err != nil { return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err) } case "CommandLineTool": diff --git a/mariner/input.go b/mariner/input.go index 89274135..55bdb68d 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -256,7 +256,7 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter // Question: how to handle non-array/struct data types? // --------- no preprocessing should have to happen in this case. self, err := tool.loadInputValue(input) - tool.Task.warnf("!!! fails here 259: %v", input) + if err != nil { return nil, tool.Task.errorf("failed to load value: %v", err) } @@ -269,7 +269,7 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter if err = vm.Set("self", self); err != nil { return nil, tool.Task.errorf("failed to set 'self' value in js vm: %v", err) } - + /* // Troubleshooting js // note: when accessing object fields using keys must use otto.Run("obj.key"), NOT otto.Get("obj.key") @@ -285,7 +285,7 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter fmt.Printf("Error evaluating Object.keys(self): %v\n", err) } keysVal, err := keys.Export() - tool.Task.infof("keysVal value: %v", keysVal) + */ // eval the expression in the vm, capture result in `out` if out, err = evalExpression(valueFrom, vm); err != nil { @@ -304,7 +304,6 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter if out == nil { out, err = tool.loadInputValue(input) - tool.Task.warnf("!!! or fails here: 309, %v", out) if err != nil { return nil, tool.Task.errorf("failed to load input value: %v", err) @@ -475,7 +474,6 @@ func (tool *Tool) loadInputValue(input *cwl.Input) (out interface{}, err error) // determine if this param is required or optional required = true for _, t := range input.Types { - tool.Task.warnf("!!! debug, value of type: %v", t.Type) if t.Type == CWLNullType { required = false } diff --git a/mariner/js.go b/mariner/js.go index 9bfb7d6e..1a8acd5f 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -20,8 +20,7 @@ import ( func (tool *Tool) evaluateExpression() (err error) { tool.Task.infof("begin evaluate expression") - // initial tool directory should exist, but create it if it does not. - // not sure if this will work, needs testing. + // initial tool directory should exist, but create it if it does not. if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { return tool.Task.errorf("failed to make ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) } @@ -146,17 +145,12 @@ func (tool *Tool) resolveExpressions(inText string) (outText string, outFile *Fi // get full $(...) expression expression = c1 + c2 + expression - tool.Task.infof("Full $(...) expression: %v", expression) - // eval that thing result, err := evalExpression(expression, tool.InputsVM) if err != nil { return "", outFile, tool.Task.errorf("%v", err) } - tool.Task.infof("expression result: %v", result) - tool.Task.infof("expression result type: %v", reflect.TypeOf(result)) - // result ought to be a string (edit: OR a file) switch result.(type) { case string: diff --git a/mariner/k8s.go b/mariner/k8s.go index f4507ccf..b6eb96c7 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -174,7 +174,7 @@ func (engine *K8sEngine) taskContainers(tool *Tool) (containers []k8sv1.Containe return nil, engine.errorf("failed to load task main container: %v; error: %v", tool.Task.Root.ID, err) } - s3sidecar := engine.s3SidecarContainer(tool) + s3sidecar := engine.s3SidecarContainer(tool) gen3fuse := gen3fuseContainer(engine.Manifest, marinerTask, engine.RunID) workingDir := k8sv1.EnvVar{ Name: "TOOL_WORKING_DIR", @@ -215,14 +215,13 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { return nil, tool.Task.errorf("failed to load cpu/mem info: %v", err) } - // fixme - make string constant or something - // We need some standard args to pass to sidecar for ExpressionTools (probably two constants for CLT and EXPTOOL) - container.Args = tool.cltArgs() + // fixme - make string constant or something + container.Args = tool.containerArgs() // if not specified use config container.Command = []string{tool.cltBash()} // fixme - please - if container.Env, err = tool.env(); err != nil { + if container.Env, err = tool.env(); err != nil { return nil, tool.Task.errorf("failed to load env info: %v", err) } @@ -240,8 +239,8 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { // TOOL_WORKING_DIR is an envVar - no need to inject from go vars here // Q: how to handle case of different possible bash, depending on CLT image specified in CWL? // fixme -func (tool *Tool) cltArgs() []string { - tool.Task.infof("begin load CommandLineTool container args") +func (tool *Tool) containerArgs() []string { + tool.Task.infof("begin load container args") args := []string{ "-c", @@ -275,8 +274,7 @@ func (tool *Tool) cltArgs() []string { // `, tool.WorkingDir), // } - - tool.Task.infof("end load CommandLineTool container args") + tool.Task.infof("end load container args") return args } @@ -316,7 +314,7 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) { func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID) - // assume this is an ExpressionTool, place this string in config instead of here + // assume this is an ExpressionTool commandArg := "touch " + tool.WorkingDir + "expression.txt" // if it's not, update it to the CommandLineTool commands if tool.Command != nil { diff --git a/mariner/output.go b/mariner/output.go index a8595d64..41b5f5e2 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -47,8 +47,6 @@ func (engine *K8sEngine) handleCLTOutput(tool *Tool) (err error) { } } - tool.Task.infof("results from engine glob: %v", results) - // 2. Load Contents // no need to handle prefixes here, since the full paths // are already in the File objects stored in `results` @@ -140,7 +138,7 @@ func (engine *K8sEngine) handleCLTOutput(tool *Tool) (err error) { } //// end of 4 step processing pipeline for collecting/handling output files //// - tool.Task.infof("output CWL type: %v", output.Types[0].Type) + // at this point we have file results captured in `results` // output should be a CWLFileType or "array of Files" // fixme - make this case handling more specific in the else condition - don't just catch anything @@ -178,9 +176,9 @@ func (engine *K8sEngine) glob(tool *Tool, output *cwl.Output) (results []*File, } patterns = append(patterns, pattern) } - tool.Task.infof("glob: patterns value: %v", patterns) + paths, err := engine.globS3(tool, patterns) - tool.Task.infof("glob: paths value: %v", paths) + if err != nil { return results, tool.Task.errorf("%v", err) } @@ -230,8 +228,7 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) var path string var s3Pattern string globResults := []string{} - tool.Task.infof("globS3: Tool Input Paths: %v", tool.S3Input.Paths) - tool.Task.infof("globS3: objectList: %v", objectList) + for _, obj := range objectList.Contents { // match key against pattern key = *obj.Key @@ -239,18 +236,14 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) collectFile = false for _, pattern := range patterns { s3Pattern = strings.TrimPrefix(engine.localPathToS3Key(pattern), "/") - tool.Task.infof("globS3: s3Pattern: %v", s3Pattern) // handle case of glob pattern not resolving to absolute path // fixme: this is not pretty if !strings.HasPrefix(s3Pattern, engine.UserID) { s3wkdir := strings.TrimPrefix(engine.localPathToS3Key(tool.WorkingDir), "/") s3Pattern = fmt.Sprintf("%s/%s", strings.TrimSuffix(s3wkdir, "/"), strings.TrimPrefix(s3Pattern, "/")) - tool.Task.infof("globS3: s3wkdir: %v", s3wkdir) - tool.Task.infof("globS3: s3Pattern: %v", s3Pattern) } - tool.Task.infof("globS3: key: %v", key) + match, err = filepath.Match(s3Pattern, key) - tool.Task.infof("globS3: match: %v", match) if err != nil { return nil, fmt.Errorf("glob pattern matching failed: %v", err) @@ -259,7 +252,6 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) } } - tool.Task.infof("globS3: collectFile: %v", collectFile) if collectFile { // this needs to be represented as a filepath, not a "key" // i.e., it needs a slash at the beginning @@ -267,16 +259,6 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) globResults = append(globResults, path) } -// // This is a dev testing workaround if sidecar is not working, do not use this in production! -// if !collectFile && len(tool.S3Input.Paths) > 0 { -// tool.Task.infof("globS3: No match, begin check input default.") -// stripS3Pattern := strings.Split(s3Pattern, "/") -// if strings.Compare(stripS3Pattern[len(stripS3Pattern)-1], tool.S3Input.Paths[0]) == 0 { -// path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", s3Pattern)) -// tool.Task.infof("globS3: default path: %v", path) -// globResults = append(globResults, path) -// } -// } } return globResults, nil } @@ -297,7 +279,7 @@ func (tool *Tool) pattern(glob string) (pattern string, err error) { if !ok { return "", tool.Task.errorf("glob expression doesn't return a string pattern") } - tool.Task.infof("resulting pattern: %v", pattern) + return pattern, nil } // not an expression, so no eval necessary diff --git a/mariner/tool.go b/mariner/tool.go index ec5d6ed3..0d06c029 100644 --- a/mariner/tool.go +++ b/mariner/tool.go @@ -30,8 +30,7 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) { // logic: exactly one of resultString or resultFile should be returned resultText, resultFile, err := tool.resolveExpressions(listing.Entry) - tool.Task.infof("resultText: %v", resultText) - tool.Task.infof("resultFile: %v", resultFile) + switch { case err != nil: return tool.Task.errorf("failed to resolve expressions in entry: %v; error: %v", listing.Entry, err) @@ -97,7 +96,6 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) { if err != nil { return fmt.Errorf("upload to s3 failed: %v", err) } - tool.Task.infof("wrote initdir bytes to s3 object: %v", result.Location) fmt.Println("wrote initdir bytes to s3 object:", result.Location) // log } From 3ac16432200bbbd45af423a9a8b78a482cf33ae5 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 19 Feb 2021 10:58:08 -0800 Subject: [PATCH 44/49] remove reflect --- mariner/js.go | 1 - 1 file changed, 1 deletion(-) diff --git a/mariner/js.go b/mariner/js.go index 1a8acd5f..bf0ea359 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -7,7 +7,6 @@ import ( "io" "os" "strings" - "reflect" "github.com/robertkrimen/otto" ) From 4a7a05581250f8ad242d2ddfad05994747fd0f35 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 19 Feb 2021 11:13:23 -0800 Subject: [PATCH 45/49] move command generation for ExpTool container from k8s env to evaluateExpression --- mariner/js.go | 4 ++++ mariner/k8s.go | 9 +-------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/mariner/js.go b/mariner/js.go index bf0ea359..5de5ac6f 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -42,6 +42,10 @@ func (tool *Tool) evaluateExpression() (err error) { if !ok { return tool.Task.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID) } + // we assign a command for the ExpressionTool container for sidecar, it is a task like CLT. + cmdPath := tool.WorkingDir + "expression.txt" + cmd := []string{"touch", cmdPath} + tool.Command = exec.Command(cmd[0], cmd[1:]...) tool.Task.infof("end evaluate expression") return nil } diff --git a/mariner/k8s.go b/mariner/k8s.go index b6eb96c7..c589404b 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -314,13 +314,6 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) { func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID) - // assume this is an ExpressionTool - commandArg := "touch " + tool.WorkingDir + "expression.txt" - // if it's not, update it to the CommandLineTool commands - if tool.Command != nil { - commandArg = strings.Join(tool.Command.Args, " ") - } - env = []k8sv1.EnvVar{ { Name: "AWSCREDS", @@ -340,7 +333,7 @@ func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { }, { Name: "TOOL_COMMAND", // the command from the commandlinetool to actually execute - Value: commandArg, + Value: strings.Join(tool.Command.Args, " "), }, { Name: "TOOL_WORKING_DIR", // the tool's working directory - e.g., '/engine-workspace/workflowRuns/{runID}/{taskID}/' From df5ea604b2e03c28353fa69eb108e00b2161e5c9 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 19 Feb 2021 11:17:10 -0800 Subject: [PATCH 46/49] add os/exec pkg --- mariner/js.go | 1 + 1 file changed, 1 insertion(+) diff --git a/mariner/js.go b/mariner/js.go index 5de5ac6f..fa54677f 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "os/exec" "strings" "github.com/robertkrimen/otto" From 063d02a2fcdd111aa8e9e664dfbb3853da82745e Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Fri, 19 Feb 2021 12:41:20 -0800 Subject: [PATCH 47/49] clean up --- mariner/engine.go | 1 - mariner/input.go | 1 - mariner/k8s.go | 4 ---- mariner/output.go | 11 ++--------- mariner/tool.go | 1 - 5 files changed, 2 insertions(+), 16 deletions(-) diff --git a/mariner/engine.go b/mariner/engine.go index c0f9038c..94aad5aa 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -465,7 +465,6 @@ func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { if err != nil { return engine.errorf("failed to evaluate expression for tool: %v; error: %v", tool.Task.Root.ID, err) } - err = engine.dispatchTaskJob(tool) if err != nil { return engine.errorf("failed to dispatch task job: %v; error: %v", tool.Task.Root.ID, err) diff --git a/mariner/input.go b/mariner/input.go index 55bdb68d..3a935ba0 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -256,7 +256,6 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter // Question: how to handle non-array/struct data types? // --------- no preprocessing should have to happen in this case. self, err := tool.loadInputValue(input) - if err != nil { return nil, tool.Task.errorf("failed to load value: %v", err) } diff --git a/mariner/k8s.go b/mariner/k8s.go index c589404b..bb58c7b2 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -173,7 +173,6 @@ func (engine *K8sEngine) taskContainers(tool *Tool) (containers []k8sv1.Containe if err != nil { return nil, engine.errorf("failed to load task main container: %v; error: %v", tool.Task.Root.ID, err) } - s3sidecar := engine.s3SidecarContainer(tool) gen3fuse := gen3fuseContainer(engine.Manifest, marinerTask, engine.RunID) workingDir := k8sv1.EnvVar{ @@ -183,7 +182,6 @@ func (engine *K8sEngine) taskContainers(tool *Tool) (containers []k8sv1.Containe gen3fuse.Env = append(gen3fuse.Env, workingDir) task.Env = append(task.Env, workingDir) containers = []k8sv1.Container{*task, *s3sidecar, *gen3fuse} - engine.infof("end load container spec for tool: %v", tool.Task.Root.ID) return containers, nil } @@ -241,7 +239,6 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { // fixme func (tool *Tool) containerArgs() []string { tool.Task.infof("begin load container args") - args := []string{ "-c", fmt.Sprintf(` @@ -313,7 +310,6 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) { // for marinerTask job func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) { engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID) - env = []k8sv1.EnvVar{ { Name: "AWSCREDS", diff --git a/mariner/output.go b/mariner/output.go index 41b5f5e2..19179b12 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -138,7 +138,6 @@ func (engine *K8sEngine) handleCLTOutput(tool *Tool) (err error) { } //// end of 4 step processing pipeline for collecting/handling output files //// - // at this point we have file results captured in `results` // output should be a CWLFileType or "array of Files" // fixme - make this case handling more specific in the else condition - don't just catch anything @@ -176,9 +175,7 @@ func (engine *K8sEngine) glob(tool *Tool, output *cwl.Output) (results []*File, } patterns = append(patterns, pattern) } - paths, err := engine.globS3(tool, patterns) - if err != nil { return results, tool.Task.errorf("%v", err) } @@ -226,7 +223,6 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) var match bool var collectFile bool var path string - var s3Pattern string globResults := []string{} for _, obj := range objectList.Contents { @@ -235,7 +231,8 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) collectFile = false for _, pattern := range patterns { - s3Pattern = strings.TrimPrefix(engine.localPathToS3Key(pattern), "/") + s3Pattern := strings.TrimPrefix(engine.localPathToS3Key(pattern), "/") + // handle case of glob pattern not resolving to absolute path // fixme: this is not pretty if !strings.HasPrefix(s3Pattern, engine.UserID) { @@ -244,21 +241,18 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) } match, err = filepath.Match(s3Pattern, key) - if err != nil { return nil, fmt.Errorf("glob pattern matching failed: %v", err) } else if match { collectFile = true } } - if collectFile { // this needs to be represented as a filepath, not a "key" // i.e., it needs a slash at the beginning path = engine.s3KeyToLocalPath(fmt.Sprintf("/%s", key)) globResults = append(globResults, path) } - } return globResults, nil } @@ -279,7 +273,6 @@ func (tool *Tool) pattern(glob string) (pattern string, err error) { if !ok { return "", tool.Task.errorf("glob expression doesn't return a string pattern") } - return pattern, nil } // not an expression, so no eval necessary diff --git a/mariner/tool.go b/mariner/tool.go index 0d06c029..41c66cae 100644 --- a/mariner/tool.go +++ b/mariner/tool.go @@ -85,7 +85,6 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) { if err != nil { return tool.Task.errorf("error marshalling contents to file: %v", err) } - tool.Task.infof("Converted file to json: %v", b) } result, err := uploader.Upload(&s3manager.UploadInput{ From fe4aaf3a7df2700e511ca4e0ab1e2e59af57c401 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 22 Feb 2021 11:24:45 -0800 Subject: [PATCH 48/49] clean up code and revert user_data_test back to original json --- mariner/engine.go | 24 ++--- mariner/file.go | 25 +---- mariner/input.go | 115 ++--------------------- mariner/js.go | 15 +-- mariner/k8s.go | 44 +-------- mariner/output.go | 1 - mariner/tool.go | 1 - sidecar/sidecar.go | 24 +---- testdata/no_input_test/request_body.json | 18 ++-- 9 files changed, 35 insertions(+), 232 deletions(-) diff --git a/mariner/engine.go b/mariner/engine.go index 94aad5aa..2d584b73 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -5,16 +5,16 @@ import ( "context" "encoding/json" "fmt" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/robertkrimen/otto" cwl "github.com/uc-cdis/cwl.go" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -387,9 +387,7 @@ func (engine *K8sEngine) setupTool(tool *Tool) (err error) { return nil } -// RunTool runs the tool -// If ExpressionTool, passes to appropriate handler to create k8s job and eval the expression -// If CommandLineTool, passes to appropriate handler to create k8s job +// RunTool runs the tool from the engine and passes to the appropriate handler to create a k8s job. func (engine *K8sEngine) runTool(tool *Tool) (err error) { engine.infof("begin run tool: %v", tool.Task.Root.ID) switch class := tool.Task.Root.Class; class { @@ -397,7 +395,6 @@ func (engine *K8sEngine) runTool(tool *Tool) (err error) { if err = engine.runExpressionTool(tool); err != nil { return engine.errorf("failed to run ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) } - if err = engine.listenForDone(tool); err != nil { return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err) } @@ -405,12 +402,7 @@ func (engine *K8sEngine) runTool(tool *Tool) (err error) { if err = engine.runCommandLineTool(tool); err != nil { return engine.errorf("failed to run CommandLineTool: %v; error: %v", tool.Task.Root.ID, err) } - - // collect resource metrics via k8s api - // NOTE: at present, metrics are NOT collected for expressionTools - // this should be fixed go engine.collectResourceMetrics(tool) - if err = engine.listenForDone(tool); err != nil { return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err) } @@ -456,9 +448,7 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) { return nil } -// runExpressionTool.. -// 1. Evaluates the tool expression. -// 2. Makes call to RunK8sJob to dispatch job to run the ExpressionTool. +// runExpressionTool uses the engine to dispatch a task job for a given tool to evaluate an expression. func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID) err = tool.evaluateExpression() diff --git a/mariner/file.go b/mariner/file.go index e250aaaa..b01f7c9f 100644 --- a/mariner/file.go +++ b/mariner/file.go @@ -168,27 +168,12 @@ func (engine *K8sEngine) s3KeyToLocalPath(key string) string { return strings.Replace(key, engine.UserID, engineWorkspaceVolumeName, 1) } -// loads contents of file into the File.Contents field -// #no-fuse - read from s3, not locally -func (engine *K8sEngine) loadContents(f *File) (err error) { - +// loadContents downloads contents for a file from the engine's S3 file manager to populate the file contents field. +func (engine *K8sEngine) loadContents(file *File) (err error) { sess := engine.S3FileManager.newS3Session() downloader := s3manager.NewDownloader(sess) - - // Location field stores full path, no need to handle prefix here - s3Key := engine.localPathToS3Key(f.Location) - - // Create a buffer to write the S3 Object contents to. - // see: https://stackoverflow.com/questions/41645377/golang-s3-download-to-buffer-using-s3manager-downloader + s3Key := engine.localPathToS3Key(file.Location) buf := &aws.WriteAtBuffer{} - - // read up to 64 KiB from file, as specified in CWL docs - // 1 KiB is 1024 bytes -> 64 KiB is 65536 bytes - // - // S3 sdk supports specifying byte ranges - // in this way: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 - - // Write the contents of S3 Object to the buffer s3Obj := &s3.GetObjectInput{ Bucket: aws.String(engine.S3FileManager.S3BucketName), Key: aws.String(s3Key), @@ -198,9 +183,7 @@ func (engine *K8sEngine) loadContents(f *File) (err error) { if err != nil { return fmt.Errorf("failed to download file, %v", err) } - - // populate File.Contents field with contents - f.Contents = string(buf.Bytes()) + file.Contents = string(buf.Bytes()) return nil } diff --git a/mariner/input.go b/mariner/input.go index 3a935ba0..607082c5 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -219,42 +219,15 @@ func (tool *Tool) processFileList(l interface{}) ([]*File, error) { return out, nil } -// if err and input is not optional, it is a fatal error and the run should fail out +// transformInput parses all input in a workflow from the engine's tool. func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out interface{}, err error) { tool.Task.infof("begin transform input: %v", input.ID) - - /* - NOTE: presently only context loaded into js vm's here is `self` - Will certainly need to add more context to handle all cases - Definitely, definitely need a generalized method for loading appropriate context at appropriate places - In particular, the `inputs` context is probably going to be needed most commonly - - OTHERNOTE: `self` (in js vm) takes on different values in different places, according to cwl docs - see: https://www.commonwl.org/v1.0/Workflow.html#Parameter_references - --- - Steps: - 1. handle ValueFrom case at stepInput level - - if no ValueFrom specified, assign parameter value to `out` to processed in next step - 2. handle ValueFrom case at toolInput level - - initial value is `out` from step 1 - */ localID := lastInPath(input.ID) - - // stepInput ValueFrom case if tool.StepInputMap[localID] != nil { - // no processing needs to happen if the valueFrom field is empty if tool.StepInputMap[localID].ValueFrom != "" { - // here the valueFrom field is not empty, so we need to handle valueFrom valueFrom := tool.StepInputMap[localID].ValueFrom if strings.HasPrefix(valueFrom, "$") { - // valueFrom is an expression that needs to be eval'd - - // get a js vm - vm := tool.JSVM.Copy() // #js-runtime - - // preprocess struct/array so that fields can be accessed in vm - // Question: how to handle non-array/struct data types? - // --------- no preprocessing should have to happen in this case. + vm := tool.JSVM.Copy() self, err := tool.loadInputValue(input) if err != nil { return nil, tool.Task.errorf("failed to load value: %v", err) @@ -263,74 +236,29 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter if err != nil { return nil, tool.Task.errorf("failed to preprocess context: %v", err) } - - // set `self` variable in vm if err = vm.Set("self", self); err != nil { return nil, tool.Task.errorf("failed to set 'self' value in js vm: %v", err) } - /* - // Troubleshooting js - // note: when accessing object fields using keys must use otto.Run("obj.key"), NOT otto.Get("obj.key") - - fmt.Println("self in js:") - jsSelf, err := vm.Get("self") - jsSelfVal, err := jsSelf.Export() - tool.Task.infof("self value: %v", jsSelfVal) - tool.Task.infof("expression: %v", valueFrom) - - fmt.Println("Object.keys(self)") - keys, err := vm.Run("Object.keys(self)") - if err != nil { - fmt.Printf("Error evaluating Object.keys(self): %v\n", err) - } - keysVal, err := keys.Export() - */ - - // eval the expression in the vm, capture result in `out` if out, err = evalExpression(valueFrom, vm); err != nil { return nil, tool.Task.errorf("failed to eval js expression: %v; error: %v", valueFrom, err) } } else { - // valueFrom is not an expression - take raw string/val as value out = valueFrom } } } - // if this tool is not a step of a parent workflow - // OR - // if this tool is a step of a parent workflow but the valueFrom is empty if out == nil { - out, err = tool.loadInputValue(input) - if err != nil { - return nil, tool.Task.errorf("failed to load input value: %v", err) + return nil, tool.Task.errorf("failed to load input value: %v", err) } - if out == nil { - // implies an optional parameter with no value provided and no default value specified - // this input parameter is not used by the tool tool.Task.infof("optional input with no value or default provided - skipping: %v", input.ID) return nil, nil } } - // if file, need to ensure that all file attributes get populated (e.g., basename) - /* - fixme: handle array of files - Q: what about directories (?) - - do this: - - switch statement: - case file - case []file - - note: check types in the param type list? - vs. checking types of actual values - */ - switch { case isFile(out): if out, err = tool.processFile(out); err != nil { @@ -341,14 +269,9 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter return nil, tool.Task.errorf("failed to process file list: %v; error: %v", out, err) } default: - // fmt.Println("is not a file object") + tool.Task.infof("input is not a file object: %v", input.ID) } - // ######### Load Secondary Files ############ - // ######### ought to encapsulate this to a function - // ######### and call it for inputs and outputs - // ######### since it's basically the same process both times - if len(input.SecondaryFiles) > 0 { var fileArray []*File switch { @@ -357,48 +280,33 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter case isArrayOfFile(out): fileArray = out.([]*File) default: - // this is a fatal error - engine should fail out here - // but don't panic - actually handle this err and fail out gracefully - panic("invalid input: secondary files specified for a non-file input") + return nil, tool.Task.errorf("invalid input: secondary files specified for a non-file input.") } - for _, entry := range input.SecondaryFiles { val := entry.Entry if strings.HasPrefix(val, "$") { vm := tool.JSVM.Copy() for _, fileObj := range fileArray { - // preprocess output file object self, err := preProcessContext(fileObj) if err != nil { return nil, tool.Task.errorf("%v", err) } - // set `self` variable name - // assuming it is okay to use one vm for all evaluations and just reset the `self` variable before each eval vm.Set("self", self) - - // eval js jsResult, err := evalExpression(val, vm) if err != nil { return nil, tool.Task.errorf("%v", err) } - - // retrieve secondaryFile's path (type interface{} with underlying type string) sFilePath, ok := jsResult.(string) if !ok { return nil, tool.Task.errorf("secondaryFile expression did not return string") } - if exist, _ := engine.fileExists(sFilePath); !exist { - // fatal error - panic("secondary file doesn't exist") + return nil, tool.Task.errorf("secondary file doesn't exist") } - - // get file object for secondaryFile and append it to the input file's SecondaryFiles field sFileObj := fileObject(sFilePath) fileObj.SecondaryFiles = append(fileObj.SecondaryFiles, sFileObj) } } else { - // follow those two steps indicated at the bottom of the secondaryFiles field description suffix, carats := trimLeading(val, "^") for _, fileObj := range fileArray { engine.loadSFilesFromPattern(tool, fileObj, suffix, carats) @@ -414,15 +322,11 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter } } - // at this point, variable `out` is the transformed input thus far (even if no transformation actually occured) - // so `out` will be what we work with in this next block as an initial value - // tool inputBinding ValueFrom case if input.Binding != nil && input.Binding.ValueFrom != nil { valueFrom := input.Binding.ValueFrom.String if strings.HasPrefix(valueFrom, "$") { - vm := tool.JSVM.Copy() // #js-runtime + vm := tool.JSVM.Copy() var context interface{} - // fixme: handle array of files switch out.(type) { case *File, []*File: context, err = preProcessContext(out) @@ -432,17 +336,14 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter default: context = out } - - vm.Set("self", context) // NOTE: again, will more than likely need additional context here to cover other cases + vm.Set("self", context) if out, err = evalExpression(valueFrom, vm); err != nil { return nil, tool.Task.errorf("failed to eval expression: %v; error: %v", valueFrom, err) } } else { - // not an expression, so no eval necessary - take raw value out = valueFrom } } - tool.Task.infof("end transform input: %v", input.ID) return out, nil } diff --git a/mariner/js.go b/mariner/js.go index fa54677f..d8a45ab7 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -16,16 +16,12 @@ import ( // EvalExpression evals a single expression (something like $(...) or ${...}) // resolveExpressions processes a string which may contain several embedded expressions, each wrapped in their own $()/${} wrapper -// EvaluateExpression +// evaluateExpression evaluates the expression from the tool in its virtual machine. func (tool *Tool) evaluateExpression() (err error) { tool.Task.infof("begin evaluate expression") - - // initial tool directory should exist, but create it if it does not. if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { - return tool.Task.errorf("failed to make ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) + return tool.Task.errorf("failed to make ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) } - - // note: context has already been loaded if err = os.Chdir(tool.WorkingDir); err != nil { return tool.Task.errorf("failed to move to ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) } @@ -33,17 +29,12 @@ func (tool *Tool) evaluateExpression() (err error) { if err != nil { return tool.Task.errorf("failed to evaluate expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) } - os.Chdir("/") // move back (?) to root after tool finishes execution -> or, where should the default directory position be? - - // expression must return a JSON object where the keys are the IDs of the ExpressionTool outputs - // see description of `expression` field here: - // https://www.commonwl.org/v1.0/Workflow.html#ExpressionTool + os.Chdir("/") var ok bool tool.ExpressionResult, ok = result.(map[string]interface{}) if !ok { return tool.Task.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID) } - // we assign a command for the ExpressionTool container for sidecar, it is a task like CLT. cmdPath := tool.WorkingDir + "expression.txt" cmd := []string{"touch", cmdPath} tool.Command = exec.Command(cmd[0], cmd[1:]...) diff --git a/mariner/k8s.go b/mariner/k8s.go index bb58c7b2..1bdd4ea7 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -194,10 +194,7 @@ func (engine *K8sEngine) s3SidecarContainer(tool *Tool) (container *k8sv1.Contai return container } -// FIXME - TODO - insert some error/warning handling here -// in case errors/warnings creating the container as specified in the cwl -// additionally, add logic to check if the tool has specified each field -// if a field is not specified, the spec should be filled out using values from the mariner-config +// taskContainer sets up and returns a k8s container for the tool task. func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { tool.Task.infof("begin load main container spec") conf := Config.Containers.Task @@ -205,38 +202,21 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { container.Name = conf.Name container.VolumeMounts = volumeMounts(marinerTask) container.ImagePullPolicy = conf.pullPolicy() - container.Image = tool.dockerImage() tool.Task.Log.ContainerImage = container.Image - if container.Resources, err = tool.resourceReqs(); err != nil { return nil, tool.Task.errorf("failed to load cpu/mem info: %v", err) } - - // fixme - make string constant or something container.Args = tool.containerArgs() - - // if not specified use config - container.Command = []string{tool.cltBash()} // fixme - please - + container.Command = []string{tool.cltBash()} if container.Env, err = tool.env(); err != nil { return nil, tool.Task.errorf("failed to load env info: %v", err) } - tool.Task.infof("end load main container spec") return container, nil } -// wait for sidecar to setup -// in particular wait until run.sh exists (run.sh is the command for the Tool) -// as soon as run.sh exists, run this script -// HERE TODO - put this in a bash script -// actually don't, because the CLT runs in its own container -// - won't have the mariner repo, and we shouldn't clone it in there -// so, just make this string a constant or something in the config file -// TOOL_WORKING_DIR is an envVar - no need to inject from go vars here -// Q: how to handle case of different possible bash, depending on CLT image specified in CWL? -// fixme +// containerArgs creates the necessary command arguments in a tool container for sidecar. func (tool *Tool) containerArgs() []string { tool.Task.infof("begin load container args") args := []string{ @@ -253,24 +233,6 @@ func (tool *Tool) containerArgs() []string { touch %vdone `, tool.WorkingDir, tool.WorkingDir, tool.WorkingDir, tool.cltBash(), tool.WorkingDir, tool.WorkingDir), } - - // for debugging - -// args := []string{ -// "-c", -// fmt.Sprintf(` -// while [[ ! -f %vrun.sh ]]; do -// echo "Waiting for sidecar to finish setting up.."; -// sleep 5 -// done -// echo "side done setting up" -// echo "staying alive" -// while true; do -// : -// done -// `, tool.WorkingDir), -// } - tool.Task.infof("end load container args") return args } diff --git a/mariner/output.go b/mariner/output.go index 19179b12..34f7550e 100644 --- a/mariner/output.go +++ b/mariner/output.go @@ -224,7 +224,6 @@ func (engine *K8sEngine) globS3(tool *Tool, patterns []string) ([]string, error) var collectFile bool var path string globResults := []string{} - for _, obj := range objectList.Contents { // match key against pattern key = *obj.Key diff --git a/mariner/tool.go b/mariner/tool.go index 41c66cae..a664b976 100644 --- a/mariner/tool.go +++ b/mariner/tool.go @@ -30,7 +30,6 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) { // logic: exactly one of resultString or resultFile should be returned resultText, resultFile, err := tool.resolveExpressions(listing.Entry) - switch { case err != nil: return tool.Task.errorf("failed to resolve expressions in entry: %v; error: %v", listing.Entry, err) diff --git a/sidecar/sidecar.go b/sidecar/sidecar.go index d9ad7c92..a22a3772 100644 --- a/sidecar/sidecar.go +++ b/sidecar/sidecar.go @@ -207,9 +207,8 @@ func (fm *S3FileManager) waitForTaskToFinish() error { return nil } -// 5. upload this task's output to s3 +// uploadOutputFiles utilizes a file manager to upload output files for a task. func (fm *S3FileManager) uploadOutputFiles() (err error) { - // collect paths of all files in the task working directory paths := []string{} _ = filepath.Walk(fm.TaskWorkingDir, func(path string, info os.FileInfo, err error) error { if !info.IsDir() { @@ -217,34 +216,21 @@ func (fm *S3FileManager) uploadOutputFiles() (err error) { } return nil }) - - // note: uploader is safe for concurrent use sess := fm.newS3Session() uploader := s3manager.NewUploader(sess) - var result *s3manager.UploadOutput var wg sync.WaitGroup guard := make(chan struct{}, fm.MaxConcurrent) for _, p := range paths { - // blocks if guard channel is already full to capacity - // proceeds as soon as there is an open slot in the channel guard <- struct{}{} - wg.Add(1) go func(path string) { defer wg.Done() - - // debug - fmt.Println("trying to upload file:", path) - - // open file for reading f, err := os.Open(path) if err != nil { fmt.Println("failed to open file:", path, err) return } - - // upload the file contents result, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(fm.S3BucketName), Key: aws.String(strings.TrimPrefix(fm.s3Key(path), "/")), @@ -255,16 +241,10 @@ func (fm *S3FileManager) uploadOutputFiles() (err error) { return } fmt.Println("file uploaded to location:", result.Location) - - // seems that files are already closed by this point - // close the file - very important if err = f.Close(); err != nil { fmt.Println("failed to close file:", err) - // return + return } - - // release this spot in the guard channel - // so the next goroutine can run <-guard }(p) } diff --git a/testdata/no_input_test/request_body.json b/testdata/no_input_test/request_body.json index 42ee1c3b..f5c2f4a4 100644 --- a/testdata/no_input_test/request_body.json +++ b/testdata/no_input_test/request_body.json @@ -85,15 +85,13 @@ { "inputs": [ { - "type": [ - { - "items": "File", - "type": "array" - }, - { - "type": "null" - } - ], + "type": { + "items": [ + "null", + "File" + ], + "type": "array" + }, "id": "#expressiontool_test.cwl/file_array" } ], @@ -255,7 +253,7 @@ "in": [ { "source": "#subworkflow_test.cwl/test_initworkdir/bam_with_index", - "valueFrom": "NIST7035.1.chrM.bam.bai", + "valueFrom": "$([self, self.secondaryFiles[0]])", "id": "#subworkflow_test.cwl/test_expr/file_array" } ], From 6c93799c0abfcc33e89e8574088f90a7d0ba35f8 Mon Sep 17 00:00:00 2001 From: cterrazas2 Date: Mon, 22 Feb 2021 14:46:27 -0800 Subject: [PATCH 49/49] file already closed for output --- sidecar/sidecar.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sidecar/sidecar.go b/sidecar/sidecar.go index a22a3772..edd633f4 100644 --- a/sidecar/sidecar.go +++ b/sidecar/sidecar.go @@ -243,7 +243,6 @@ func (fm *S3FileManager) uploadOutputFiles() (err error) { fmt.Println("file uploaded to location:", result.Location) if err = f.Close(); err != nil { fmt.Println("failed to close file:", err) - return } <-guard }(p)