From 9a0f5501d71bb70edff50678f2770f4975a50513 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Mon, 22 Mar 2021 09:02:23 -0700 Subject: [PATCH 1/8] fix(executor): Allow 30s for Docker/PNs polling Signed-off-by: Alex Collins --- workflow/executor/docker/docker.go | 16 +++++++++------- workflow/executor/pns/pns.go | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index 16416f6526cf..460ff74fb4a6 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -185,10 +185,13 @@ func (d *DockerExecutor) GetExitCode(ctx context.Context, containerName string) } func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) error { - err := d.syncContainerIDs(ctx, containerNames) - if err != nil { - return err - } + go func() { + err := d.pollContainerIDs(ctx, containerNames) + if err != nil { + log.WithError(err).Error("failed to poll container IDs") + } + + }() containerIDs, err := d.getContainerIDs(containerNames) if err != nil { return err @@ -197,7 +200,7 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro return err } -func (d *DockerExecutor) syncContainerIDs(ctx context.Context, containerNames []string) error { +func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error { for { select { case <-ctx.Done(): @@ -249,7 +252,7 @@ func (d *DockerExecutor) syncContainerIDs(ctx context.Context, containerNames [] } // sidecars start after the main containers, so we can't just exit once we know about all the main containers, // we need a bit more time - if d.haveContainers(containerNames) && time.Since(started) > 3*time.Second { + if d.haveContainers(containerNames) && time.Since(started) > 30*time.Second { return nil } } @@ -291,7 +294,6 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerNames []string, term _, err = common.RunCommand("docker", killArgs...) if err != nil { log.Warningf("Ignored error from 'docker kill --signal TERM': %s", err) - return nil } waitArgs := append([]string{"wait"}, containerIDs...) waitCmd := exec.Command("docker", waitArgs...) diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index 1f25ef7645c0..ba1a3713e0b5 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -205,7 +205,7 @@ func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []st } // sidecars start after the main containers, so we can't just exit once we know about all the main containers, // we need a bit more time - if p.haveContainerPIDs(containerNames) && time.Since(start) > 5*time.Second { + if p.haveContainerPIDs(containerNames) && time.Since(start) > 30*time.Second { return } time.Sleep(50 * time.Millisecond) From a586ef58724365211f59b0a1c7538e687198affe Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Mon, 22 Mar 2021 09:17:33 -0700 Subject: [PATCH 2/8] ok Signed-off-by: Alex Collins --- workflow/executor/docker/docker.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index 460ff74fb4a6..b09d993b92d4 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -190,8 +190,15 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro if err != nil { log.WithError(err).Error("failed to poll container IDs") } - }() + for !d.haveContainers(containerNames) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + time.Sleep(1 * time.Second) + } + } containerIDs, err := d.getContainerIDs(containerNames) if err != nil { return err From e2eb7a382b5fc19532de2265abb82888a2d06a59 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Mon, 22 Mar 2021 11:57:36 -0700 Subject: [PATCH 3/8] ok Signed-off-by: Alex Collins --- workflow/executor/docker/docker.go | 13 +++++++------ workflow/executor/pns/pns.go | 17 ++++++++++------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index b09d993b92d4..afc164ee13c9 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -257,13 +257,14 @@ func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames [] containerStatus[containerName] = status log.Infof("mapped container name %q to container ID %q (created at %v, status %s)", containerName, containerID, createdAt, status) } - // sidecars start after the main containers, so we can't just exit once we know about all the main containers, - // we need a bit more time - if d.haveContainers(containerNames) && time.Since(started) > 30*time.Second { - return nil - } } - time.Sleep(1 * time.Second) // this is a hard-loop because containers can run very short periods of time + // sidecars start after the main containers, so we can't just exit once we know about all the main containers, + // we need a bit more time + if !d.haveContainers(containerNames) || time.Since(started) < 15*time.Second { + time.Sleep(1 * time.Second) // this is a hard-loop because containers can run very short periods of time + } else { + time.Sleep(10 * time.Second) + } } } diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index ba1a3713e0b5..f0d71845ee69 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -205,10 +205,11 @@ func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []st } // sidecars start after the main containers, so we can't just exit once we know about all the main containers, // we need a bit more time - if p.haveContainerPIDs(containerNames) && time.Since(start) > 30*time.Second { - return + if !p.haveContainerPIDs(containerNames) || time.Since(start) < 5*time.Second { + time.Sleep(50 * time.Millisecond) + } else { + time.Sleep(10 * time.Second) } - time.Sleep(50 * time.Millisecond) } } } @@ -335,15 +336,17 @@ func (p *PNSExecutor) secureRootFiles() error { if prevInfo, ok := p.pidFileHandles[pid]; ok { _ = prevInfo.Close() } - p.pidFileHandles[pid] = fs - log.Infof("secured root for pid %d root: %s", pid, proc.Executable()) + p.mu.Lock() + defer p.mu.Unlock() + if p.pidFileHandles[pid] != fs { + p.pidFileHandles[pid] = fs + log.Infof("secured root for pid %d root: %s", pid, proc.Executable()) + } containerName, err := containerNameForPID(pid) if err != nil { return err } if p.getContainerPID(containerName) != pid { - p.mu.Lock() - defer p.mu.Unlock() p.containerNameToPID[containerName] = pid log.Infof("mapped container name %q to pid %d", containerName, pid) } From ac815e0805249c946bc71c286d577b771c6c2ee8 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Mon, 22 Mar 2021 14:32:28 -0700 Subject: [PATCH 4/8] ok Signed-off-by: Alex Collins --- Makefile | 2 +- workflow/executor/docker/docker.go | 4 ++++ workflow/executor/pns/pns.go | 3 +++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 0c9ecc34ba67..d15c64e29d1a 100644 --- a/Makefile +++ b/Makefile @@ -493,7 +493,7 @@ mysql-cli: kubectl exec -ti `kubectl get pod -l app=mysql -o name|cut -c 5-` -- mysql -u mysql -ppassword argo start-e2e: - $(MAKE) start PROFILE=mysql E2E_EXECUTOR=emissary ALWAYS_OFFLOAD_NODE_STATUS=true AUTH_MODE=client + $(MAKE) start PROFILE=mysql E2E_EXECUTOR=$(E2E_EXECUTOR) ALWAYS_OFFLOAD_NODE_STATUS=true AUTH_MODE=client test-e2e: test-api test-cli test-cron test-executor test-functional diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index afc164ee13c9..63ac6c9bc3e3 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -185,6 +185,8 @@ func (d *DockerExecutor) GetExitCode(ctx context.Context, containerName string) } func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) error { + ctx, cancel := context.WithCancel(ctx) // stop the polling when we are no longer waiting + defer cancel() go func() { err := d.pollContainerIDs(ctx, containerNames) if err != nil { @@ -208,6 +210,8 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro } func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() for { select { case <-ctx.Done(): diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index f0d71845ee69..adff785af29d 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -130,6 +130,9 @@ func (p *PNSExecutor) CopyFile(containerName string, sourcePath string, destPath } func (p *PNSExecutor) Wait(ctx context.Context, containerNames []string) error { + ctx, cancel := context.WithCancel(ctx) // stop the polling when we are no longer waiting + defer cancel() + go p.pollRootProcesses(ctx, containerNames) // Secure a filehandle on our own root. This is because we will chroot back and forth from From a3311e01d25043575b4b5516fecc4b49d4165eb4 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Mon, 22 Mar 2021 15:48:45 -0700 Subject: [PATCH 5/8] ok Signed-off-by: Alex Collins --- workflow/executor/docker/docker.go | 11 ++--------- workflow/executor/pns/pns.go | 2 -- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index 63ac6c9bc3e3..2ca26a8abb47 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -193,13 +193,8 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro log.WithError(err).Error("failed to poll container IDs") } }() - for !d.haveContainers(containerNames) { - select { - case <-ctx.Done(): - return ctx.Err() - default: - time.Sleep(1 * time.Second) - } + for i := 0; !d.haveContainers(containerNames) && i < 5; i++ { + time.Sleep(1 * time.Second) } containerIDs, err := d.getContainerIDs(containerNames) if err != nil { @@ -210,8 +205,6 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) erro } func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames []string) error { - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() for { select { case <-ctx.Done(): diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index adff785af29d..5ff3e59b106b 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -196,8 +196,6 @@ OUTER: // Polling is necessary because it is not possible to use something like fsnotify against procfs. func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []string) { start := time.Now() - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() for { select { case <-ctx.Done(): From 9ab6162bd7e8b13efc9e8dce5354f1916486d1d7 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Tue, 23 Mar 2021 09:25:36 -0700 Subject: [PATCH 6/8] ok Signed-off-by: Alex Collins --- workflow/executor/pns/pns.go | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index 5ff3e59b106b..1f25ef7645c0 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -130,9 +130,6 @@ func (p *PNSExecutor) CopyFile(containerName string, sourcePath string, destPath } func (p *PNSExecutor) Wait(ctx context.Context, containerNames []string) error { - ctx, cancel := context.WithCancel(ctx) // stop the polling when we are no longer waiting - defer cancel() - go p.pollRootProcesses(ctx, containerNames) // Secure a filehandle on our own root. This is because we will chroot back and forth from @@ -196,6 +193,8 @@ OUTER: // Polling is necessary because it is not possible to use something like fsnotify against procfs. func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []string) { start := time.Now() + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() for { select { case <-ctx.Done(): @@ -206,11 +205,10 @@ func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []st } // sidecars start after the main containers, so we can't just exit once we know about all the main containers, // we need a bit more time - if !p.haveContainerPIDs(containerNames) || time.Since(start) < 5*time.Second { - time.Sleep(50 * time.Millisecond) - } else { - time.Sleep(10 * time.Second) + if p.haveContainerPIDs(containerNames) && time.Since(start) > 5*time.Second { + return } + time.Sleep(50 * time.Millisecond) } } } @@ -337,17 +335,15 @@ func (p *PNSExecutor) secureRootFiles() error { if prevInfo, ok := p.pidFileHandles[pid]; ok { _ = prevInfo.Close() } - p.mu.Lock() - defer p.mu.Unlock() - if p.pidFileHandles[pid] != fs { - p.pidFileHandles[pid] = fs - log.Infof("secured root for pid %d root: %s", pid, proc.Executable()) - } + p.pidFileHandles[pid] = fs + log.Infof("secured root for pid %d root: %s", pid, proc.Executable()) containerName, err := containerNameForPID(pid) if err != nil { return err } if p.getContainerPID(containerName) != pid { + p.mu.Lock() + defer p.mu.Unlock() p.containerNameToPID[containerName] = pid log.Infof("mapped container name %q to pid %d", containerName, pid) } From 434f20d07c42a46907a3a37692329ad29cd08ef2 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Wed, 24 Mar 2021 10:06:39 -0700 Subject: [PATCH 7/8] ok Signed-off-by: Alex Collins --- workflow/executor/docker/docker.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/workflow/executor/docker/docker.go b/workflow/executor/docker/docker.go index 2ca26a8abb47..877b3556b2e4 100644 --- a/workflow/executor/docker/docker.go +++ b/workflow/executor/docker/docker.go @@ -185,16 +185,19 @@ func (d *DockerExecutor) GetExitCode(ctx context.Context, containerName string) } func (d *DockerExecutor) Wait(ctx context.Context, containerNames []string) error { - ctx, cancel := context.WithCancel(ctx) // stop the polling when we are no longer waiting - defer cancel() go func() { err := d.pollContainerIDs(ctx, containerNames) if err != nil { log.WithError(err).Error("failed to poll container IDs") } }() - for i := 0; !d.haveContainers(containerNames) && i < 5; i++ { - time.Sleep(1 * time.Second) + for !d.haveContainers(containerNames) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + time.Sleep(1 * time.Second) + } } containerIDs, err := d.getContainerIDs(containerNames) if err != nil { @@ -257,7 +260,7 @@ func (d *DockerExecutor) pollContainerIDs(ctx context.Context, containerNames [] } // sidecars start after the main containers, so we can't just exit once we know about all the main containers, // we need a bit more time - if !d.haveContainers(containerNames) || time.Since(started) < 15*time.Second { + if !d.haveContainers(containerNames) || time.Since(started) < 30*time.Second { time.Sleep(1 * time.Second) // this is a hard-loop because containers can run very short periods of time } else { time.Sleep(10 * time.Second) From 4fcffc75290f7c2ec37151d262d660bdd6032bf0 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Wed, 24 Mar 2021 10:24:53 -0700 Subject: [PATCH 8/8] add fail test Signed-off-by: Alex Collins --- test/e2e/failed_main_test.go | 37 ++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 test/e2e/failed_main_test.go diff --git a/test/e2e/failed_main_test.go b/test/e2e/failed_main_test.go new file mode 100644 index 000000000000..3f41a438b534 --- /dev/null +++ b/test/e2e/failed_main_test.go @@ -0,0 +1,37 @@ +// +build functional + +package e2e + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/argoproj/argo-workflows/v3/test/e2e/fixtures" +) + +type FailedMainSuite struct { + fixtures.E2ESuite +} + +func (s *FailedMainSuite) TestFailedMain() { + s.Given(). + Workflow(` +metadata: + generateName: failed-main- +spec: + entrypoint: main + templates: + - name: main + container: + image: argoproj/argosay:v2 + args: [ exit, "1" ] +`). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeFailed) +} + +func TestFailedMainSuite(t *testing.T) { + suite.Run(t, new(FailedMainSuite)) +}