Skip to content

Commit

Permalink
feat(redpanda): add option to enable topic auto-creation
Browse files Browse the repository at this point in the history
The option to enable automatic topic creation is particularly useful for testing
environments, allowing users to produce data into topics without the need to
create them beforehand.

Signed-off-by: Ladislav Macoun <ladislavmacoun@gmail.com>
  • Loading branch information
ladislavmacoun committed Aug 2, 2023
1 parent f5a4a54 commit 6286fa8
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 4 deletions.
6 changes: 5 additions & 1 deletion modules/redpanda/mounts/bootstrap.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ superusers:

{{- if .KafkaAPIEnableAuthorization }}
kafka_enable_authorization: true
{{- end }}
{{- end }}

{{- if .AutoCreateTopicsEnabled }}
auto_create_topics_enabled: true
{{- end }}
4 changes: 3 additions & 1 deletion modules/redpanda/mounts/redpanda.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ schema_registry:
schema_registry_client:
brokers:
- address: localhost
port: 9093
port: 9093

auto_create_topics_enabled: {{ .AutoCreateTopicsEnabled }}
11 changes: 11 additions & 0 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type options struct {
// You must use SCRAM-SHA-256 as algorithm when authenticating on the
// Kafka API.
ServiceAccounts map[string]string

// AutoCreateTopicsEnabled is a flag to allow topic auto creation.
AutoCreateTopicsEnabled bool
}

func defaultOptions() options {
Expand All @@ -32,6 +35,7 @@ func defaultOptions() options {
KafkaAuthenticationMethod: "none",
SchemaRegistryAuthenticationMethod: "none",
ServiceAccounts: make(map[string]string, 0),
AutoCreateTopicsEnabled: false,
}
}

Expand Down Expand Up @@ -82,3 +86,10 @@ func WithEnableSchemaRegistryHTTPBasicAuth() Option {
o.SchemaRegistryAuthenticationMethod = "http_basic"
}
}

// WithAutoCreateTopics enables topic auto creation.
func WithAutoCreateTopics() Option {
return func(o *options) {
o.AutoCreateTopicsEnabled = true
}
}
8 changes: 6 additions & 2 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func createBootstrapConfigFile(settings options) (*os.File, error) {
bootstrapTplParams := redpandaBootstrapConfigTplParams{
Superusers: settings.Superusers,
KafkaAPIEnableAuthorization: settings.KafkaEnableAuthorization,
AutoCreateTopicsEnabled: settings.AutoCreateTopicsEnabled,
}

tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl)
Expand Down Expand Up @@ -257,6 +258,7 @@ func createBootstrapConfigFile(settings options) (*os.File, error) {
// byte array.
func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) {
tplParams := redpandaConfigTplParams{
AutoCreateTopicsEnabled: settings.AutoCreateTopicsEnabled,
KafkaAPI: redpandaConfigTplParamsKafkaAPI{
AdvertisedHost: hostIP,
AdvertisedPort: advertisedKafkaPort,
Expand Down Expand Up @@ -284,11 +286,13 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int)
type redpandaBootstrapConfigTplParams struct {
Superusers []string
KafkaAPIEnableAuthorization bool
AutoCreateTopicsEnabled bool
}

type redpandaConfigTplParams struct {
KafkaAPI redpandaConfigTplParamsKafkaAPI
SchemaRegistry redpandaConfigTplParamsSchemaRegistry
KafkaAPI redpandaConfigTplParamsKafkaAPI
SchemaRegistry redpandaConfigTplParamsSchemaRegistry
AutoCreateTopicsEnabled bool
}

type redpandaConfigTplParamsKafkaAPI struct {
Expand Down
31 changes: 31 additions & 0 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/scram"
)
Expand Down Expand Up @@ -63,6 +64,10 @@ func TestRedpanda(t *testing.T) {
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)

// Test produce to unknown topic
results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")})
require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition)
}

func TestRedpandaWithAuthentication(t *testing.T) {
Expand Down Expand Up @@ -173,3 +178,29 @@ func TestRedpandaWithAuthentication(t *testing.T) {
resp.Body.Close()
}
}

func TestRedpandaProduceWithAutoCreateTopics(t *testing.T) {
ctx := context.Background()

container, err := RunContainer(ctx, WithAutoCreateTopics())
require.NoError(t, err)

t.Cleanup(func() {
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

brokers, err := container.KafkaSeedBroker(ctx)
require.NoError(t, err)

kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(brokers),
kgo.AllowAutoTopicCreation(),
)
require.NoError(t, err)
defer kafkaCl.Close()

results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")})
require.NoError(t, results.FirstErr())
}

0 comments on commit 6286fa8

Please sign in to comment.