Skip to content

Commit

Permalink
fix: conditionally drain docker logs on stop to avoid docker deployer…
Browse files Browse the repository at this point in the history
… to stay in infinite loop (#8838)

* fix: conditionally drain docker logs on stop to avoid docker deployer to stay in infinite loop

* test: integration test for docker deployer interruption
  • Loading branch information
renzodavid9 authored Jun 5, 2023
1 parent d219498 commit 97de7db
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 25 deletions.
40 changes: 40 additions & 0 deletions integration/dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,46 @@ func TestDevGracefulCancel(t *testing.T) {
}
}

func TestDevCancelWithDockerDeployer(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("graceful cancel doesn't work on windows")
}

tests := []struct {
description string
dir string
containers []string
}{
{
description: "interrupt dev loop in Docker deployer",
dir: "testdata/docker-deploy",
containers: []string{"ernie", "bert"},
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
MarkIntegrationTest(t, CanRunWithoutGcp)
p, err := skaffold.Dev().InDir(test.dir).StartWithProcess(t)
if err != nil {
t.Fatalf("error starting skaffold dev process")
}

if err = waitForContainersRunning(t, test.containers...); err != nil {
t.Fatalf("failed waiting for containers: %v", err)
}

p.Signal(syscall.SIGINT)

state, _ := p.Wait()

if state.ExitCode() != 0 {
t.Fail()
}
})
}
}

func TestDevAPIBuildTrigger(t *testing.T) {
MarkIntegrationTest(t, CanRunWithoutGcp)

Expand Down
14 changes: 14 additions & 0 deletions integration/testdata/docker-deploy/bert/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:1.18 as builder
WORKDIR /code
COPY main.go .
COPY go.mod .
# `skaffold debug` sets SKAFFOLD_GO_GCFLAGS to disable compiler optimizations
ARG SKAFFOLD_GO_GCFLAGS
RUN go build -gcflags="${SKAFFOLD_GO_GCFLAGS}" -trimpath -o /app main.go

FROM alpine:3.10
# Define GOTRACEBACK to mark this container as using the Go language runtime
# for `skaffold debug` (https://skaffold.dev/docs/workflows/debug/).
ENV GOTRACEBACK=single
CMD ["./app"]
COPY --from=builder /app .
3 changes: 3 additions & 0 deletions integration/testdata/docker-deploy/bert/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/GoogleContainerTools/skaffold/examples/docker-deploy/bert

go 1.18
14 changes: 14 additions & 0 deletions integration/testdata/docker-deploy/bert/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
"fmt"
"time"
)

func main() {
for {
fmt.Println("Hey there Bert!")

time.Sleep(time.Second * 2)
}
}
14 changes: 14 additions & 0 deletions integration/testdata/docker-deploy/ernie/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:1.18 as builder
WORKDIR /code
COPY main.go .
COPY go.mod .
# `skaffold debug` sets SKAFFOLD_GO_GCFLAGS to disable compiler optimizations
ARG SKAFFOLD_GO_GCFLAGS
RUN go build -gcflags="${SKAFFOLD_GO_GCFLAGS}" -trimpath -o /app main.go

FROM alpine:3.10
# Define GOTRACEBACK to mark this container as using the Go language runtime
# for `skaffold debug` (https://skaffold.dev/docs/workflows/debug/).
ENV GOTRACEBACK=single
CMD ["./app"]
COPY --from=builder /app .
3 changes: 3 additions & 0 deletions integration/testdata/docker-deploy/ernie/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/GoogleContainerTools/skaffold/examples/docker-deploy/ernie

go 1.18
14 changes: 14 additions & 0 deletions integration/testdata/docker-deploy/ernie/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import (
"fmt"
"time"
)

func main() {
for {
fmt.Println("Hey there Ernie!")

time.Sleep(time.Second * 2)
}
}
13 changes: 13 additions & 0 deletions integration/testdata/docker-deploy/skaffold.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: skaffold/v4beta5
kind: Config
build:
local:
push: false
artifacts:
- image: bert
context: bert
- image: ernie
context: ernie
deploy:
docker:
images: [bert, ernie]
40 changes: 40 additions & 0 deletions integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
"testing"
"time"

"github.com/docker/docker/errdefs"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -461,6 +463,44 @@ func SetupDockerClient(t *testing.T) docker.LocalDaemon {
return client
}

func waitForContainersRunning(t *testing.T, containerNames ...string) error {
t.Helper()

ctx := context.Background()
// Same as waitForPods.
timeout := 5 * time.Minute
interval := 1 * time.Second
client := SetupDockerClient(t)

return wait.Poll(interval, timeout, func() (bool, error) {
containersRunning := 0
for _, cn := range containerNames {
cInfo, err := client.RawClient().ContainerInspect(ctx, cn)
if err != nil && !errdefs.IsNotFound(err) {
return false, err
}

if errdefs.IsNotFound(err) {
return false, nil
}

if cInfo.State.Running {
containersRunning++
}

if cInfo.State.Dead || cInfo.State.Restarting {
return false, fmt.Errorf("container %v is in dead or restarting state", cn)
}
}

if containersRunning == len(containerNames) {
return true, nil
}

return false, nil
})
}

type fakeDockerConfig struct {
kubeContext string
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/actions/docker/exec_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var NewExecEnv = newExecEnv

func newExecEnv(ctx context.Context, cfg dockerutil.Config, labeller *label.DefaultLabeller, resources []*latest.PortForwardResource, network string, envMap map[string]string, acs []latest.Action) (*ExecEnv, error) {
tracker := tracker.NewContainerTracker()
l, err := logger.NewLogger(ctx, tracker, cfg)
l, err := logger.NewLogger(ctx, tracker, cfg, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/deploy/docker/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewDeployer(ctx context.Context, cfg dockerutil.Config, labeller *label.Def
}

tracker := tracker.NewContainerTracker()
l, err := logger.NewLogger(ctx, tracker, cfg)
l, err := logger.NewLogger(ctx, tracker, cfg, true)
if err != nil {
return nil, err
}
Expand Down
52 changes: 30 additions & 22 deletions pkg/skaffold/docker/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ type Logger struct {
colorPicker output.ColorPicker
client docker.LocalDaemon
hadLogsOutput sync.Map
childThreadEmitLogs AtomicBool
muted int32
shouldInterruptLogs bool
// Cancel function to trigger the interruption of the threads emitting container logs.
threadLogsCancel context.CancelFunc
}

type AtomicBool struct{ flag int32 }
Expand All @@ -59,18 +61,16 @@ func (b *AtomicBool) Get() bool {
return atomic.LoadInt32(&(b.flag)) != 0
}

func NewLogger(ctx context.Context, tracker *tracker.ContainerTracker, cfg docker.Config) (*Logger, error) {
func NewLogger(ctx context.Context, tracker *tracker.ContainerTracker, cfg docker.Config, shouldInterruptLogs bool) (*Logger, error) {
cli, err := docker.NewAPIClient(ctx, cfg)
if err != nil {
return nil, err
}
childThreadEmitLogs := AtomicBool{}
childThreadEmitLogs.Set(true)
return &Logger{
tracker: tracker,
client: cli,
colorPicker: output.NewColorPicker(),
childThreadEmitLogs: childThreadEmitLogs,
shouldInterruptLogs: shouldInterruptLogs,
}, nil
}

Expand All @@ -87,14 +87,21 @@ func (l *Logger) Start(ctx context.Context, out io.Writer) error {

l.out = out

cancel := func() {}
threadsCtx := ctx
if l.shouldInterruptLogs {
threadsCtx, cancel = context.WithCancel(ctx)
}
l.threadLogsCancel = cancel

go func() {
for {
select {
case <-ctx.Done():
return
case id := <-l.tracker.Notifier():
l.hadLogsOutput.Store(id, false)
go l.streamLogsFromContainer(ctx, id, false)
go l.streamLogsFromContainer(threadsCtx, id)
}
}
}()
Expand Down Expand Up @@ -125,24 +132,21 @@ func NewStatusBackoff() *wait.Backoff {
}
}

func (l *Logger) streamLogsFromContainer(ctx context.Context, id string, force bool) {
func (l *Logger) streamLogsFromContainer(ctx context.Context, id string) {
tr, tw := io.Pipe()
l.wg.Add(1)
defer l.wg.Done()
go func() {
var err error
backoff := NewStatusBackoff()
if waitErr := wait.Poll(backoff.Duration, RetryTimeout, func() (bool, error) {
if waitErr := wait.PollWithContext(ctx, backoff.Duration, RetryTimeout, func(ctx context.Context) (bool, error) {
time.Sleep(backoff.Step())

if !force {
if !l.childThreadEmitLogs.Get() {
return true, nil
}
}

if err = l.client.ContainerLogs(ctx, tw, id); err != nil {
return false, nil
// Only if the error was not a ctx cancel event we want to keep doing the poll.
if ctx.Err() == nil {
return false, nil
}
}
l.hadLogsOutput.Store(id, true)
return true, nil
Expand All @@ -168,15 +172,19 @@ func (l *Logger) Stop() {
if l == nil {
return
}
l.childThreadEmitLogs.Set(false)

l.threadLogsCancel()
l.wg.Wait()

l.hadLogsOutput.Range(func(key, value interface{}) bool {
if !value.(bool) {
l.streamLogsFromContainer(context.TODO(), key.(string), true)
}
return true
})
// If the logs shouldn't be interrupted, we assume we want to drain them.
if !l.shouldInterruptLogs {
l.hadLogsOutput.Range(func(key, value interface{}) bool {
if !value.(bool) {
l.streamLogsFromContainer(context.TODO(), key.(string))
}
return true
})
}

l.tracker.Reset()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/verify/docker/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewVerifier(ctx context.Context, cfg dockerutil.Config, labeller *label.Def
}

tracker := tracker.NewContainerTracker()
l, err := logger.NewLogger(ctx, tracker, cfg)
l, err := logger.NewLogger(ctx, tracker, cfg, false)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 97de7db

Please sign in to comment.