Skip to content

Commit

Permalink
Merge branch 'main' into dynamodb-module
Browse files Browse the repository at this point in the history
* main:
  fix(redpanda): wait for (#2794)
  fix(elasticsearch): wait for (#2724)
  • Loading branch information
mdelapenya committed Sep 27, 2024
2 parents 17f234d + 9562594 commit 4fbae32
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 97 deletions.
155 changes: 89 additions & 66 deletions modules/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package elasticsearch

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"os"
Expand All @@ -15,6 +17,7 @@ const (
defaultTCPPort = "9300"
defaultPassword = "changeme"
defaultUsername = "elastic"
defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt"
minimalImageVersion = "7.9.2"
)

Expand All @@ -32,7 +35,7 @@ type ElasticsearchContainer struct {
}

// Deprecated: use Run instead
// RunContainer creates an instance of the Couchbase container type
// RunContainer creates an instance of the Elasticsearch container type
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*ElasticsearchContainer, error) {
return Run(ctx, "docker.elastic.co/elasticsearch/elasticsearch:7.9.2", opts...)
}
Expand All @@ -50,54 +53,41 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
defaultHTTPPort + "/tcp",
defaultTCPPort + "/tcp",
},
// regex that
// matches 8.3 JSON logging with started message and some follow up content within the message field
// matches 8.0 JSON logging with no whitespace between message field and content
// matches 7.x JSON logging with whitespace between message field and content
// matches 6.x text logging with node name in brackets and just a 'started' message till the end of the line
WaitingFor: wait.ForLog(`.*("message":\s?"started(\s|")?.*|]\sstarted\n)`).AsRegexp(),
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
{
// the container needs a post create hook to set the default JVM options in a file
PostCreates: []testcontainers.ContainerHook{},
PostReadies: []testcontainers.ContainerHook{},
},
},
},
Started: true,
}

// Gather all config options (defaults and then apply provided options)
settings := defaultOptions()
options := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(settings)
apply(options)
}
if err := opt.Customize(&req); err != nil {
return nil, err
}
}

// Transfer the certificate settings to the container request
err := configureCertificate(settings, &req)
if err != nil {
return nil, err
}

// Transfer the password settings to the container request
err = configurePassword(settings, &req)
if err != nil {
if err := configurePassword(options, &req); err != nil {
return nil, err
}

if isAtLeastVersion(req.Image, 7) {
req.LifecycleHooks[0].PostCreates = append(req.LifecycleHooks[0].PostCreates, configureJvmOpts)
req.LifecycleHooks = append(req.LifecycleHooks,
testcontainers.ContainerLifecycleHooks{
PostCreates: []testcontainers.ContainerHook{configureJvmOpts},
},
)
}

// Set the default waiting strategy if not already set.
setWaitFor(options, &req.ContainerRequest)

container, err := testcontainers.GenericContainer(ctx, req)
var esContainer *ElasticsearchContainer
if container != nil {
esContainer = &ElasticsearchContainer{Container: container, Settings: *settings}
esContainer = &ElasticsearchContainer{Container: container, Settings: *options}
}
if err != nil {
return esContainer, fmt.Errorf("generic container: %w", err)
Expand All @@ -110,6 +100,61 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return esContainer, nil
}

// certWriter is a helper that writes the details of a CA cert to options.
type certWriter struct {
options *Options
certPool *x509.CertPool
}

// Read reads the CA cert from the reader and appends it to the options.
func (w *certWriter) Read(r io.Reader) error {
buf, err := io.ReadAll(r)
if err != nil {
return fmt.Errorf("read CA cert: %w", err)
}

w.options.CACert = buf
w.certPool.AppendCertsFromPEM(w.options.CACert)

return nil
}

// setWaitFor sets the req.WaitingFor strategy based on settings.
func setWaitFor(options *Options, req *testcontainers.ContainerRequest) {
var strategies []wait.Strategy
if req.WaitingFor != nil {
// Custom waiting strategy, ensure we honour it.
strategies = append(strategies, req.WaitingFor)
}

waitHTTP := wait.ForHTTP("/").WithPort(defaultHTTPPort)
if sslRequired(req) {
waitHTTP = waitHTTP.WithTLS(true).WithAllowInsecure(true)
cw := &certWriter{
options: options,
certPool: x509.NewCertPool(),
}

waitHTTP = waitHTTP.
WithTLS(true, &tls.Config{RootCAs: cw.certPool})

strategies = append(strategies, wait.ForFile(defaultCaCertPath).WithMatcher(cw.Read))
}

if options.Password != "" || options.Username != "" {
waitHTTP = waitHTTP.WithBasicAuth(options.Username, options.Password)
}

strategies = append(strategies, waitHTTP)

if len(strategies) > 1 {
req.WaitingFor = wait.ForAll(strategies...)
return
}

req.WaitingFor = strategies[0]
}

// configureAddress sets the address of the Elasticsearch container.
// If the certificate is set, it will use https as protocol, otherwise http.
func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error {
Expand All @@ -133,50 +178,28 @@ func (c *ElasticsearchContainer) configureAddress(ctx context.Context) error {
return nil
}

// configureCertificate transfers the certificate settings to the container request.
// For that, it defines a post start hook that copies the certificate from the container to the host.
// The certificate is only available since version 8, and will be located in a well-known location.
func configureCertificate(settings *Options, req *testcontainers.GenericContainerRequest) error {
if isAtLeastVersion(req.Image, 8) {
// These configuration keys explicitly disable CA generation.
// If any are set we skip the file retrieval.
configKeys := []string{
"xpack.security.enabled",
"xpack.security.http.ssl.enabled",
"xpack.security.transport.ssl.enabled",
}
for _, configKey := range configKeys {
if value, ok := req.Env[configKey]; ok {
if value == "false" {
return nil
}
// sslRequired returns true if the SSL is required, otherwise false.
func sslRequired(req *testcontainers.ContainerRequest) bool {
if !isAtLeastVersion(req.Image, 8) {
return false
}

// These configuration keys explicitly disable CA generation.
// If any are set we skip the file retrieval.
configKeys := []string{
"xpack.security.enabled",
"xpack.security.http.ssl.enabled",
"xpack.security.transport.ssl.enabled",
}
for _, configKey := range configKeys {
if value, ok := req.Env[configKey]; ok {
if value == "false" {
return false
}
}

// The container needs a post ready hook to copy the certificate from the container to the host.
// This certificate is only available since version 8
req.LifecycleHooks[0].PostReadies = append(req.LifecycleHooks[0].PostReadies,
func(ctx context.Context, container testcontainers.Container) error {
const defaultCaCertPath = "/usr/share/elasticsearch/config/certs/http_ca.crt"

readCloser, err := container.CopyFileFromContainer(ctx, defaultCaCertPath)
if err != nil {
return err
}

// receive the bytes from the default location
certBytes, err := io.ReadAll(readCloser)
if err != nil {
return err
}

settings.CACert = certBytes

return nil
})
}

return nil
return true
}

// configurePassword transfers the password settings to the container request.
Expand Down
1 change: 0 additions & 1 deletion modules/elasticsearch/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Options struct {

func defaultOptions() *Options {
return &Options{
CACert: nil,
Username: defaultUsername,
}
}
Expand Down
Loading

0 comments on commit 4fbae32

Please sign in to comment.