Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 9, 2024
1 parent eea982d commit 9602c92
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 37 deletions.
86 changes: 86 additions & 0 deletions lib/kafkalib/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package kafkalib

import (
"context"
"crypto/tls"
"fmt"
"time"

awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
"github.com/segmentio/kafka-go/sasl/scram"
)

type Mechanism string

const (
Plain Mechanism = "PLAIN"
ScramSha512 Mechanism = "SCRAM-SHA-512"
AwsMskIam Mechanism = "AWS-MSK-IAM"
)

type Connection struct {
enableAWSMSKIAM bool
disableTLS bool
username string
password string
}

func NewConnection(enableAWSMSKIAM bool, disableTLS bool, username, password string) Connection {
return Connection{
enableAWSMSKIAM: enableAWSMSKIAM,
disableTLS: disableTLS,
username: username,
password: password,
}
}

func (c Connection) Mechanism() Mechanism {
if c.username != "" && c.password != "" {
return ScramSha512
}

if c.enableAWSMSKIAM {
return AwsMskIam
}

return Plain
}

func (c Connection) Dialer(ctx context.Context) (*kafka.Dialer, error) {
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
}

switch c.Mechanism() {
case ScramSha512:
mechanism, err := scram.Mechanism(scram.SHA512, c.username, c.password)
if err != nil {
return nil, fmt.Errorf("failed to create SCRAM mechanism: %w", err)
}

dialer.SASLMechanism = mechanism
if !c.disableTLS {
dialer.TLS = &tls.Config{}
}

case AwsMskIam:
_awsCfg, err := awsCfg.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load aws configuration: %w", err)
}

dialer.SASLMechanism = aws_msk_iam_v2.NewMechanism(_awsCfg)
if !c.disableTLS {
dialer.TLS = &tls.Config{}
}
case Plain:
// No mechanism
default:
return nil, fmt.Errorf("unsupported kafka mechanism: %s", c.Mechanism())
}

return dialer, nil
}
47 changes: 10 additions & 37 deletions processes/consumer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,10 @@ package consumer

import (
"context"
"crypto/tls"
"log/slog"
"sync"
"time"

awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
"github.com/segmentio/kafka-go/sasl/scram"

"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/cdc/format"
"github.com/artie-labs/transfer/lib/config"
Expand All @@ -21,6 +15,7 @@ import (
"github.com/artie-labs/transfer/lib/logger"
"github.com/artie-labs/transfer/lib/telemetry/metrics/base"
"github.com/artie-labs/transfer/models"
"github.com/segmentio/kafka-go"
)

var topicToConsumer *TopicToConsumer
Expand Down Expand Up @@ -49,37 +44,15 @@ func (t *TopicToConsumer) Get(topic string) kafkalib.Consumer {
}

func StartConsumer(ctx context.Context, cfg config.Config, inMemDB *models.DatabaseData, dest destination.Baseline, metricsClient base.Client) {
slog.Info("Starting Kafka consumer...", slog.Any("config", cfg.Kafka))
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
}

// If using AWS MSK IAM, we expect this to be set in the ENV VAR
// (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION, or the AWS Profile should be called default.)
if cfg.Kafka.EnableAWSMSKIAM {
_awsCfg, err := awsCfg.LoadDefaultConfig(ctx)
if err != nil {
logger.Panic("Failed to load aws configuration", slog.Any("err", err))
}

dialer.SASLMechanism = aws_msk_iam_v2.NewMechanism(_awsCfg)
if !cfg.Kafka.DisableTLS {
dialer.TLS = &tls.Config{}
}
}

// If username and password are provided, we'll use SCRAM w/ SHA512.
if cfg.Kafka.Username != "" {
mechanism, err := scram.Mechanism(scram.SHA512, cfg.Kafka.Username, cfg.Kafka.Password)
if err != nil {
logger.Panic("Failed to create SCRAM mechanism", slog.Any("err", err))
}

dialer.SASLMechanism = mechanism
if !cfg.Kafka.DisableTLS {
dialer.TLS = &tls.Config{}
}
kafkaConn := kafkalib.NewConnection(cfg.Kafka.EnableAWSMSKIAM, cfg.Kafka.DisableTLS, cfg.Kafka.Username, cfg.Kafka.Password)
slog.Info("Starting Kafka consumer...",
slog.Any("config", cfg.Kafka),
slog.Any("authMechanism", kafkaConn.Mechanism()),
)

dialer, err := kafkaConn.Dialer(ctx)
if err != nil {
logger.Panic("Failed to create Kafka dialer", slog.Any("err", err))
}

tcFmtMap := NewTcFmtMap()
Expand Down

0 comments on commit 9602c92

Please sign in to comment.