
Random errors of "[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker" probably related to MSK IAM auth #992

Open
ekeric13 opened this issue Sep 19, 2022 · 11 comments

@ekeric13

ekeric13 commented Sep 19, 2022

Describe the bug

So when I use kafka-go as a writer, I randomly get the error "[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker".

I reproduced this in a sandbox app that simply writes the current date to a topic every 5 seconds. I only get this error with SASL authentication using the AWS_MSK_IAM mechanism.
I have a parallel kafka-go client using TLS authentication, and I have yet to reproduce the error with it.
I also wrote a client with franz-go using TLS and SASL authentication with the MSK IAM mechanism, and I have yet to reproduce the error there.
So it really does seem specific to the kafka-go Writer with SASL AWS_MSK_IAM authentication.

I think there might be a bug in the MSK IAM auth package. Or maybe I need to grant different permissions?

In my sandbox app, I give the AWS credentials very broad permissions:

  statement {
    effect = "Allow"
    actions = [
      "kafka-cluster:Connect",
      "kafka-cluster:Describe*",
      "kafka-cluster:Read*",
      "kafka-cluster:Write*",
      "kafka-cluster:Alter*",
      "kafka-cluster:Create*",
    ]
    resources = [
      "*"
    ]
  }

Kafka Version

  • What version(s) of Kafka are you testing against?

Using Kafka 2.6.2 via MSK.

  • What version of kafka-go are you using?

github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220902170624-ba6f4426b511

To Reproduce

I cannot reproduce this on demand. As mentioned above, to reproduce it I created a sandbox app that authenticates with a working AWS account. Furthermore, the error doesn't happen right away; it usually takes some time to appear, often around 4 days. The errors usually occur in bursts: for example, they occur for about 15 seconds and then simply stop.

Resources to reproduce the behavior:

setup:

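// Imports used below (aws_msk_iam at this version signs with AWS SDK v1):
//   "crypto/tls", "net", "strings", "time"
//   "github.com/aws/aws-sdk-go/aws/credentials"
//   sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
//   "github.com/segmentio/kafka-go"
//   "github.com/segmentio/kafka-go/sasl/aws_msk_iam"
// brokersEndpoint, useAuth, awsRegion and kafkaTopic come from app config.
kafkaAddr := kafka.TCP(strings.Split(brokersEndpoint, ",")...)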
dialerTimeout := 10 * time.Second
dialer := &net.Dialer{
	Timeout: dialerTimeout,
}
transport := &kafka.Transport{
	DialTimeout: dialerTimeout,
	Dial:        dialer.DialContext,
}

if useAuth {
	awsCreds := credentials.NewEnvCredentials()

	transport.TLS = &tls.Config{}
	transport.SASL = &aws_msk_iam.Mechanism{
		Signer: sigv4.NewSigner(awsCreds),
		Region: awsRegion,
	}
}
kWriter := &kafka.Writer{
	Addr:  kafkaAddr,
	Topic: kafkaTopic,
	// murmur2 hashing, matching the Java client's default partitioner
	Balancer:     kafka.Murmur2Balancer{},
	RequiredAcks: kafka.RequireAll,
	ReadTimeout:  15 * time.Second,
	WriteTimeout: 15 * time.Second,
	MaxAttempts:  5,
	Transport:    transport,
}

usage:

currentTime := time.Now()
headers := []kafka.Header{
	{
		Key:   "UnixTime",
		Value: []byte(strconv.FormatInt(currentTime.Unix(), 10)),
	},
}
value := currentTime.Format("2006-01-02 15:04:05")

log.Debug().Str("Data", value).Msg("Data added")
err := w.service.WriteMessages(ctx,
	kafka.Message{
		Value:   []byte(value),
		Headers: headers,
	},
)
if err != nil {
	log.Error().Err(err).Msg("Could not publish event to kafka topic")
	return err
}

I have tried a couple of different setups, including the one from #898 (comment),

but so far I always eventually get the "[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker" error.

Expected Behavior

No errors when writing to a topic over a SASL-authenticated connection using AWS_MSK_IAM.

Observed Behavior

Random "[3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker" failures when writing to a topic over a SASL-authenticated connection using AWS_MSK_IAM.

ekeric13 added the bug label Sep 19, 2022
@ekeric13
Author

I think this might be happening during SASL re-authentication. Is there a way to know not to write to a topic while the client is re-authenticating?

@jdupl123

As a quick note, I get the same error if my Kafka user is not authorised for a given topic.

@ekeric13
Author

@jdupl123 how are you authorizing your user for a given topic?
As explained above, this is my setup, which I assumed works for all topics:

  statement {
    effect = "Allow"
    actions = [
      "kafka-cluster:Connect",
      "kafka-cluster:Describe*",
      "kafka-cluster:Read*",
      "kafka-cluster:Write*",
      "kafka-cluster:Alter*",
      "kafka-cluster:Create*",
    ]
    resources = [
      "*"
    ]
  }

@kscooo
Contributor

kscooo commented Oct 19, 2022

I have also encountered this problem, but I did not open a new issue for it. When I used this library in May this year, I did not observe this error during normal production and consumption. Later, the project's architecture changed and Kafka was left idle, with no messages being produced or consumed, and then these errors appeared: [29] Topic Authorization Failed: the client is not authorized to access the requested topic and [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker

The [29] Topic Authorization Failed: the client is not authorized to access the requested topic error is reported very frequently, about once every 12 hours.

The [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker error is much rarer, occurring about once every few weeks.

I also contacted AWS technical support, but they could not help and suggested I ask around in the community.

I now plan to rewrite the authentication process using AWS SDK v2; hopefully that will help. (By the way, English is not my native language, so some of my wording may be imprecise.)

@kscooo
Contributor

kscooo commented Nov 1, 2022

Upgrading to AWS SDK v2 did not help.

@zhongchen

I also noticed the same errors with AWS MSK using SASL authentication.

Is there way to know to not write to a topic when the client is re-authenticating?

@ekeric13 Did you find a way to solve this?

@ekeric13
Author

ekeric13 commented Jan 6, 2023

@zhongchen I just used this library instead:
https://github.com/twmb/franz-go

It had a different SASL authentication issue, but the maintainer is very good and fixed it.

@achille-roussel
Contributor

Hello @ekeric13, I'm glad you got it sorted out using franz-go!

Would you be able to share a link to the discussion you had with the franz-go maintainer(s) that helped address your issue? This would be very valuable for us to improve kafka-go as well.

@zhongchen

@achille-roussel I managed to solve the issue by adding external retry logic around kafka.Writer.WriteMessages.

I looked at the code and found that the delay between internal retries is capped at 1s. As a result, even though the writer retries, a write still fails if the transient issue lasts a little longer.

@hequanb

hequanb commented Oct 23, 2024

@zhongchen

Hello, sorry to bother you. Could you tell me whether increasing the writer's WriteBackoffMax setting would help solve this problem? It seems the library already provides a way to retry publishing messages.

@hequanb

hequanb commented Nov 11, 2024

Well, simply retrying may solve 85% of the problems. However, at certain times (12:30 AM in my case), this auth error occurs continuously for about 4 minutes.

No branches or pull requests

7 participants