Skip to content

Commit

Permalink
broker: remove 5s minimum for sasl session lifetime
Browse files Browse the repository at this point in the history
Previously, if a broker returned <5000ms for a sasl session lifetime,
the client would reject the lifetime and close the connection. This
would result in a log and a failed request, and the error was
technically not retriable.

The Java client uses anywhere from 85% to 95% of the lifetime no matter
what (uniform random range, plus / minus jitter). So, no lower bound.

We remove the lower bound and instead switch to taking 2.5s off the
lifetime, or the latency of the authentication itself, whichever is
larger. If subtracting 2.5s immediately expires the sasl session, we sleep
100ms to avoid hot looping, but the next write will cause a reauthentication.
As a final bit of validation, if we loop reauthenticating 15x (which is
an overkill allowance already), we kill the connection and retry on a
new connection. The newly introduced error is considered a retriable
broker error.
  • Loading branch information
twmb committed Feb 23, 2022
1 parent da13328 commit 3e6f0fd
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
53 changes: 45 additions & 8 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,19 @@ func (b *broker) handleReq(pr promisedReq) {

req.SetVersion(version) // always go for highest version

if !cxn.expiry.IsZero() && time.Now().After(cxn.expiry) {
for reauthentications := 1; !cxn.expiry.IsZero() && time.Now().After(cxn.expiry); reauthentications++ {
// We allow 15 reauths, which is a lot. If a new lifetime is
// <2.5s, we sleep 100ms and try again. Retrying 15x puts us at
// <1s compared to the original lifetime. A broker should not
// reply with a <1s lifetime, but if we end up here, then we
// kill the connection ourselves and retry on a new connection.
if reauthentications > 15 {
cxn.cl.cfg.logger.Log(LogLevelError, "the broker has repeatedly given us short sasl lifetimes, we are forcefully killing our own connection to retry on a new connection ", "broker", logID(cxn.b.meta.NodeID))
pr.promise(nil, errSaslReauthLoop)
cxn.die()
return
}

// If we are after the reauth time, try to reauth. We
// can only have an expiry if we went through the authenticate
// flow, so we know we are authenticating again.
Expand Down Expand Up @@ -722,6 +734,7 @@ func (cxn *brokerCxn) sasl() error {

v := cxn.b.loadVersions()
req := kmsg.NewPtrSASLHandshakeRequest()

start:
if mechanism.Name() != "GSSAPI" && v.versions[req.Key()] >= 0 {
req.Mechanism = mechanism.Name()
Expand Down Expand Up @@ -774,6 +787,7 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
return fmt.Errorf("unexpected server-write sasl with mechanism %s", cxn.mechanism.Name())
}

prereq := time.Now() // used below for sasl lifetime calculation
var lifetimeMillis int64

// Even if we do not wrap our reads/writes in SASLAuthenticate, we
Expand Down Expand Up @@ -814,6 +828,9 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
req.Version = cxn.b.loadVersions().versions[req.Key()]
cxn.cl.cfg.logger.Log(LogLevelDebug, "issuing SASLAuthenticate", "broker", logID(cxn.b.meta.NodeID), "version", req.Version, "step", step)

// Lifetime: we take the timestamp before we write our
// request; see usage below for why.
prereq = time.Now()
corrID, bytesWritten, writeWait, timeToWrite, readEnqueue, writeErr := cxn.writeRequest(nil, time.Now(), req)

// As mentioned above, we could have one final write
Expand Down Expand Up @@ -856,16 +873,36 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
}

if lifetimeMillis > 0 {
// If we have a lifetime, we take 1s off of it to account
// for some processing lag or whatever.
// A better thing to return in the auth response would
// have been the deadline, but we are here now.
if lifetimeMillis < 5000 {
return fmt.Errorf("invalid short sasl lifetime millis %d", lifetimeMillis)
// Lifetime: we could have written our request instantaneously,
// the broker calculating our session lifetime, and then the
// broker / network hung for a bit when writing. We
// pessimistically assume this worst case and take off the
// final request e2e latency x1.1 from the lifetime.
//
// If the latency is <2.5s, we also pessimistically assume that
// things may take 2.5s in the future.
//
// We may make our lifetime <0; brokers should use longer
// lifetimes, but some do not in all cases. If our lifetime is
// <100ms, we sleep for 100ms just to ensure we do not
// spin-loop reauthenticating *too* much.
latency := int64(float64(time.Since(prereq).Milliseconds()) * 1.1)
if latency < 2500 {
latency = 2500
}

useLifetime := lifetimeMillis - latency
now := time.Now()
cxn.expiry = now.Add(time.Duration(lifetimeMillis)*time.Millisecond - time.Second)
cxn.expiry = now.Add(time.Duration(useLifetime) * time.Millisecond)
cxn.cl.cfg.logger.Log(LogLevelDebug, "sasl has a limited lifetime", "broker", logID(cxn.b.meta.NodeID), "reauthenticate_in", cxn.expiry.Sub(now))
if useLifetime < 0 {
cxn.cl.cfg.logger.Log(LogLevelInfo, "sasl lifetime minus 2.5s lower bound latency results in immediate reauthentication, sleeping 100ms to avoid spin-loop",
"broker", logID(cxn.b.meta.NodeID),
"session_lifetime", time.Duration(lifetimeMillis)*time.Millisecond,
"latency_lower_bound", time.Duration(latency)*time.Millisecond,
)
time.Sleep(100 * time.Millisecond)
}
}
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func isRetriableBrokerErr(err error) bool {
if errors.Is(err, errChosenBrokerDead) {
return true
}
// A broker kept giving us short sasl lifetimes, so we killed the
// connection ourselves. We can retry on a new connection.
if errors.Is(err, errSaslReauthLoop) {
return true
}
// We really should not get correlation mismatch, but if we do, we can
// retry.
if errors.Is(err, errCorrelationIDMismatch) {
Expand Down Expand Up @@ -112,6 +117,11 @@ var (
// stopped due to a concurrent metadata response.
errChosenBrokerDead = errors.New("the internal broker struct chosen to issue this request has died--either the broker id is migrating or no longer exists")

// If a broker repeatedly gives us tiny sasl lifetimes, we fail a
// request after a few tries to forcefully kill the connection and
// restart a new connection ourselves.
errSaslReauthLoop = errors.New("the broker is repeatedly giving us sasl lifetimes that are too short to write a request")

errProducerIDLoadFail = errors.New("unable to initialize a producer ID due to request failures")

// A temporary error returned when Kafka replies with a different
Expand Down

0 comments on commit 3e6f0fd

Please sign in to comment.