Skip to content

Commit

Permalink
Merge branch 'main' into fix/compose-containers
Browse files Browse the repository at this point in the history
* main:
  fix!: data races (#2843)
  fix: mongodb replicaset should work with auth (#2847)
  chore: use require.(No)Error(t,err) instead of t.Fatal(err) (#2851)
  fix: simplify fully-qualified image names (#2846)
  • Loading branch information
mdelapenya committed Oct 28, 2024
2 parents 30ba8c3 + 032a69f commit d5a1383
Show file tree
Hide file tree
Showing 95 changed files with 635 additions and 682 deletions.
3 changes: 2 additions & 1 deletion commons-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ test-%: $(GOBIN)/gotestsum
-- \
-v \
-coverprofile=coverage.out \
-timeout=30m
-timeout=30m \
-race

.PHONY: tools
tools:
Expand Down
6 changes: 3 additions & 3 deletions container_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
)

func TestContainerFileValidation(t *testing.T) {
Expand All @@ -17,9 +19,7 @@ func TestContainerFileValidation(t *testing.T) {
}

f, err := os.Open(filepath.Join(".", "testdata", "hello.sh"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

testTable := []ContainerFileValidationTestCase{
{
Expand Down
14 changes: 7 additions & 7 deletions container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func Test_BuildImageWithContexts(t *testing.T) {
}{
{
Name: "Dockerfile",
Contents: `FROM docker.io/alpine
Contents: `FROM alpine
CMD ["echo", "this is from the archive"]`,
},
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func Test_BuildImageWithContexts(t *testing.T) {
},
{
Name: "Dockerfile",
Contents: `FROM docker.io/alpine
Contents: `FROM alpine
WORKDIR /app
COPY . .
CMD ["sh", "./say_hi.sh"]`,
Expand Down Expand Up @@ -365,7 +365,7 @@ func Test_GetLogsFromFailedContainer(t *testing.T) {
ctx := context.Background()
// directDockerHubReference {
req := testcontainers.ContainerRequest{
Image: "docker.io/alpine",
Image: "alpine",
Cmd: []string{"echo", "-n", "I was not expecting this"},
WaitingFor: wait.ForLog("I was expecting this").WithStartupTimeout(5 * time.Second),
}
Expand All @@ -392,11 +392,11 @@ func Test_GetLogsFromFailedContainer(t *testing.T) {
type dockerImageSubstitutor struct{}

func (s dockerImageSubstitutor) Description() string {
return "DockerImageSubstitutor (prepends docker.io)"
return "DockerImageSubstitutor (prepends registry.hub.docker.com)"
}

func (s dockerImageSubstitutor) Substitute(image string) (string, error) {
return "docker.io/" + image, nil
return "registry.hub.docker.com/library/" + image, nil
}

// }
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestImageSubstitutors(t *testing.T) {
name: "Prepend namespace",
image: "alpine",
substitutors: []testcontainers.ImageSubstitutor{dockerImageSubstitutor{}},
expectedImage: "docker.io/alpine",
expectedImage: "registry.hub.docker.com/library/alpine",
},
{
name: "Substitution with error",
Expand Down Expand Up @@ -554,5 +554,5 @@ func ExampleGenericContainer_withSubstitutors() {

fmt.Println(dockerContainer.Image)

// Output: docker.io/alpine:latest
// Output: registry.hub.docker.com/library/alpine:latest
}
177 changes: 95 additions & 82 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
// Setup the log production context which will be used to stop the log production.
c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx)

go func() {
err := c.logProducer(stdout, stderr)
// Set context cancel cause, if not already set.
c.logProductionCancel(err)
}()
// We capture context cancel function to avoid data race with multiple
// calls to startLogProduction.
go func(cancel context.CancelCauseFunc) {
// Ensure the context is cancelled when log productions completes
// so that GetLogProductionErrorChannel functions correctly.
defer cancel(nil)

c.logProducer(stdout, stderr)
}(c.logProductionCancel)

return nil
}
Expand All @@ -775,40 +779,49 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
// - logProductionCtx is done
// - A fatal error occurs
// - No more logs are available
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error {
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) {
// Clean up idle client connections.
defer c.provider.Close()

// Setup the log options, start from the beginning.
options := container.LogsOptions{
options := &container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
}

for {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()
// Use a separate method so that timeout cancel function is
// called correctly.
for c.copyLogsTimeout(stdout, stderr, options) {
}
}

err := c.copyLogs(timeoutCtx, stdout, stderr, options)
switch {
case err == nil:
// No more logs available.
return nil
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return nil
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}
// copyLogsTimeout copies logs from the container to stdout and stderr with a timeout.
// It returns true if the log production should be retried, false otherwise.
func (c *DockerContainer) copyLogsTimeout(stdout, stderr io.Writer, options *container.LogsOptions) bool {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()

// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
err := c.copyLogs(timeoutCtx, stdout, stderr, *options)
switch {
case err == nil:
// No more logs available.
return false
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return false
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}

// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))

return true
}

// copyLogs copies logs from the container to stdout and stderr.
Expand Down Expand Up @@ -866,10 +879,12 @@ func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
}

errCh := make(chan error, 1)
go func() {
<-c.logProductionCtx.Done()
errCh <- context.Cause(c.logProductionCtx)
}()
go func(ctx context.Context) {
<-ctx.Done()
errCh <- context.Cause(ctx)
close(errCh)
}(c.logProductionCtx)

return errCh
}

Expand Down Expand Up @@ -1011,29 +1026,26 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
// defer the close of the Docker client connection the soonest
defer p.Close()

// Make sure that bridge network exists
// In case it is disabled we will create reaper_default network
if p.DefaultNetwork == "" {
p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client)
if err != nil {
return nil, err
}
var defaultNetwork string
defaultNetwork, err = p.ensureDefaultNetwork(ctx)
if err != nil {
return nil, fmt.Errorf("ensure default network: %w", err)
}

// If default network is not bridge make sure it is attached to the request
// as container won't be attached to it automatically
// in case of Podman the bridge network is called 'podman' as 'bridge' would conflict
if p.DefaultNetwork != p.defaultBridgeNetworkName {
if defaultNetwork != p.defaultBridgeNetworkName {
isAttached := false
for _, net := range req.Networks {
if net == p.DefaultNetwork {
if net == defaultNetwork {
isAttached = true
break
}
}

if !isAttached {
req.Networks = append(req.Networks, p.DefaultNetwork)
req.Networks = append(req.Networks, defaultNetwork)
}
}

Expand Down Expand Up @@ -1478,12 +1490,8 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)
// defer the close of the Docker client connection the soonest
defer p.Close()

// Make sure that bridge network exists
// In case it is disabled we will create reaper_default network
if p.DefaultNetwork == "" {
if p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client); err != nil {
return nil, err
}
if _, err = p.ensureDefaultNetwork(ctx); err != nil {
return nil, fmt.Errorf("ensure default network: %w", err)
}

if req.Labels == nil {
Expand Down Expand Up @@ -1554,14 +1562,12 @@ func (p *DockerProvider) GetNetwork(ctx context.Context, req NetworkRequest) (ne

func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
// Use a default network as defined in the DockerProvider
if p.DefaultNetwork == "" {
var err error
p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client)
if err != nil {
return "", err
}
defaultNetwork, err := p.ensureDefaultNetwork(ctx)
if err != nil {
return "", fmt.Errorf("ensure default network: %w", err)
}
nw, err := p.GetNetwork(ctx, NetworkRequest{Name: p.DefaultNetwork})

nw, err := p.GetNetwork(ctx, NetworkRequest{Name: defaultNetwork})
if err != nil {
return "", err
}
Expand All @@ -1580,43 +1586,50 @@ func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
return ip, nil
}

func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APIClient) (string, error) {
// Get list of available networks
networkResources, err := cli.NetworkList(ctx, network.ListOptions{})
if err != nil {
return "", err
}
// ensureDefaultNetwork ensures that defaultNetwork is set and creates
// it if it does not exist, returning its value.
// It is safe to call this method concurrently.
func (p *DockerProvider) ensureDefaultNetwork(ctx context.Context) (string, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

reaperNetwork := ReaperDefault
if p.defaultNetwork != "" {
// Already set.
return p.defaultNetwork, nil
}

reaperNetworkExists := false
networkResources, err := p.client.NetworkList(ctx, network.ListOptions{})
if err != nil {
return "", fmt.Errorf("network list: %w", err)
}

for _, net := range networkResources {
if net.Name == p.defaultBridgeNetworkName {
return p.defaultBridgeNetworkName, nil
}

if net.Name == reaperNetwork {
reaperNetworkExists = true
switch net.Name {
case p.defaultBridgeNetworkName:
p.defaultNetwork = p.defaultBridgeNetworkName
return p.defaultNetwork, nil
case ReaperDefault:
p.defaultNetwork = ReaperDefault
return p.defaultNetwork, nil
}
}

// Create a bridge network for the container communications
if !reaperNetworkExists {
_, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: GenericLabels(),
})
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", err
}
// Create a bridge network for the container communications.
_, err = p.client.NetworkCreate(ctx, ReaperDefault, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: GenericLabels(),
})
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", fmt.Errorf("network create: %w", err)
}

return reaperNetwork, nil
p.defaultNetwork = ReaperDefault

return p.defaultNetwork, nil
}

// ContainerFromType builds a Docker container struct from the response of the Docker API
Expand Down
Loading

0 comments on commit d5a1383

Please sign in to comment.