Skip to content

Commit

Permalink
Adding a test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Dec 14, 2023
1 parent 4f2c5e8 commit dd49113
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
5 changes: 5 additions & 0 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"os"
"strings"

"github.com/artie-labs/transfer/lib/stringutil"

Expand Down Expand Up @@ -48,6 +49,10 @@ type Kafka struct {
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`
}

func (k *Kafka) BootstrapServers() []string {
return strings.Split(k.BootstrapServer, ",")
}

type S3Settings struct {
OptionalPrefix string `yaml:"optionalPrefix"`
Bucket string `yaml:"bucket"`
Expand Down
26 changes: 26 additions & 0 deletions lib/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,32 @@ func TestBigQuery_DSN(t *testing.T) {
assert.Equal(t, "bigquery://project/eu/dataset", b.DSN())
}

func TestKafka_BootstrapServers(t *testing.T) {
type _tc struct {
bootstrapServerString string
expectedBootstrapServers []string
}

tcs := []_tc{
{
bootstrapServerString: "localhost:9092",
expectedBootstrapServers: []string{"localhost:9092"},
},
{
bootstrapServerString: "a:9092,b:9093,c:9094",
expectedBootstrapServers: []string{"a:9092", "b:9093", "c:9094"},
},
}

for idx, tc := range tcs {
k := Kafka{
BootstrapServer: tc.bootstrapServerString,
}

assert.Equal(t, tc.expectedBootstrapServers, k.BootstrapServers(), idx)
}
}

func TestKafka_String(t *testing.T) {
k := Kafka{
BootstrapServer: "server",
Expand Down
5 changes: 1 addition & 4 deletions processes/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package consumer
import (
"context"
"crypto/tls"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -105,11 +104,9 @@ func StartConsumer(ctx context.Context) {
GroupID: settings.Config.Kafka.GroupID,
Dialer: dialer,
Topic: topic,
Brokers: settings.Config.Kafka.BootstrapServers(),
}
var brokers []string
brokers = append(brokers, strings.Split(settings.Config.Kafka.BootstrapServer, ",")...)

kafkaCfg.Brokers = brokers
kafkaConsumer := kafka.NewReader(kafkaCfg)
topicToConsumer.Add(topic, kafkaConsumer)
for {
Expand Down

0 comments on commit dd49113

Please sign in to comment.