From bc8715ed4b5f4ce7c7901acddffdb96d746fa983 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Wed, 23 Aug 2023 10:55:16 +0200 Subject: [PATCH] Handle errors in log producer gracefully If the context is done, we close the log producer. That is not an error, the context cancellation signals that the consumer should stop. If there is a non-context error during the HTTP call or while reading the response, retry the HTTP request in 1 second again. Previously, the error handling was inconsistent: - an error while reading HTTP response headers would retry the HTTP request - but an error while reading the body would just end the log producer With this commit, the error handling should be more consistent. --- docker.go | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/docker.go b/docker.go index 7e084a88ee..98a043ef30 100644 --- a/docker.go +++ b/docker.go @@ -51,7 +51,7 @@ const ( ReaperDefault = "reaper_default" // Default network name when bridge is not available packagePath = "github.com/testcontainers/testcontainers-go" - logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync" + logRestartedForOutOfSyncMessage = "headers out of sync, will retry" ) // DockerContainer represents a container started using Docker @@ -633,25 +633,27 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options) if err != nil { // if we can't get the logs, retry in one second. - c.logger.Printf("cannot get logs for container %q: %v", c.ID, err) if ctx.Err() != nil { // context done. return } + c.logger.Printf("cannot get logs for container %q: %v", c.ID, err) time.Sleep(1 * time.Second) goto BEGIN } defer c.provider.Close() for { - select { - case <-ctx.Done(): + if ctx.Err() != nil { err := r.Close() if err != nil { // we can't close the read closer, this should never happen panic(err) } - return + } + select { + case <-ctx.Done(): + continue default: h := make([]byte, 8) _, err := io.ReadFull(r, h) @@ -662,13 +664,15 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond())) goto BEGIN } - if errors.Is(err, context.DeadlineExceeded) { - // Probably safe to continue here + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // If the outer context is done, loop will exit in the next iteration. continue } - _, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage) + _, _ = fmt.Fprintf(os.Stderr, "read log header: %+v. %s", err, logRestartedForOutOfSyncMessage) // if we would continue here, the next header-read will result into random data... - return + // we need to restart the whole request. + time.Sleep(1 * time.Second) + goto BEGIN } count := binary.BigEndian.Uint32(h[4:]) @@ -690,13 +694,16 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { if err != nil { // TODO: add-logger: use logger to log out this error _, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error()) - if errors.Is(err, context.DeadlineExceeded) { - // Probably safe to continue here + + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // If the outer context is done, loop will exit in the next iteration. continue } - // we can not continue here as the next read most likely will not be the next header - _, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage) - return + // if we would continue here, the next header-read will result into random data... + // we need to restart the whole request. + _, _ = fmt.Fprintf(os.Stderr, "read log message: %+v. %s", err, logRestartedForOutOfSyncMessage) + time.Sleep(1 * time.Second) + goto BEGIN } for _, c := range c.consumers { c.Accept(Log{