Skip to content

Commit

Permalink
Implement support for PNS (Process Namespace Sharing) executor
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Apr 10, 2019
1 parent b4edfd3 commit b1e77fe
Show file tree
Hide file tree
Showing 64 changed files with 1,946 additions and 729 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 2.3.0 (Not Yet Released)

### Deprecation Notice
The workflow-controller-configmap introduces a new config field, `executor`, which is a container
spec and provides controls over the executor sidecar container (i.e. `init`/`wait`). The fields
`executorImage`, `executorResources`, and `executorImagePullPolicy` are deprecated and will be
removed in a future release.

## 2.2.1 (2018-10-18)

### Changelog since v2.2.0
Expand Down
27 changes: 24 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 12 additions & 13 deletions cmd/argoexec/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ import (
"github.com/spf13/cobra"
)

func init() {
RootCmd.AddCommand(initCmd)
}

var initCmd = &cobra.Command{
Use: "init",
Short: "Load artifacts",
Run: func(cmd *cobra.Command, args []string) {
err := loadArtifacts()
if err != nil {
log.Fatalf("%+v", err)
}
},
func NewInitCommand() *cobra.Command {
var command = cobra.Command{
Use: "init",
Short: "Load artifacts",
Run: func(cmd *cobra.Command, args []string) {
err := loadArtifacts()
if err != nil {
log.Fatalf("%+v", err)
}
},
}
return &command
}

func loadArtifacts() error {
Expand Down
33 changes: 16 additions & 17 deletions cmd/argoexec/commands/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,22 @@ import (
"github.com/spf13/cobra"
)

func init() {
RootCmd.AddCommand(resourceCmd)
}

var resourceCmd = &cobra.Command{
Use: "resource (get|create|apply|delete) MANIFEST",
Short: "update a resource and wait for resource conditions",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
err := execResource(args[0])
if err != nil {
log.Fatalf("%+v", err)
}
},
func NewResourceCommand() *cobra.Command {
var command = cobra.Command{
Use: "resource (get|create|apply|delete) MANIFEST",
Short: "update a resource and wait for resource conditions",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
err := execResource(args[0])
if err != nil {
log.Fatalf("%+v", err)
}
},
}
return &command
}

func execResource(action string) error {
Expand Down
109 changes: 55 additions & 54 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package commands

import (
"encoding/json"
"os"

"github.com/argoproj/pkg/kube/cli"
"github.com/ghodss/yaml"
"github.com/argoproj/pkg/cli"
kubecli "github.com/argoproj/pkg/kube/cli"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/argoproj/argo/workflow/executor/docker"
"github.com/argoproj/argo/workflow/executor/k8sapi"
"github.com/argoproj/argo/workflow/executor/kubelet"
"github.com/argoproj/argo/workflow/executor/pns"
)

const (
Expand All @@ -25,83 +27,82 @@ const (
)

var (
// GlobalArgs hold global CLI flags
GlobalArgs globalFlags

clientConfig clientcmd.ClientConfig
)

type globalFlags struct {
clientConfig clientcmd.ClientConfig
logLevel string // --loglevel
glogLevel int // --gloglevel
podAnnotationsPath string // --pod-annotations
}
)

func init() {
clientConfig = cli.AddKubectlFlagsToCmd(RootCmd)
RootCmd.PersistentFlags().StringVar(&GlobalArgs.podAnnotationsPath, "pod-annotations", common.PodMetadataAnnotationsPath, "Pod annotations file from k8s downward API")
RootCmd.AddCommand(cmd.NewVersionCmd(CLIName))
cobra.OnInitialize(initConfig)
}

// RootCmd is the argo root level command
var RootCmd = &cobra.Command{
Use: CLIName,
Short: "argoexec is the executor sidecar to workflow containers",
Run: func(cmd *cobra.Command, args []string) {
cmd.HelpFunc()(cmd, args)
},
func initConfig() {
cli.SetLogLevel(logLevel)
cli.SetGLogLevel(glogLevel)
}

func initExecutor() *executor.WorkflowExecutor {
podAnnotationsPath := common.PodMetadataAnnotationsPath

// Use the path specified from the flag
if GlobalArgs.podAnnotationsPath != "" {
podAnnotationsPath = GlobalArgs.podAnnotationsPath
func NewRootCommand() *cobra.Command {
var command = cobra.Command{
Use: CLIName,
Short: "argoexec is the executor sidecar to workflow containers",
Run: func(cmd *cobra.Command, args []string) {
cmd.HelpFunc()(cmd, args)
},
}

command.AddCommand(NewInitCommand())
command.AddCommand(NewResourceCommand())
command.AddCommand(NewWaitCommand())
command.AddCommand(cmd.NewVersionCmd(CLIName))

clientConfig = kubecli.AddKubectlFlagsToCmd(&command)
command.PersistentFlags().StringVar(&podAnnotationsPath, "pod-annotations", common.PodMetadataAnnotationsPath, "Pod annotations file from k8s downward API")
command.PersistentFlags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
command.PersistentFlags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")

return &command
}

func initExecutor() *executor.WorkflowExecutor {
config, err := clientConfig.ClientConfig()
if err != nil {
panic(err.Error())
}
checkErr(err)

namespace, _, err := clientConfig.Namespace()
if err != nil {
panic(err.Error())
}
checkErr(err)

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
podName, ok := os.LookupEnv(common.EnvVarPodName)
if !ok {
log.Fatalf("Unable to determine pod name from environment variable %s", common.EnvVarPodName)
}
checkErr(err)

podName, err := os.Hostname()
checkErr(err)

tmpl, err := executor.LoadTemplate(podAnnotationsPath)
checkErr(err)

var cre executor.ContainerRuntimeExecutor
switch os.Getenv(common.EnvVarContainerRuntimeExecutor) {
case common.ContainerRuntimeExecutorK8sAPI:
cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
if err != nil {
panic(err.Error())
}
case common.ContainerRuntimeExecutorKubelet:
cre, err = kubelet.NewKubeletExecutor()
if err != nil {
panic(err.Error())
}
case common.ContainerRuntimeExecutorPNS:
cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
default:
cre, err = docker.NewDockerExecutor()
if err != nil {
panic(err.Error())
}
}
wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre)
err = wfExecutor.LoadTemplate()
if err != nil {
panic(err.Error())
}
checkErr(err)

yamlBytes, _ := yaml.Marshal(&wfExecutor.Template)
wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
yamlBytes, _ := json.Marshal(&wfExecutor.Template)
vers := argo.GetVersion()
log.Infof("Executor (version: %s, build_date: %s) initialized with template:\n%s", vers, vers.BuildDate, string(yamlBytes))
return &wfExecutor
}

// checkErr is a convenience function to panic upon error
func checkErr(err error) {
if err != nil {
panic(err.Error())
}
}
34 changes: 19 additions & 15 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@ import (
"github.com/spf13/cobra"
)

func init() {
RootCmd.AddCommand(waitCmd)
}

var waitCmd = &cobra.Command{
Use: "wait",
Short: "wait for main container to finish and save artifacts",
Run: func(cmd *cobra.Command, args []string) {
err := waitContainer()
if err != nil {
log.Fatalf("%+v", err)
}
},
func NewWaitCommand() *cobra.Command {
var command = cobra.Command{
Use: "wait",
Short: "wait for main container to finish and save artifacts",
Run: func(cmd *cobra.Command, args []string) {
err := waitContainer()
if err != nil {
log.Fatalf("%+v", err)
}
},
}
return &command
}

func waitContainer() error {
Expand All @@ -29,11 +28,16 @@ func waitContainer() error {
defer stats.LogStats()
stats.StartStatsTicker(5 * time.Minute)

// Wait for main container to complete and kill sidecars
// Wait for main container to complete
err := wfExecutor.Wait()
if err != nil {
wfExecutor.AddError(err)
// do not return here so we can still try to save outputs
// do not return here so we can still try to kill sidecars & save outputs
}
err = wfExecutor.KillSidecars()
if err != nil {
wfExecutor.AddError(err)
// do not return here so we can still try save outputs
}
logArt, err := wfExecutor.SaveLogs()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/argoexec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func main() {
if err := commands.RootCmd.Execute(); err != nil {
if err := commands.NewRootCommand().Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
Expand Down
Loading

0 comments on commit b1e77fe

Please sign in to comment.