Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(redpanda): Add option to enable topic auto-creation #1360

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion modules/redpanda/mounts/bootstrap.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ superusers:

{{- if .KafkaAPIEnableAuthorization }}
kafka_enable_authorization: true
{{- end }}
{{- end }}

{{- if .AutoCreateTopics }}
auto_create_topics_enabled: true
{{- end }}
4 changes: 3 additions & 1 deletion modules/redpanda/mounts/redpanda.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ schema_registry:
schema_registry_client:
brokers:
- address: localhost
port: 9093
port: 9093

auto_create_topics_enabled: {{ .AutoCreateTopics }}
11 changes: 11 additions & 0 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type options struct {
// You must use SCRAM-SHA-256 as algorithm when authenticating on the
// Kafka API.
ServiceAccounts map[string]string

// AutoCreateTopics is a flag to allow topic auto creation.
AutoCreateTopics bool
}

func defaultOptions() options {
Expand All @@ -32,6 +35,7 @@ func defaultOptions() options {
KafkaAuthenticationMethod: "none",
SchemaRegistryAuthenticationMethod: "none",
ServiceAccounts: make(map[string]string, 0),
AutoCreateTopics: false,
}
}

Expand Down Expand Up @@ -82,3 +86,10 @@ func WithEnableSchemaRegistryHTTPBasicAuth() Option {
o.SchemaRegistryAuthenticationMethod = "http_basic"
}
}

// WithAutoCreateTopics enables topic auto creation.
func WithAutoCreateTopics() Option {
return func(o *options) {
o.AutoCreateTopics = true
}
}
8 changes: 6 additions & 2 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func createBootstrapConfigFile(settings options) (*os.File, error) {
bootstrapTplParams := redpandaBootstrapConfigTplParams{
Superusers: settings.Superusers,
KafkaAPIEnableAuthorization: settings.KafkaEnableAuthorization,
AutoCreateTopics: settings.AutoCreateTopics,
}

tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl)
Expand Down Expand Up @@ -257,6 +258,7 @@ func createBootstrapConfigFile(settings options) (*os.File, error) {
// byte array.
func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) {
tplParams := redpandaConfigTplParams{
AutoCreateTopics: settings.AutoCreateTopics,
KafkaAPI: redpandaConfigTplParamsKafkaAPI{
AdvertisedHost: hostIP,
AdvertisedPort: advertisedKafkaPort,
Expand Down Expand Up @@ -284,11 +286,13 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int)
type redpandaBootstrapConfigTplParams struct {
Superusers []string
KafkaAPIEnableAuthorization bool
AutoCreateTopics bool
}

type redpandaConfigTplParams struct {
KafkaAPI redpandaConfigTplParamsKafkaAPI
SchemaRegistry redpandaConfigTplParamsSchemaRegistry
KafkaAPI redpandaConfigTplParamsKafkaAPI
SchemaRegistry redpandaConfigTplParamsSchemaRegistry
AutoCreateTopics bool
}

type redpandaConfigTplParamsKafkaAPI struct {
Expand Down
32 changes: 32 additions & 0 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/scram"
)
Expand All @@ -35,6 +36,7 @@ func TestRedpanda(t *testing.T) {
kgo.SeedBrokers(seedBroker),
)
require.NoError(t, err)
defer kafkaCl.Close()

kafkaAdmCl := kadm.NewClient(kafkaCl)
metadata, err := kafkaAdmCl.Metadata(ctx)
Expand Down Expand Up @@ -63,6 +65,10 @@ func TestRedpanda(t *testing.T) {
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)

// Test produce to unknown topic
results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")})
require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition)
}

func TestRedpandaWithAuthentication(t *testing.T) {
Expand Down Expand Up @@ -173,3 +179,29 @@ func TestRedpandaWithAuthentication(t *testing.T) {
resp.Body.Close()
}
}

func TestRedpandaProduceWithAutoCreateTopics(t *testing.T) {
ctx := context.Background()

container, err := RunContainer(ctx, WithAutoCreateTopics())
require.NoError(t, err)

t.Cleanup(func() {
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

brokers, err := container.KafkaSeedBroker(ctx)
require.NoError(t, err)

kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(brokers),
kgo.AllowAutoTopicCreation(),
)
require.NoError(t, err)
defer kafkaCl.Close()

results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")})
require.NoError(t, results.FirstErr())
}