-
-
Notifications
You must be signed in to change notification settings - Fork 512
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add kafka (KRaft mode only) module (#1610)
* chore: scaffolding for kafka module * chore: add test for reading from a topic * docs: document methods and options * fix: golangci-lint * fix: wrong copy&paste * docs: link to KRaft docs * chore: better wait for KRaft log * chore: support for set cluster ID * chore: configure controller quorum voters based on the networks * chore: validate KRaft version is above 7.0.0 * fix: lint * chore: validate that the image namespace is the official one * docs: improve message for image validation * chore: run go mod tidy * chore: rename file to only exists for testing * chore: use concluent-local Docker image
- Loading branch information
1 parent
d6d5b0b
commit 9a340c3
Showing
12 changed files
with
883 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
# Kafka (KRaft) | ||
|
||
Not available until the next release of testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a> | ||
|
||
## Introduction | ||
|
||
The Testcontainers module for KRaft: [Apache Kafka Without ZooKeeper](https://developer.confluent.io/learn/kraft). | ||
|
||
## Adding this module to your project dependencies | ||
|
||
Please run the following command to add the Kafka module to your Go dependencies: | ||
|
||
``` | ||
go get github.com/testcontainers/testcontainers-go/modules/kafka | ||
``` | ||
|
||
## Usage example | ||
|
||
<!--codeinclude--> | ||
[Creating a Kafka container](../../modules/kafka/examples_test.go) inside_block:runKafkaContainer | ||
<!--/codeinclude--> | ||
|
||
## Module reference | ||
|
||
The Kafka module exposes one entrypoint function to create the Kafka container, and this function receives two parameters: | ||
|
||
```golang | ||
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) | ||
``` | ||
|
||
- `context.Context`, the Go context. | ||
- `testcontainers.ContainerCustomizer`, a variadic argument for passing options. | ||
|
||
### Container Options | ||
|
||
When starting the Kafka container, you can pass options in a variadic way to configure it. | ||
|
||
#### Image | ||
|
||
If you need to set a different Kafka Docker image, you can use `testcontainers.WithImage` with a valid Docker image | ||
for Kafka. E.g. `testcontainers.WithImage("confluentinc/confluent-local:7.5.0")`. | ||
|
||
!!! warning | ||
The minimal required version of Kafka for KRaft mode is `confluentinc/confluent-local:7.4.0`. If you are using an image that | ||
is different from the official one, please make sure that it's compatible with KRaft mode, as the module won't check | ||
the version for you. | ||
|
||
#### Init script | ||
|
||
The Kafka container will be started using a custom shell script: | ||
|
||
<!--codeinclude--> | ||
[Init script](../../modules/kafka/kafka.go) inside_block:starterScript | ||
<!--/codeinclude--> | ||
|
||
#### Environment variables | ||
|
||
The environment variables that are already set by default are: | ||
|
||
<!--codeinclude--> | ||
[Environment variables](../../modules/kafka/kafka.go) inside_block:envVars | ||
<!--/codeinclude--> | ||
|
||
#### Wait Strategies | ||
|
||
If you need to set a different wait strategy for Kafka, you can use `testcontainers.WithWaitStrategy` with a valid wait strategy | ||
for Kafka. | ||
|
||
!!!info | ||
The default deadline for the wait strategy is 60 seconds. | ||
|
||
At the same time, it's possible to set a wait strategy and a custom deadline with `testcontainers.WithWaitStrategyAndDeadline`. | ||
|
||
#### Docker type modifiers | ||
|
||
If you need an advanced configuration for Kafka, you can leverage the following Docker type modifiers: | ||
|
||
- `testcontainers.WithConfigModifier` | ||
- `testcontainers.WithHostConfigModifier` | ||
- `testcontainers.WithEndpointSettingsModifier` | ||
|
||
Please read the [Create containers: Advanced Settings](../features/creating_container.md#advanced-settings) documentation for more information. | ||
|
||
### Container Methods | ||
|
||
The Kafka container exposes the following methods: | ||
|
||
#### Brokers | ||
|
||
The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containing the host and the random port defined by Kafka's public port (`9093/tcp`). | ||
|
||
<!--codeinclude--> | ||
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers | ||
<!--/codeinclude--> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
include ../../commons-test.mk | ||
|
||
.PHONY: test | ||
test: | ||
$(MAKE) test-kafka |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package kafka | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/IBM/sarama" | ||
) | ||
|
||
// TestKafkaConsumer is a test consumer for Kafka | ||
type TestKafkaConsumer struct { | ||
t *testing.T | ||
ready chan bool | ||
done chan bool | ||
cancel chan bool | ||
message *sarama.ConsumerMessage | ||
} | ||
|
||
func NewTestKafkaConsumer(t *testing.T) (consumer *TestKafkaConsumer, ready <-chan bool, done <-chan bool, cancel func()) { | ||
kc := &TestKafkaConsumer{ | ||
t: t, | ||
ready: make(chan bool, 1), | ||
done: make(chan bool, 1), | ||
cancel: make(chan bool, 1), | ||
} | ||
return kc, kc.ready, kc.done, func() { | ||
kc.cancel <- true | ||
} | ||
} | ||
|
||
func (k *TestKafkaConsumer) Setup(_ sarama.ConsumerGroupSession) error { | ||
return nil | ||
} | ||
|
||
func (k *TestKafkaConsumer) Cleanup(_ sarama.ConsumerGroupSession) error { | ||
return nil | ||
} | ||
|
||
// ConsumeClaim is called by the Kafka client library when a message is received | ||
func (k *TestKafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | ||
k.ready <- true | ||
for { | ||
select { | ||
case message := <-claim.Messages(): | ||
k.message = message | ||
session.MarkMessage(message, "") | ||
k.done <- true | ||
|
||
case <-k.cancel: | ||
return nil | ||
|
||
case <-session.Context().Done(): | ||
return nil | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package kafka_test | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/testcontainers/testcontainers-go" | ||
"github.com/testcontainers/testcontainers-go/modules/kafka" | ||
) | ||
|
||
func ExampleRunContainer() { | ||
// runKafkaContainer { | ||
ctx := context.Background() | ||
|
||
kafkaContainer, err := kafka.RunContainer(ctx, | ||
kafka.WithClusterID("test-cluster"), | ||
testcontainers.WithImage("confluentinc/confluent-local:7.5.0"), | ||
) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
// Clean up the container after | ||
defer func() { | ||
if err := kafkaContainer.Terminate(ctx); err != nil { | ||
panic(err) | ||
} | ||
}() | ||
// } | ||
|
||
state, err := kafkaContainer.State(ctx) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
fmt.Println(kafkaContainer.ClusterID) | ||
fmt.Println(state.Running) | ||
|
||
// Output: | ||
// test-cluster | ||
// true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
module github.com/testcontainers/testcontainers-go/modules/kafka | ||
|
||
go 1.20 | ||
|
||
require ( | ||
github.com/IBM/sarama v1.41.1 | ||
github.com/docker/go-connections v0.4.0 | ||
github.com/testcontainers/testcontainers-go v0.23.0 | ||
golang.org/x/mod v0.12.0 | ||
) | ||
|
||
require ( | ||
dario.cat/mergo v1.0.0 // indirect | ||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect | ||
github.com/Microsoft/go-winio v0.6.1 // indirect | ||
github.com/Microsoft/hcsshim v0.11.0 // indirect | ||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect | ||
github.com/containerd/containerd v1.7.6 // indirect | ||
github.com/cpuguy83/dockercfg v0.3.1 // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/docker/distribution v2.8.2+incompatible // indirect | ||
github.com/docker/docker v24.0.6+incompatible // indirect | ||
github.com/docker/go-units v0.5.0 // indirect | ||
github.com/eapache/go-resiliency v1.4.0 // indirect | ||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect | ||
github.com/eapache/queue v1.1.0 // indirect | ||
github.com/gogo/protobuf v1.3.2 // indirect | ||
github.com/golang/protobuf v1.5.3 // indirect | ||
github.com/golang/snappy v0.0.4 // indirect | ||
github.com/google/uuid v1.3.1 // indirect | ||
github.com/hashicorp/errwrap v1.1.0 // indirect | ||
github.com/hashicorp/go-multierror v1.1.1 // indirect | ||
github.com/hashicorp/go-uuid v1.0.3 // indirect | ||
github.com/jcmturner/aescts/v2 v2.0.0 // indirect | ||
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect | ||
github.com/jcmturner/gofork v1.7.6 // indirect | ||
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect | ||
github.com/jcmturner/rpc/v2 v2.0.3 // indirect | ||
github.com/klauspost/compress v1.16.7 // indirect | ||
github.com/magiconair/properties v1.8.7 // indirect | ||
github.com/moby/patternmatcher v0.5.0 // indirect | ||
github.com/moby/sys/sequential v0.5.0 // indirect | ||
github.com/moby/term v0.5.0 // indirect | ||
github.com/morikuni/aec v1.0.0 // indirect | ||
github.com/opencontainers/go-digest v1.0.0 // indirect | ||
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect | ||
github.com/opencontainers/runc v1.1.5 // indirect | ||
github.com/pierrec/lz4/v4 v4.1.18 // indirect | ||
github.com/pkg/errors v0.9.1 // indirect | ||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect | ||
github.com/sirupsen/logrus v1.9.0 // indirect | ||
golang.org/x/crypto v0.12.0 // indirect | ||
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect | ||
golang.org/x/net v0.14.0 // indirect | ||
golang.org/x/sys v0.11.0 // indirect | ||
golang.org/x/tools v0.7.0 // indirect | ||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect | ||
google.golang.org/grpc v1.57.0 // indirect | ||
google.golang.org/protobuf v1.30.0 // indirect | ||
) | ||
|
||
replace github.com/testcontainers/testcontainers-go => ../.. |
Oops, something went wrong.