How to: Writer + TLS and SASL (kafka.Dialer in Writer.Transport?) #898

Closed
fracasula opened this issue Apr 26, 2022 · 4 comments

@fracasula
Contributor

fracasula commented Apr 26, 2022

Can somebody provide an example of configuration with a writer + SASL and TLS?

This is what I tried so far:

func TestProducer(t *testing.T) {
	pool, err := dockertest.NewPool("")
	require.NoError(t, err)

	kafkaContainer, err := destination.SetupKafka(pool, &testCleanup{t}, t)
	require.NoError(t, err)

	kafkaHost := fmt.Sprintf("localhost:%s", kafkaContainer.Port)
	transport := &kafka.Transport{
		DialTimeout: 10 * time.Second,
		Dial: func(ctx context.Context, network string, address string) (net.Conn, error) {
			return kafka.DialContext(ctx, network, address)
		},
	}
	defer transport.CloseIdleConnections()
	w := &kafka.Writer{
		Addr:                   kafka.TCP(kafkaHost),
		Topic:                  t.Name(),
		Balancer:               &kafka.Hash{},
		AllowAutoTopicCreation: true,
		Transport:              transport,
	}
	require.Eventually(t, func() bool {
		err = w.WriteMessages(context.Background(),
			kafka.Message{
				Key:   []byte("Key-A"),
				Value: []byte("Hello World!"),
			},
			kafka.Message{
				Key:   []byte("Key-B"),
				Value: []byte("One!"),
			},
			kafka.Message{
				Key:   []byte("Key-C"),
				Value: []byte("Two!"),
			},
		)
		t.Log(err)
		return err == nil
	}, 30*time.Second, time.Second)
}

With the above I get:

=== RUN   TestProducer
    kafka.go:72: KAFKA_ZOOKEEPER_CONNECT: localhost: 37557
    kafka.go:79: broker Port: 38283/tcp
    kafka.go:86: localhost Port: 43483/tcp
    kafka.go:89: KAFKA_ADVERTISED_LISTENERS INTERNAL://broker:9090,EXTERNAL://localhost:43483
    kafka.go:118: Kafka PORT:  43483
    client_test.go:76: read tcp [::1]:38612->[::1]:43483: read: connection reset by peer
    client_test.go:76: read tcp [::1]:38612->[::1]:43483: read: connection reset by peer
    client_test.go:76: read tcp [::1]:38612->[::1]:43483: read: connection reset by peer
    client_test.go:76: read tcp [::1]:38612->[::1]:43483: read: connection reset by peer
    client_test.go:76: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
    client_test.go:76: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
    client_test.go:76: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
    client_test.go:76: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
    client_test.go:76: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
    client_test.go:76: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
    client_test.go:61: 
        	Error Trace:	client_test.go:61
        	Error:      	Condition never satisfied
        	Test:       	TestProducer
--- FAIL: TestProducer (32.53s)

If I simply remove the transport from the writer it works:

	w := &kafka.Writer{
		Addr:                   kafka.TCP(kafkaHost),
		Topic:                  t.Name(),
		Balancer:               &kafka.Hash{},
		AllowAutoTopicCreation: true,
		//Transport:              transport,
	}

As a side note, even if I create the topic beforehand (i.e. with controllerConn.CreateTopics(...)), I still get the Unknown Topic Or Partition error.

The problem seems to go away if I replace the dialer in the transport with a *net.Dialer so that instead of:

transport := &kafka.Transport{
	DialTimeout: 10 * time.Second,
	Dial: func(ctx context.Context, network string, address string) (net.Conn, error) {
		return kafka.DialContext(ctx, network, address)
	},
}

I do:

dialer := &net.Dialer{}
transport := &kafka.Transport{
	DialTimeout: 10 * time.Second,
	Dial: func(ctx context.Context, network string, address string) (net.Conn, error) {
		return dialer.DialContext(ctx, network, address)
	},
}

Now I'm trying to figure out why the error comes back when I replace dialer := &net.Dialer{} with dialer := &kafka.Dialer{} (which I need for the SASL and TLS helpers).

If I cannot use a *kafka.Dialer in a *kafka.Writer transport, then how am I supposed to connect a producer that uses SASL and/or TLS in an easy way?

@fracasula fracasula changed the title Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker How to: Writer + TLS and SASL (kafka.Dialer?) Apr 26, 2022
@fracasula fracasula changed the title How to: Writer + TLS and SASL (kafka.Dialer?) How to: Writer + TLS and SASL (kafka.Dialer in Writer.Transport?) Apr 26, 2022
@fracasula
Contributor Author

Never mind, solved by setting everything into the transport.
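
For readers landing here, the following is a minimal sketch of what "setting everything into the transport" can look like: TLS and SASL are configured as fields on kafka.Transport, and no kafka.Dialer is involved. It assumes SASL/PLAIN and the system trust store; the broker address, topic name, and credentials are placeholders rather than values from this thread.

```go
package main

import (
	"context"
	"crypto/tls"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
	// TLS and SASL live on the Transport itself, so no custom Dial
	// function (and no kafka.Dialer) is needed.
	transport := &kafka.Transport{
		DialTimeout: 10 * time.Second,
		TLS:         &tls.Config{MinVersion: tls.VersionTLS12},
		SASL: plain.Mechanism{
			Username: "user",     // placeholder credential
			Password: "password", // placeholder credential
		},
	}
	defer transport.CloseIdleConnections()

	w := &kafka.Writer{
		Addr:      kafka.TCP("localhost:9093"), // placeholder broker address
		Topic:     "example-topic",             // placeholder topic
		Balancer:  &kafka.Hash{},
		Transport: transport,
	}
	defer w.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	err := w.WriteMessages(ctx, kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	})
	if err != nil {
		log.Printf("write failed: %v", err)
	}
}
```

If a custom Dial function is still required, the earlier snippet in this thread suggests it should return a raw connection, for example one from a *net.Dialer, rather than a connection obtained through kafka.DialContext.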

@ekeric13

ekeric13 commented Sep 2, 2022

@fracasula I'm running into a similar issue here:
#516 (comment)

What do you mean by "setting everything into transport"?

Do you mean not creating a custom dialer? I am actually not doing that myself...

	sharedTransport := &kafka.Transport{}
	if useMskIam {
		awsCreds := credentials.NewEnvCredentials()

		sharedTransport = &kafka.Transport{
			TLS: &tls.Config{},
			SASL: &aws_msk_iam.Mechanism{
				Signer: sigv4.NewSigner(awsCreds),
				Region: awsRegion,
			},
		}
	}

	kafkaService := &kafka.Writer{
		Addr:  kafka.TCP(strings.Split(brokersEndpoint, ",")...),
		Topic: kafkaTopic,
		// default hash algo used by kafka
		Balancer:     kafka.Murmur2Balancer{},
		RequiredAcks: kafka.RequireOne,
		Transport:    sharedTransport,
	}

@fracasula
Contributor Author

@ekeric13 this is how I ended up building my writer with the transport: https://github.com/rudderlabs/rudder-server/blob/v1.0.2/services/streammanager/kafka/client/producer.go#L39

@ekeric13

Okay, I think my issue may have been that my MSK IAM permissions were too strict.
