diff --git a/modules/redpanda/mounts/bootstrap.yaml.tpl b/modules/redpanda/mounts/bootstrap.yaml.tpl index cbc21b7503..78c7053a27 100644 --- a/modules/redpanda/mounts/bootstrap.yaml.tpl +++ b/modules/redpanda/mounts/bootstrap.yaml.tpl @@ -13,4 +13,8 @@ superusers: {{- if .KafkaAPIEnableAuthorization }} kafka_enable_authorization: true -{{- end }} \ No newline at end of file +{{- end }} + +{{- if .AutoCreateTopics }} +auto_create_topics_enabled: true +{{- end }} diff --git a/modules/redpanda/mounts/redpanda.yaml.tpl b/modules/redpanda/mounts/redpanda.yaml.tpl index dc57230d7b..9c7922d75d 100644 --- a/modules/redpanda/mounts/redpanda.yaml.tpl +++ b/modules/redpanda/mounts/redpanda.yaml.tpl @@ -37,4 +37,6 @@ schema_registry: schema_registry_client: brokers: - address: localhost - port: 9093 \ No newline at end of file + port: 9093 + +auto_create_topics_enabled: {{ .AutoCreateTopics }} diff --git a/modules/redpanda/options.go b/modules/redpanda/options.go index f140fc40b4..29a32bdb9b 100644 --- a/modules/redpanda/options.go +++ b/modules/redpanda/options.go @@ -23,6 +23,9 @@ type options struct { // You must use SCRAM-SHA-256 as algorithm when authenticating on the // Kafka API. ServiceAccounts map[string]string + + // AutoCreateTopics is a flag to allow topic auto creation. + AutoCreateTopics bool } func defaultOptions() options { @@ -32,6 +35,7 @@ func defaultOptions() options { KafkaAuthenticationMethod: "none", SchemaRegistryAuthenticationMethod: "none", ServiceAccounts: make(map[string]string, 0), + AutoCreateTopics: false, } } @@ -82,3 +86,10 @@ func WithEnableSchemaRegistryHTTPBasicAuth() Option { o.SchemaRegistryAuthenticationMethod = "http_basic" } } + +// WithAutoCreateTopics enables topic auto creation. +func WithAutoCreateTopics() Option { + return func(o *options) { + o.AutoCreateTopics = true + } +} diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index f429add368..cc373b42cd 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -229,6 +229,7 @@ func createBootstrapConfigFile(settings options) (*os.File, error) { bootstrapTplParams := redpandaBootstrapConfigTplParams{ Superusers: settings.Superusers, KafkaAPIEnableAuthorization: settings.KafkaEnableAuthorization, + AutoCreateTopics: settings.AutoCreateTopics, } tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl) @@ -257,6 +258,7 @@ func createBootstrapConfigFile(settings options) (*os.File, error) { // byte array. func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) { tplParams := redpandaConfigTplParams{ + AutoCreateTopics: settings.AutoCreateTopics, KafkaAPI: redpandaConfigTplParamsKafkaAPI{ AdvertisedHost: hostIP, AdvertisedPort: advertisedKafkaPort, @@ -284,11 +286,13 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) type redpandaBootstrapConfigTplParams struct { Superusers []string KafkaAPIEnableAuthorization bool + AutoCreateTopics bool } type redpandaConfigTplParams struct { - KafkaAPI redpandaConfigTplParamsKafkaAPI - SchemaRegistry redpandaConfigTplParamsSchemaRegistry + KafkaAPI redpandaConfigTplParamsKafkaAPI + SchemaRegistry redpandaConfigTplParamsSchemaRegistry + AutoCreateTopics bool } type redpandaConfigTplParamsKafkaAPI struct { diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index 346876662a..af7a5244ef 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl/scram" ) @@ -35,6 +36,7 @@ func TestRedpanda(t *testing.T) { kgo.SeedBrokers(seedBroker), ) require.NoError(t, err) + defer kafkaCl.Close() kafkaAdmCl := kadm.NewClient(kafkaCl) metadata, err := kafkaAdmCl.Metadata(ctx) @@ -63,6 +65,10 @@ func TestRedpanda(t *testing.T) { require.NoError(t, err) defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) + + // Test produce to unknown topic + results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")}) + require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition) } func TestRedpandaWithAuthentication(t *testing.T) { @@ -173,3 +179,29 @@ func TestRedpandaWithAuthentication(t *testing.T) { resp.Body.Close() } } + +func TestRedpandaProduceWithAutoCreateTopics(t *testing.T) { + ctx := context.Background() + + container, err := RunContainer(ctx, WithAutoCreateTopics()) + require.NoError(t, err) + + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate container: %s", err) + } + }) + + brokers, err := container.KafkaSeedBroker(ctx) + require.NoError(t, err) + + kafkaCl, err := kgo.NewClient( + kgo.SeedBrokers(brokers), + kgo.AllowAutoTopicCreation(), + ) + require.NoError(t, err) + defer kafkaCl.Close() + + results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")}) + require.NoError(t, results.FirstErr()) +}