Skip to content

Commit

Permalink
kgo: inject the group lost error into polling
Browse files Browse the repository at this point in the history
Multiple users have been hit by this and have not had visibility.

In fact, the original HookGroupManageError can be attributed to having
no visibility.

For #147, #321, #379.
  • Loading branch information
twmb committed Mar 11, 2023
1 parent 29a94a1 commit d5ea09b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func (g *groupConsumer) manage() {
h.OnGroupManageError(err)
}
})
g.c.addFakeReadyForDraining("", 0, err)
}

// If we are eager, we should have invalidated everything
Expand Down
13 changes: 13 additions & 0 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,16 @@ func (e *errUnknownCoordinator) Error() string {
" but did not reply with that broker in the broker list", e.key.name, e.key.typ, e.coordinator)
}
}

// ErrGroupSession is injected into a poll if an error occurred such that your
// consumer group member was kicked from the group or was never able to join
// the group.
type ErrGroupLost struct {
err error
}

func (e *ErrGroupLost) Error() string {
return fmt.Sprintf("unable to join group: %v", e.err)
}

func (e *ErrGroupLost) Unwrap() error { return e.err }
13 changes: 12 additions & 1 deletion pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ type FetchError struct {
// Errors returns all errors in a fetch with the topic and partition that
// errored.
//
// There are four classes of errors possible:
// There are a few classes of errors possible:
//
// 1. a normal kerr.Error; these are usually the non-retriable kerr.Errors,
// but theoretically a non-retriable error can be fixed at runtime (auth
Expand All @@ -345,6 +345,17 @@ type FetchError struct {
// is returned from every Poll call if the client has been closed.
// A corresponding helper function IsClientClosed can be used to detect
// this error.
//
// 5. an injected context error; this can be present if the context you were
// using for polling timed out or was canceled.
//
// If you are group consuming, there is another class of error:
//
// 6. an injected ErrGroupSession; this is an informational error that is
// injected once a group session is lost in a way that is not the standard
// rebalance. This error can signify that your consumer member is not able
// to connect to the group (ACL problems, unreachable broker), or you
// blocked rebalancing for too long, or your callbacks took too long.
func (fs Fetches) Errors() []FetchError {
var errs []FetchError
fs.EachError(func(t string, p int32, err error) {
Expand Down

0 comments on commit d5ea09b

Please sign in to comment.