Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 9, 2024
1 parent 9602c92 commit fd17710
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions lib/kafkalib/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,38 @@ func (c Connection) Dialer(ctx context.Context) (*kafka.Dialer, error) {

return dialer, nil
}

func (c Connection) Transport() (*kafka.Transport, error) {
transport := &kafka.Transport{
DialTimeout: 10 * time.Second,
}

switch c.Mechanism() {
case ScramSha512:
mechanism, err := scram.Mechanism(scram.SHA512, c.username, c.password)
if err != nil {
return nil, fmt.Errorf("failed to create SCRAM mechanism: %w", err)
}

transport.SASL = mechanism
if !c.disableTLS {
transport.TLS = &tls.Config{}
}
case AwsMskIam:
_awsCfg, err := awsCfg.LoadDefaultConfig(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to load AWS configuration: %w", err)
}

transport.SASL = aws_msk_iam_v2.NewMechanism(_awsCfg)
if !c.disableTLS {
transport.TLS = &tls.Config{}
}
case Plain:
// No mechanism
default:
return nil, fmt.Errorf("unsupported kafka mechanism: %s", c.Mechanism())
}

return transport, nil
}

0 comments on commit fd17710

Please sign in to comment.