diff --git a/mariner/engine.go b/mariner/engine.go index 5b87e14d..2d584b73 100644 --- a/mariner/engine.go +++ b/mariner/engine.go @@ -5,17 +5,16 @@ import ( "context" "encoding/json" "fmt" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/robertkrimen/otto" cwl "github.com/uc-cdis/cwl.go" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -388,9 +387,7 @@ func (engine *K8sEngine) setupTool(tool *Tool) (err error) { return nil } -// RunTool runs the tool -// If ExpressionTool, passes to appropriate handler to eval the expression -// If CommandLineTool, passes to appropriate handler to create k8s job +// RunTool runs the tool from the engine and passes to the appropriate handler to create a k8s job. func (engine *K8sEngine) runTool(tool *Tool) (err error) { engine.infof("begin run tool: %v", tool.Task.Root.ID) switch class := tool.Task.Root.Class; class { @@ -398,16 +395,14 @@ func (engine *K8sEngine) runTool(tool *Tool) (err error) { if err = engine.runExpressionTool(tool); err != nil { return engine.errorf("failed to run ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) } + if err = engine.listenForDone(tool); err != nil { + return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err) + } case "CommandLineTool": if err = engine.runCommandLineTool(tool); err != nil { return engine.errorf("failed to run CommandLineTool: %v; error: %v", tool.Task.Root.ID, err) } - - // collect resource metrics via k8s api - // NOTE: at present, metrics are NOT collected for expressionTools - // this should be fixed go engine.collectResourceMetrics(tool) - if err = engine.listenForDone(tool); err != nil { return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err) } @@ -453,32 +448,16 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) { return nil } -// #no-fuse - this has to change! -// currently, regrettably, expressiontools run "in the engine", not in separate containers -// need to revisit this in detail -// figure out if expression tools should be dispatched as jobs -// or if it's okay that they run "in the engine" -// probably no actual computation of any kind should run "in the engine" -// so I think the expressiontool should run as a job, just like commandlinetools +// runExpressionTool uses the engine to dispatch a task job for a given tool to evaluate an expression. func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) { engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID) - // note: context has already been loaded - if err = os.Chdir(tool.WorkingDir); err != nil { - return engine.errorf("failed to move to tool working dir: %v; error: %v", tool.Task.Root.ID, err) - } - result, err := evalExpression(tool.Task.Root.Expression, tool.InputsVM) + err = tool.evaluateExpression() if err != nil { - return engine.errorf("failed to eval expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) + return engine.errorf("failed to evaluate expression for tool: %v; error: %v", tool.Task.Root.ID, err) } - os.Chdir("/") // move back (?) to root after tool finishes execution -> or, where should the default directory position be? - - // expression must return a JSON object where the keys are the IDs of the ExpressionTool outputs - // see description of `expression` field here: - // https://www.commonwl.org/v1.0/Workflow.html#ExpressionTool - var ok bool - tool.ExpressionResult, ok = result.(map[string]interface{}) - if !ok { - return engine.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID) + err = engine.dispatchTaskJob(tool) + if err != nil { + return engine.errorf("failed to dispatch task job: %v; error: %v", tool.Task.Root.ID, err) } engine.infof("end run ExpressionTool: %v", tool.Task.Root.ID) return nil diff --git a/mariner/file.go b/mariner/file.go index f1a19bcb..b01f7c9f 100644 --- a/mariner/file.go +++ b/mariner/file.go @@ -168,27 +168,12 @@ func (engine *K8sEngine) s3KeyToLocalPath(key string) string { return strings.Replace(key, engine.UserID, engineWorkspaceVolumeName, 1) } -// loads contents of file into the File.Contents field -// #no-fuse - read from s3, not locally -func (engine *K8sEngine) loadContents(f *File) (err error) { - +// loadContents downloads contents for a file from the engine's S3 file manager to populate the file contents field. +func (engine *K8sEngine) loadContents(file *File) (err error) { sess := engine.S3FileManager.newS3Session() downloader := s3manager.NewDownloader(sess) - - // Location field stores full path, no need to handle prefix here - s3Key := strings.TrimPrefix(engine.localPathToS3Key(f.Location), "/") - - // Create a buffer to write the S3 Object contents to. - // see: https://stackoverflow.com/questions/41645377/golang-s3-download-to-buffer-using-s3manager-downloader + s3Key := engine.localPathToS3Key(file.Location) buf := &aws.WriteAtBuffer{} - - // read up to 64 KiB from file, as specified in CWL docs - // 1 KiB is 1024 bytes -> 64 KiB is 65536 bytes - // - // S3 sdk supports specifying byte ranges - // in this way: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 - - // Write the contents of S3 Object to the buffer s3Obj := &s3.GetObjectInput{ Bucket: aws.String(engine.S3FileManager.S3BucketName), Key: aws.String(s3Key), @@ -198,9 +183,7 @@ func (engine *K8sEngine) loadContents(f *File) (err error) { if err != nil { return fmt.Errorf("failed to download file, %v", err) } - - // populate File.Contents field with contents - f.Contents = string(buf.Bytes()) + file.Contents = string(buf.Bytes()) return nil } diff --git a/mariner/input.go b/mariner/input.go index facfb84f..607082c5 100644 --- a/mariner/input.go +++ b/mariner/input.go @@ -219,42 +219,15 @@ func (tool *Tool) processFileList(l interface{}) ([]*File, error) { return out, nil } -// if err and input is not optional, it is a fatal error and the run should fail out +// transformInput parses all input in a workflow from the engine's tool. func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out interface{}, err error) { tool.Task.infof("begin transform input: %v", input.ID) - - /* - NOTE: presently only context loaded into js vm's here is `self` - Will certainly need to add more context to handle all cases - Definitely, definitely need a generalized method for loading appropriate context at appropriate places - In particular, the `inputs` context is probably going to be needed most commonly - - OTHERNOTE: `self` (in js vm) takes on different values in different places, according to cwl docs - see: https://www.commonwl.org/v1.0/Workflow.html#Parameter_references - --- - Steps: - 1. handle ValueFrom case at stepInput level - - if no ValueFrom specified, assign parameter value to `out` to processed in next step - 2. handle ValueFrom case at toolInput level - - initial value is `out` from step 1 - */ localID := lastInPath(input.ID) - - // stepInput ValueFrom case if tool.StepInputMap[localID] != nil { - // no processing needs to happen if the valueFrom field is empty if tool.StepInputMap[localID].ValueFrom != "" { - // here the valueFrom field is not empty, so we need to handle valueFrom valueFrom := tool.StepInputMap[localID].ValueFrom if strings.HasPrefix(valueFrom, "$") { - // valueFrom is an expression that needs to be eval'd - - // get a js vm - vm := tool.JSVM.Copy() // #js-runtime - - // preprocess struct/array so that fields can be accessed in vm - // Question: how to handle non-array/struct data types? - // --------- no preprocessing should have to happen in this case. + vm := tool.JSVM.Copy() self, err := tool.loadInputValue(input) if err != nil { return nil, tool.Task.errorf("failed to load value: %v", err) @@ -263,75 +236,29 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter if err != nil { return nil, tool.Task.errorf("failed to preprocess context: %v", err) } - - // set `self` variable in vm if err = vm.Set("self", self); err != nil { return nil, tool.Task.errorf("failed to set 'self' value in js vm: %v", err) } - - /* - // Troubleshooting js - // note: when accessing object fields using keys must use otto.Run("obj.key"), NOT otto.Get("obj.key") - - fmt.Println("self in js:") - jsSelf, err := vm.Get("self") - jsSelfVal, err := jsSelf.Export() - PrintJSON(jsSelfVal) - - fmt.Println("Expression:") - PrintJSON(valueFrom) - - fmt.Println("Object.keys(self)") - keys, err := vm.Run("Object.keys(self)") - if err != nil { - fmt.Printf("Error evaluating Object.keys(self): %v\n", err) - } - keysVal, err := keys.Export() - PrintJSON(keysVal) - */ - - // eval the expression in the vm, capture result in `out` if out, err = evalExpression(valueFrom, vm); err != nil { return nil, tool.Task.errorf("failed to eval js expression: %v; error: %v", valueFrom, err) } } else { - // valueFrom is not an expression - take raw string/val as value out = valueFrom } } } - // if this tool is not a step of a parent workflow - // OR - // if this tool is a step of a parent workflow but the valueFrom is empty if out == nil { out, err = tool.loadInputValue(input) if err != nil { return nil, tool.Task.errorf("failed to load input value: %v", err) } if out == nil { - // implies an optional parameter with no value provided and no default value specified - // this input parameter is not used by the tool tool.Task.infof("optional input with no value or default provided - skipping: %v", input.ID) return nil, nil } } - // if file, need to ensure that all file attributes get populated (e.g., basename) - /* - fixme: handle array of files - Q: what about directories (?) - - do this: - - switch statement: - case file - case []file - - note: check types in the param type list? - vs. checking types of actual values - */ - switch { case isFile(out): if out, err = tool.processFile(out); err != nil { @@ -342,14 +269,9 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter return nil, tool.Task.errorf("failed to process file list: %v; error: %v", out, err) } default: - // fmt.Println("is not a file object") + tool.Task.infof("input is not a file object: %v", input.ID) } - // ######### Load Secondary Files ############ - // ######### ought to encapsulate this to a function - // ######### and call it for inputs and outputs - // ######### since it's basically the same process both times - if len(input.SecondaryFiles) > 0 { var fileArray []*File switch { @@ -358,48 +280,33 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter case isArrayOfFile(out): fileArray = out.([]*File) default: - // this is a fatal error - engine should fail out here - // but don't panic - actually handle this err and fail out gracefully - panic("invalid input: secondary files specified for a non-file input") + return nil, tool.Task.errorf("invalid input: secondary files specified for a non-file input.") } - for _, entry := range input.SecondaryFiles { val := entry.Entry if strings.HasPrefix(val, "$") { vm := tool.JSVM.Copy() for _, fileObj := range fileArray { - // preprocess output file object self, err := preProcessContext(fileObj) if err != nil { return nil, tool.Task.errorf("%v", err) } - // set `self` variable name - // assuming it is okay to use one vm for all evaluations and just reset the `self` variable before each eval vm.Set("self", self) - - // eval js jsResult, err := evalExpression(val, vm) if err != nil { return nil, tool.Task.errorf("%v", err) } - - // retrieve secondaryFile's path (type interface{} with underlying type string) sFilePath, ok := jsResult.(string) if !ok { return nil, tool.Task.errorf("secondaryFile expression did not return string") } - if exist, _ := engine.fileExists(sFilePath); !exist { - // fatal error - panic("secondary file doesn't exist") + return nil, tool.Task.errorf("secondary file doesn't exist") } - - // get file object for secondaryFile and append it to the input file's SecondaryFiles field sFileObj := fileObject(sFilePath) fileObj.SecondaryFiles = append(fileObj.SecondaryFiles, sFileObj) } } else { - // follow those two steps indicated at the bottom of the secondaryFiles field description suffix, carats := trimLeading(val, "^") for _, fileObj := range fileArray { engine.loadSFilesFromPattern(tool, fileObj, suffix, carats) @@ -415,15 +322,11 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter } } - // at this point, variable `out` is the transformed input thus far (even if no transformation actually occured) - // so `out` will be what we work with in this next block as an initial value - // tool inputBinding ValueFrom case if input.Binding != nil && input.Binding.ValueFrom != nil { valueFrom := input.Binding.ValueFrom.String if strings.HasPrefix(valueFrom, "$") { - vm := tool.JSVM.Copy() // #js-runtime + vm := tool.JSVM.Copy() var context interface{} - // fixme: handle array of files switch out.(type) { case *File, []*File: context, err = preProcessContext(out) @@ -433,17 +336,14 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter default: context = out } - - vm.Set("self", context) // NOTE: again, will more than likely need additional context here to cover other cases + vm.Set("self", context) if out, err = evalExpression(valueFrom, vm); err != nil { return nil, tool.Task.errorf("failed to eval expression: %v; error: %v", valueFrom, err) } } else { - // not an expression, so no eval necessary - take raw value out = valueFrom } } - tool.Task.infof("end transform input: %v", input.ID) return out, nil } diff --git a/mariner/js.go b/mariner/js.go index 4b3b6cc5..d8a45ab7 100644 --- a/mariner/js.go +++ b/mariner/js.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "io" + "os" + "os/exec" "strings" "github.com/robertkrimen/otto" @@ -14,6 +16,32 @@ import ( // EvalExpression evals a single expression (something like $(...) or ${...}) // resolveExpressions processes a string which may contain several embedded expressions, each wrapped in their own $()/${} wrapper +// evaluateExpression evaluates the expression from the tool in its virtual machine. +func (tool *Tool) evaluateExpression() (err error) { + tool.Task.infof("begin evaluate expression") + if err = os.MkdirAll(tool.WorkingDir, os.ModeDir); err != nil { + return tool.Task.errorf("failed to make ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) + } + if err = os.Chdir(tool.WorkingDir); err != nil { + return tool.Task.errorf("failed to move to ExpressionTool working dir: %v; error: %v", tool.Task.Root.ID, err) + } + result, err := evalExpression(tool.Task.Root.Expression, tool.InputsVM) + if err != nil { + return tool.Task.errorf("failed to evaluate expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err) + } + os.Chdir("/") + var ok bool + tool.ExpressionResult, ok = result.(map[string]interface{}) + if !ok { + return tool.Task.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID) + } + cmdPath := tool.WorkingDir + "expression.txt" + cmd := []string{"touch", cmdPath} + tool.Command = exec.Command(cmd[0], cmd[1:]...) + tool.Task.infof("end evaluate expression") + return nil +} + // NOTE: make uniform either UpperCase, or camelCase for naming functions // ----- none of these names really need to be exported, since they get called within the `mariner` package diff --git a/mariner/k8s.go b/mariner/k8s.go index 58824e08..1bdd4ea7 100644 --- a/mariner/k8s.go +++ b/mariner/k8s.go @@ -194,10 +194,7 @@ func (engine *K8sEngine) s3SidecarContainer(tool *Tool) (container *k8sv1.Contai return container } -// FIXME - TODO - insert some error/warning handling here -// in case errors/warnings creating the container as specified in the cwl -// additionally, add logic to check if the tool has specified each field -// if a field is not specified, the spec should be filled out using values from the mariner-config +// taskContainer sets up and returns a k8s container for the tool task. func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { tool.Task.infof("begin load main container spec") conf := Config.Containers.Task @@ -205,39 +202,23 @@ func (tool *Tool) taskContainer() (container *k8sv1.Container, err error) { container.Name = conf.Name container.VolumeMounts = volumeMounts(marinerTask) container.ImagePullPolicy = conf.pullPolicy() - container.Image = tool.dockerImage() tool.Task.Log.ContainerImage = container.Image - if container.Resources, err = tool.resourceReqs(); err != nil { return nil, tool.Task.errorf("failed to load cpu/mem info: %v", err) } - - // if not specified use config - container.Command = []string{tool.cltBash()} // fixme - please - - container.Args = tool.cltArgs() // fixme - make string constant or something - + container.Args = tool.containerArgs() + container.Command = []string{tool.cltBash()} if container.Env, err = tool.env(); err != nil { return nil, tool.Task.errorf("failed to load env info: %v", err) } - tool.Task.infof("end load main container spec") return container, nil } -// wait for sidecar to setup -// in particular wait until run.sh exists (run.sh is the command for the Tool) -// as soon as run.sh exists, run this script -// HERE TODO - put this in a bash script -// actually don't, because the CLT runs in its own container -// - won't have the mariner repo, and we shouldn't clone it in there -// so, just make this string a constant or something in the config file -// TOOL_WORKING_DIR is an envVar - no need to inject from go vars here -// Q: how to handle case of different possible bash, depending on CLT image specified in CWL? -// fixme -func (tool *Tool) cltArgs() []string { - tool.Task.infof("begin load CommandLineTool container args") +// containerArgs creates the necessary command arguments in a tool container for sidecar. +func (tool *Tool) containerArgs() []string { + tool.Task.infof("begin load container args") args := []string{ "-c", fmt.Sprintf(` @@ -252,26 +233,7 @@ func (tool *Tool) cltArgs() []string { touch %vdone `, tool.WorkingDir, tool.WorkingDir, tool.WorkingDir, tool.cltBash(), tool.WorkingDir, tool.WorkingDir), } - - // for debugging - /* - args := []string{ - "-c", - fmt.Sprintf(` - while [[ ! -f %vrun.sh ]]; do - echo "Waiting for sidecar to finish setting up.."; - sleep 5 - done - echo "side done setting up" - echo "staying alive" - while true; do - : - done - `, tool.WorkingDir), - } - */ - - tool.Task.infof("end load CommandLineTool container args") + tool.Task.infof("end load container args") return args } diff --git a/sidecar/sidecar.go b/sidecar/sidecar.go index 4c0b5596..edd633f4 100644 --- a/sidecar/sidecar.go +++ b/sidecar/sidecar.go @@ -207,9 +207,8 @@ func (fm *S3FileManager) waitForTaskToFinish() error { return nil } -// 5. upload this task's output to s3 +// uploadOutputFiles utilizes a file manager to upload output files for a task. func (fm *S3FileManager) uploadOutputFiles() (err error) { - // collect paths of all files in the task working directory paths := []string{} _ = filepath.Walk(fm.TaskWorkingDir, func(path string, info os.FileInfo, err error) error { if !info.IsDir() { @@ -217,37 +216,24 @@ func (fm *S3FileManager) uploadOutputFiles() (err error) { } return nil }) - - // note: uploader is safe for concurrent use sess := fm.newS3Session() uploader := s3manager.NewUploader(sess) - var result *s3manager.UploadOutput var wg sync.WaitGroup guard := make(chan struct{}, fm.MaxConcurrent) for _, p := range paths { - // blocks if guard channel is already full to capacity - // proceeds as soon as there is an open slot in the channel guard <- struct{}{} - wg.Add(1) go func(path string) { defer wg.Done() - - // debug - fmt.Println("trying to upload file:", path) - - // open file for reading f, err := os.Open(path) if err != nil { fmt.Println("failed to open file:", path, err) return } - - // upload the file contents result, err = uploader.Upload(&s3manager.UploadInput{ Bucket: aws.String(fm.S3BucketName), - Key: aws.String(fm.s3Key(path)), // HERE! - probably will fail, gotta take away the leading slash + Key: aws.String(strings.TrimPrefix(fm.s3Key(path), "/")), Body: f, }) if err != nil { @@ -255,16 +241,9 @@ func (fm *S3FileManager) uploadOutputFiles() (err error) { return } fmt.Println("file uploaded to location:", result.Location) - - // seems that files are already closed by this point - // close the file - very important if err = f.Close(); err != nil { fmt.Println("failed to close file:", err) - // return } - - // release this spot in the guard channel - // so the next goroutine can run <-guard }(p) } diff --git a/testdata/no_input_test/request_body.json b/testdata/no_input_test/request_body.json index ca632f33..f5c2f4a4 100644 --- a/testdata/no_input_test/request_body.json +++ b/testdata/no_input_test/request_body.json @@ -281,4 +281,4 @@ } ] } -} \ No newline at end of file +}