From 77da736d5ac881b66be2489a8ef83c4fd7d0c963 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Mon, 17 Jun 2024 13:43:22 +0200 Subject: [PATCH] fix: proper synchronisation for start/stop log production (#2576) * fix: proper synchronisation for start/stop log production * fix: protect channel close * fix: properly close the log channel * chore: do not close the channel --- docker.go | 36 +++++++++-------------------- logconsumer_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/docker.go b/docker.go index 820e9b4cba..ac8bad2361 100644 --- a/docker.go +++ b/docker.go @@ -78,11 +78,10 @@ type DockerContainer struct { // logProductionWaitGroup is used to signal when the log production has stopped. // This allows stopLogProduction to safely set logProductionStop to nil. + // See simplification in https://go.dev/play/p/x0pOElF2Vjf logProductionWaitGroup sync.WaitGroup - // logProductionMutex protects logProductionStop channel so it can be started again. - logProductionMutex sync.Mutex - logProductionStop chan struct{} + logProductionStop chan struct{} logProductionTimeout *time.Duration logger Logging @@ -697,17 +696,8 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu // Use functional option WithLogProductionTimeout() to override default timeout. If it's // lower than 5s and greater than 60s it will be set to 5s or 60s respectively. func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogProductionOption) error { - { - c.logProductionMutex.Lock() - defer c.logProductionMutex.Unlock() - - if c.logProductionStop != nil { - return errors.New("log production already started") - } - - c.logProductionStop = make(chan struct{}) - c.logProductionWaitGroup.Add(1) - } + c.logProductionStop = make(chan struct{}, 1) // buffered channel to avoid blocking + c.logProductionWaitGroup.Add(1) for _, opt := range opts { opt(c) @@ -828,18 +818,12 @@ func (c *DockerContainer) StopLogProducer() error { // stopLogProduction will stop the concurrent process that is reading logs // and sending them to each added LogConsumer func (c *DockerContainer) stopLogProduction() error { - // TODO: Remove locking and wait group once StartLogProducer and StopLogProducer - // have been removed and hence logging can only be started / stopped once. - c.logProductionMutex.Lock() - defer c.logProductionMutex.Unlock() - if c.logProductionStop != nil { - close(c.logProductionStop) - c.logProductionWaitGroup.Wait() - // Set c.logProductionStop to nil so that it can be started again. - c.logProductionStop = nil - return <-c.logProductionError - } - return nil + // signal the log production to stop + c.logProductionStop <- struct{}{} + + c.logProductionWaitGroup.Wait() + + return <-c.logProductionError } // GetLogProductionErrorChannel exposes the only way for the consumer diff --git a/logconsumer_test.go b/logconsumer_test.go index 192a00f954..d63e97e2df 100644 --- a/logconsumer_test.go +++ b/logconsumer_test.go @@ -638,3 +638,58 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) { // the multiple containers. assert.False(t, strings.Contains(actual, logStoppedForOutOfSyncMessage)) } + +type FooLogConsumer struct { + LogChannel chan string +} + +func (c FooLogConsumer) Accept(rawLog Log) { + log := string(rawLog.Content) + c.LogChannel <- log +} + +func NewFooLogConsumer() *FooLogConsumer { + return &FooLogConsumer{ + LogChannel: make(chan string), + } +} + +func TestRestartContainerWithLogConsumer(t *testing.T) { + logConsumer := NewFooLogConsumer() + + ctx := context.Background() + container, err := GenericContainer(ctx, GenericContainerRequest{ + ContainerRequest: ContainerRequest{ + Image: "hello-world", + AlwaysPullImage: true, + LogConsumerCfg: &LogConsumerConfig{ + Consumers: []LogConsumer{logConsumer}, + }, + }, + Started: false, + }) + if err != nil { + t.Fatalf("Cant create container: %s", err.Error()) + } + + err = container.Start(ctx) + if err != nil { + t.Fatalf("Cant start container: %s", err.Error()) + } + + d := 30 * time.Second + err = container.Stop(ctx, &d) + if err != nil { + t.Fatalf("Cant stop container: %s", err.Error()) + } + err = container.Start(ctx) + if err != nil { + t.Fatalf("Cant start container: %s", err.Error()) + } + + for s := range logConsumer.LogChannel { + if strings.Contains(s, "Hello from Docker!") { + break + } + } +}