Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

(PXP-7446): Feat/expression tools #45

Merged
merged 49 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
b71fa3d
Testing expression work flow. Null inputs not working.
cterrazas2 Jan 27, 2021
c70fb7b
debug: some extra debug logs
m0nhawk Jan 27, 2021
1970619
debug: see the types
m0nhawk Jan 27, 2021
3a639a2
debug: test
m0nhawk Jan 29, 2021
c226031
get to expression tool js vm parsing and make tool dir
cterrazas2 Jan 30, 2021
fdbe516
add FileMode to make tool working dir
cterrazas2 Feb 1, 2021
439c2ee
change to MkdirAll for any necessary parent paths
cterrazas2 Feb 1, 2021
9fff9fc
some debugging for initworkdir
cterrazas2 Feb 2, 2021
d788fa0
js debugging
cterrazas2 Feb 3, 2021
0bb53b8
js debugging
cterrazas2 Feb 3, 2021
9c2579b
output and tool debugging
cterrazas2 Feb 3, 2021
273454a
Add inputs to js vm
cterrazas2 Feb 4, 2021
d3ed910
input and output debugging
cterrazas2 Feb 5, 2021
926b49a
log s3 glob
cterrazas2 Feb 5, 2021
87d0061
log s3 glob
cterrazas2 Feb 5, 2021
81eaa73
log s3 glob
cterrazas2 Feb 5, 2021
d1537be
just log s3 path
cterrazas2 Feb 5, 2021
289b28b
add error handling to loadInput and s3 bucket logs
cterrazas2 Feb 5, 2021
98d17de
output pattern log for s3
cterrazas2 Feb 6, 2021
55540ea
match any sequence after s3Pattern to key
cterrazas2 Feb 8, 2021
add0534
modify default check for inputs for tool
cterrazas2 Feb 9, 2021
3d67c97
fix go error
cterrazas2 Feb 9, 2021
cb1a69d
move s3Pattern declaration out of loop
cterrazas2 Feb 9, 2021
ce5bfab
debugging s3 input paths
cterrazas2 Feb 9, 2021
72ca71c
strip s3 path
cterrazas2 Feb 9, 2021
0fe9964
fix s3 path and create tool dir if not specified in cwl
cterrazas2 Feb 9, 2021
d77d8e7
fix s3 strip path
cterrazas2 Feb 9, 2021
f4a1432
test local path for input
cterrazas2 Feb 9, 2021
336a8a3
troubleshoot sidecar for s3 fetches
cterrazas2 Feb 12, 2021
169dea4
test s3 downloader path
cterrazas2 Feb 15, 2021
e0b8a11
debug k8s cltArgs for commons data
cterrazas2 Feb 16, 2021
c09e671
turn off cltargs debugger
cterrazas2 Feb 16, 2021
e8673b0
move expressiontools outside of engine
cterrazas2 Feb 18, 2021
d9467cd
fix errors to tool task instead of engine
cterrazas2 Feb 18, 2021
6e9c741
add os package back to engine
cterrazas2 Feb 18, 2021
beb2ba7
remove sidecar from expressiontools, only add fuse
cterrazas2 Feb 18, 2021
17903c1
add sidecar back to expressiontools
cterrazas2 Feb 18, 2021
ea2e787
add sidecar to expression tools and handle command args
cterrazas2 Feb 19, 2021
c897f9f
debugging
cterrazas2 Feb 19, 2021
2940304
updated command for expressiontools and add engine listen for done
cterrazas2 Feb 19, 2021
20f5790
update exptool command arg
cterrazas2 Feb 19, 2021
747f432
fix runtime panic
cterrazas2 Feb 19, 2021
d8cd6c9
clean up
cterrazas2 Feb 19, 2021
3ac1643
remove reflect
cterrazas2 Feb 19, 2021
4a7a055
move command generation for ExpTool container from k8s env to evaluat…
cterrazas2 Feb 19, 2021
df5ea60
add os/exec pkg
cterrazas2 Feb 19, 2021
063d02a
clean up
cterrazas2 Feb 19, 2021
fe4aaf3
clean up code and revert user_data_test back to original json
cterrazas2 Feb 22, 2021
6c93799
file already closed for output
cterrazas2 Feb 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 15 additions & 36 deletions mariner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/robertkrimen/otto"
cwl "github.com/uc-cdis/cwl.go"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -388,26 +387,22 @@ func (engine *K8sEngine) setupTool(tool *Tool) (err error) {
return nil
}

// RunTool runs the tool
// If ExpressionTool, passes to appropriate handler to eval the expression
// If CommandLineTool, passes to appropriate handler to create k8s job
// RunTool runs the tool from the engine and passes to the appropriate handler to create a k8s job.
func (engine *K8sEngine) runTool(tool *Tool) (err error) {
engine.infof("begin run tool: %v", tool.Task.Root.ID)
switch class := tool.Task.Root.Class; class {
case "ExpressionTool":
if err = engine.runExpressionTool(tool); err != nil {
return engine.errorf("failed to run ExpressionTool: %v; error: %v", tool.Task.Root.ID, err)
}
if err = engine.listenForDone(tool); err != nil {
return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err)
}
case "CommandLineTool":
if err = engine.runCommandLineTool(tool); err != nil {
return engine.errorf("failed to run CommandLineTool: %v; error: %v", tool.Task.Root.ID, err)
}

// collect resource metrics via k8s api
// NOTE: at present, metrics are NOT collected for expressionTools
// this should be fixed
go engine.collectResourceMetrics(tool)

if err = engine.listenForDone(tool); err != nil {
return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err)
}
Expand Down Expand Up @@ -453,32 +448,16 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) {
return nil
}

// #no-fuse - this has to change!
// currently, regrettably, expressiontools run "in the engine", not in separate containers
// need to revisit this in detail
// figure out if expression tools should be dispatched as jobs
// or if it's okay that they run "in the engine"
// probably no actual computation of any kind should run "in the engine"
// so I think the expressiontool should run as a job, just like commandlinetools
// runExpressionTool uses the engine to dispatch a task job for a given tool to evaluate an expression.
func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) {
engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID)
// note: context has already been loaded
if err = os.Chdir(tool.WorkingDir); err != nil {
return engine.errorf("failed to move to tool working dir: %v; error: %v", tool.Task.Root.ID, err)
}
result, err := evalExpression(tool.Task.Root.Expression, tool.InputsVM)
err = tool.evaluateExpression()
if err != nil {
return engine.errorf("failed to eval expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err)
return engine.errorf("failed to evaluate expression for tool: %v; error: %v", tool.Task.Root.ID, err)
}
os.Chdir("/") // move back (?) to root after tool finishes execution -> or, where should the default directory position be?

// expression must return a JSON object where the keys are the IDs of the ExpressionTool outputs
// see description of `expression` field here:
// https://www.commonwl.org/v1.0/Workflow.html#ExpressionTool
var ok bool
tool.ExpressionResult, ok = result.(map[string]interface{})
if !ok {
return engine.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID)
err = engine.dispatchTaskJob(tool)
if err != nil {
return engine.errorf("failed to dispatch task job: %v; error: %v", tool.Task.Root.ID, err)
}
engine.infof("end run ExpressionTool: %v", tool.Task.Root.ID)
return nil
Expand Down
25 changes: 4 additions & 21 deletions mariner/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,27 +168,12 @@ func (engine *K8sEngine) s3KeyToLocalPath(key string) string {
return strings.Replace(key, engine.UserID, engineWorkspaceVolumeName, 1)
}

// loads contents of file into the File.Contents field
// #no-fuse - read from s3, not locally
func (engine *K8sEngine) loadContents(f *File) (err error) {

// loadContents downloads contents for a file from the engine's S3 file manager to populate the file contents field.
func (engine *K8sEngine) loadContents(file *File) (err error) {
sess := engine.S3FileManager.newS3Session()
downloader := s3manager.NewDownloader(sess)

// Location field stores full path, no need to handle prefix here
s3Key := strings.TrimPrefix(engine.localPathToS3Key(f.Location), "/")

// Create a buffer to write the S3 Object contents to.
// see: https://stackoverflow.com/questions/41645377/golang-s3-download-to-buffer-using-s3manager-downloader
s3Key := engine.localPathToS3Key(file.Location)
buf := &aws.WriteAtBuffer{}

// read up to 64 KiB from file, as specified in CWL docs
// 1 KiB is 1024 bytes -> 64 KiB is 65536 bytes
//
// S3 sdk supports specifying byte ranges
// in this way: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35

// Write the contents of S3 Object to the buffer
s3Obj := &s3.GetObjectInput{
Bucket: aws.String(engine.S3FileManager.S3BucketName),
Key: aws.String(s3Key),
Expand All @@ -198,9 +183,7 @@ func (engine *K8sEngine) loadContents(f *File) (err error) {
if err != nil {
return fmt.Errorf("failed to download file, %v", err)
}

// populate File.Contents field with contents
f.Contents = string(buf.Bytes())
file.Contents = string(buf.Bytes())
return nil
}

Expand Down
114 changes: 7 additions & 107 deletions mariner/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,42 +219,15 @@ func (tool *Tool) processFileList(l interface{}) ([]*File, error) {
return out, nil
}

// if err and input is not optional, it is a fatal error and the run should fail out
// transformInput parses all input in a workflow from the engine's tool.
func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out interface{}, err error) {
tool.Task.infof("begin transform input: %v", input.ID)

/*
NOTE: presently only context loaded into js vm's here is `self`
Will certainly need to add more context to handle all cases
Definitely, definitely need a generalized method for loading appropriate context at appropriate places
In particular, the `inputs` context is probably going to be needed most commonly

OTHERNOTE: `self` (in js vm) takes on different values in different places, according to cwl docs
see: https://www.commonwl.org/v1.0/Workflow.html#Parameter_references
---
Steps:
1. handle ValueFrom case at stepInput level
- if no ValueFrom specified, assign parameter value to `out` to processed in next step
2. handle ValueFrom case at toolInput level
- initial value is `out` from step 1
*/
localID := lastInPath(input.ID)

// stepInput ValueFrom case
if tool.StepInputMap[localID] != nil {
// no processing needs to happen if the valueFrom field is empty
if tool.StepInputMap[localID].ValueFrom != "" {
// here the valueFrom field is not empty, so we need to handle valueFrom
valueFrom := tool.StepInputMap[localID].ValueFrom
if strings.HasPrefix(valueFrom, "$") {
// valueFrom is an expression that needs to be eval'd

// get a js vm
vm := tool.JSVM.Copy() // #js-runtime

// preprocess struct/array so that fields can be accessed in vm
// Question: how to handle non-array/struct data types?
// --------- no preprocessing should have to happen in this case.
vm := tool.JSVM.Copy()
self, err := tool.loadInputValue(input)
if err != nil {
return nil, tool.Task.errorf("failed to load value: %v", err)
Expand All @@ -263,75 +236,29 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
if err != nil {
return nil, tool.Task.errorf("failed to preprocess context: %v", err)
}

// set `self` variable in vm
if err = vm.Set("self", self); err != nil {
return nil, tool.Task.errorf("failed to set 'self' value in js vm: %v", err)
}

/*
// Troubleshooting js
// note: when accessing object fields using keys must use otto.Run("obj.key"), NOT otto.Get("obj.key")

fmt.Println("self in js:")
jsSelf, err := vm.Get("self")
jsSelfVal, err := jsSelf.Export()
PrintJSON(jsSelfVal)

fmt.Println("Expression:")
PrintJSON(valueFrom)

fmt.Println("Object.keys(self)")
keys, err := vm.Run("Object.keys(self)")
if err != nil {
fmt.Printf("Error evaluating Object.keys(self): %v\n", err)
}
keysVal, err := keys.Export()
PrintJSON(keysVal)
*/

// eval the expression in the vm, capture result in `out`
if out, err = evalExpression(valueFrom, vm); err != nil {
return nil, tool.Task.errorf("failed to eval js expression: %v; error: %v", valueFrom, err)
}
} else {
// valueFrom is not an expression - take raw string/val as value
out = valueFrom
}
}
}

// if this tool is not a step of a parent workflow
// OR
// if this tool is a step of a parent workflow but the valueFrom is empty
if out == nil {
out, err = tool.loadInputValue(input)
if err != nil {
return nil, tool.Task.errorf("failed to load input value: %v", err)
}
if out == nil {
// implies an optional parameter with no value provided and no default value specified
// this input parameter is not used by the tool
tool.Task.infof("optional input with no value or default provided - skipping: %v", input.ID)
return nil, nil
}
}

// if file, need to ensure that all file attributes get populated (e.g., basename)
/*
fixme: handle array of files
Q: what about directories (?)

do this:

switch statement:
case file
case []file

note: check types in the param type list?
vs. checking types of actual values
*/

switch {
case isFile(out):
if out, err = tool.processFile(out); err != nil {
Expand All @@ -342,14 +269,9 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
return nil, tool.Task.errorf("failed to process file list: %v; error: %v", out, err)
}
default:
// fmt.Println("is not a file object")
tool.Task.infof("input is not a file object: %v", input.ID)
}

// ######### Load Secondary Files ############
// ######### ought to encapsulate this to a function
// ######### and call it for inputs and outputs
// ######### since it's basically the same process both times

if len(input.SecondaryFiles) > 0 {
var fileArray []*File
switch {
Expand All @@ -358,48 +280,33 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
case isArrayOfFile(out):
fileArray = out.([]*File)
default:
// this is a fatal error - engine should fail out here
// but don't panic - actually handle this err and fail out gracefully
panic("invalid input: secondary files specified for a non-file input")
return nil, tool.Task.errorf("invalid input: secondary files specified for a non-file input.")
}

for _, entry := range input.SecondaryFiles {
val := entry.Entry
if strings.HasPrefix(val, "$") {
vm := tool.JSVM.Copy()
for _, fileObj := range fileArray {
// preprocess output file object
self, err := preProcessContext(fileObj)
if err != nil {
return nil, tool.Task.errorf("%v", err)
}
// set `self` variable name
// assuming it is okay to use one vm for all evaluations and just reset the `self` variable before each eval
vm.Set("self", self)

// eval js
jsResult, err := evalExpression(val, vm)
if err != nil {
return nil, tool.Task.errorf("%v", err)
}

// retrieve secondaryFile's path (type interface{} with underlying type string)
sFilePath, ok := jsResult.(string)
if !ok {
return nil, tool.Task.errorf("secondaryFile expression did not return string")
}

if exist, _ := engine.fileExists(sFilePath); !exist {
// fatal error
panic("secondary file doesn't exist")
return nil, tool.Task.errorf("secondary file doesn't exist")
}

// get file object for secondaryFile and append it to the input file's SecondaryFiles field
sFileObj := fileObject(sFilePath)
fileObj.SecondaryFiles = append(fileObj.SecondaryFiles, sFileObj)
}
} else {
// follow those two steps indicated at the bottom of the secondaryFiles field description
suffix, carats := trimLeading(val, "^")
for _, fileObj := range fileArray {
engine.loadSFilesFromPattern(tool, fileObj, suffix, carats)
Expand All @@ -415,15 +322,11 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
}
}

// at this point, variable `out` is the transformed input thus far (even if no transformation actually occured)
// so `out` will be what we work with in this next block as an initial value
// tool inputBinding ValueFrom case
if input.Binding != nil && input.Binding.ValueFrom != nil {
valueFrom := input.Binding.ValueFrom.String
if strings.HasPrefix(valueFrom, "$") {
vm := tool.JSVM.Copy() // #js-runtime
vm := tool.JSVM.Copy()
var context interface{}
// fixme: handle array of files
switch out.(type) {
case *File, []*File:
context, err = preProcessContext(out)
Expand All @@ -433,17 +336,14 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
default:
context = out
}

vm.Set("self", context) // NOTE: again, will more than likely need additional context here to cover other cases
vm.Set("self", context)
if out, err = evalExpression(valueFrom, vm); err != nil {
return nil, tool.Task.errorf("failed to eval expression: %v; error: %v", valueFrom, err)
}
} else {
// not an expression, so no eval necessary - take raw value
out = valueFrom
}
}

tool.Task.infof("end transform input: %v", input.ID)
return out, nil
}
Expand Down
Loading