From c6f7f9a4281c10fda11835a14ea6aa090fee72c1 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 12 Dec 2022 21:23:00 -0700 Subject: [PATCH] kgo: allow empty groups when finding coordinator / fetching offsets v1.10.0 introduced a bug when switching to batch loadCoordinators: we would strip empty coordinator keys from the request. Technically, empty coordinator keys are valid. If a request contains an empty coordinator key (empty group), we would not load it and not set a map key for it and then using a field in the map value would panic. Now, we always request all coordinator keys. This also showed a bug in sharded OffsetFetch: fetching offsets for a group with no name would be stripped (now we will just forward whatever error happens). Closes #283. --- pkg/kgo/client.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index b5e5f133..738fc48e 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -1154,9 +1154,7 @@ func (cl *Client) loadCoordinators(ctx context.Context, typ int8, keys ...string toRequest := make(map[string]bool, len(keys)) // true == bypass the cache for _, key := range keys { - if len(key) > 0 { - toRequest[key] = false - } + toRequest[key] = false } // For each of these keys, we have two cases: @@ -2324,9 +2322,7 @@ func (cl *offsetFetchSharder) shard(ctx context.Context, kreq kmsg.Request, last } groups := make([]string, 0, len(req.Groups)) for i := range req.Groups { - if g := req.Groups[i].Group; len(g) > 0 { - groups = append(groups, req.Groups[i].Group) - } + groups = append(groups, req.Groups[i].Group) } coordinators := cl.loadCoordinators(ctx, coordinatorTypeGroup, groups...) @@ -2487,9 +2483,7 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE } req.CoordinatorKeys = req.CoordinatorKeys[:0] for key := range uniq { - if len(key) > 0 { - req.CoordinatorKeys = append(req.CoordinatorKeys, key) - } + req.CoordinatorKeys = append(req.CoordinatorKeys, key) } splitReq := errors.Is(lastErr, errBrokerTooOld)