Skip to content

Commit

Permalink
Merge pull request #1063 from imjustfly/master
Browse files Browse the repository at this point in the history
add Controller() method to Client interface
  • Loading branch information
eapache authored Mar 16, 2018
2 parents 102a489 + 8a87ca8 commit 5e8fd95
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 19 deletions.
77 changes: 62 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type Client interface {
// altered after it has been created.
Config() *Config

// Controller returns the cluster controller broker.
Controller() (*Broker, error)

// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

Expand Down Expand Up @@ -97,6 +100,7 @@ type client struct {
seedBrokers []*Broker
deadSeeds []*Broker

controllerID int32 // cluster controller broker id
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs
Expand Down Expand Up @@ -379,6 +383,27 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
return offset, err
}

func (client *client) Controller() (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
}

controller := client.cachedController()
if controller == nil {
if err := client.refreshMetadata(); err != nil {
return nil, err
}
controller = client.cachedController()
}

if controller == nil {
return nil, ErrControllerNotAvailable
}

_ = controller.Open(client.conf)
return controller, nil
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
Expand Down Expand Up @@ -607,20 +632,7 @@ func (client *client) backgroundMetadataUpdater() {
for {
select {
case <-ticker.C:
topics := []string{}
if !client.conf.Metadata.Full {
if specificTopics, err := client.Topics(); err != nil {
Logger.Println("Client background metadata topic load:", err)
break
} else if len(specificTopics) == 0 {
Logger.Println("Client background metadata update: no specific topics to update")
break
} else {
topics = specificTopics
}
}

if err := client.RefreshMetadata(topics...); err != nil {
if err := client.refreshMetadata(); err != nil {
Logger.Println("Client background metadata update:", err)
}
case <-client.closer:
Expand All @@ -629,6 +641,26 @@ func (client *client) backgroundMetadataUpdater() {
}
}

func (client *client) refreshMetadata() error {
topics := []string{}

if !client.conf.Metadata.Full {
if specificTopics, err := client.Topics(); err != nil {
return err
} else if len(specificTopics) == 0 {
return ErrNoTopicsToUpdateMetadata
} else {
topics = specificTopics
}
}

if err := client.RefreshMetadata(topics...); err != nil {
return err
}

return nil
}

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
retry := func(err error) error {
if attemptsRemaining > 0 {
Expand All @@ -645,7 +677,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
} else {
Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
}
response, err := broker.GetMetadata(&MetadataRequest{Topics: topics})

req := &MetadataRequest{Topics: topics}
if client.conf.Version.IsAtLeast(V0_10_0_0) {
req.Version = 1
}
response, err := broker.GetMetadata(req)

switch err.(type) {
case nil:
Expand Down Expand Up @@ -686,6 +723,9 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
for _, broker := range data.Brokers {
client.registerBroker(broker)
}

client.controllerID = data.ControllerID

if allKnownMetaData {
client.metadata = make(map[string]map[int32]*PartitionMetadata)
client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
Expand Down Expand Up @@ -739,6 +779,13 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker {
return nil
}

func (client *client) cachedController() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

return client.brokers[client.controllerID]
}

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
retry := func(err error) (*FindCoordinatorResponse, error) {
if attemptsRemaining > 0 {
Expand Down
41 changes: 41 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,47 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
safeClose(t, c)
}

func TestClientController(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
controllerBroker := NewMockBroker(t, 2)
defer controllerBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()),
})

cfg := NewConfig()

// test kafka version greater than 0.10.0.0
cfg.Version = V0_10_0_0
client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client1)
broker, err := client1.Controller()
if err != nil {
t.Fatal(err)
}
if broker.Addr() != controllerBroker.Addr() {
t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
}

// test kafka version earlier than 0.10.0.0
cfg.Version = V0_9_0_1
client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client2)
if _, err = client2.Controller(); err != ErrControllerNotAvailable {
t.Errorf("Expected Contoller() to return %s, found %s", ErrControllerNotAvailable, err)
}
}
func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
staleCoordinator := NewMockBroker(t, 2)
Expand Down
8 changes: 8 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetc
// a RecordBatch.
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version
// is lower than 0.10.0.0.
var ErrControllerNotAvailable = errors.New("kafka: controller is not avaiable")

// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update
// the metadata.
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct {
Expand Down
17 changes: 13 additions & 4 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {

// MockMetadataResponse is a `MetadataResponse` builder.
type MockMetadataResponse struct {
leaders map[string]map[int32]int32
brokers map[string]int32
t TestReporter
controllerID int32
leaders map[string]map[int32]int32
brokers map[string]int32
t TestReporter
}

func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
Expand All @@ -96,9 +97,17 @@ func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMet
return mmr
}

func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
mmr.controllerID = brokerID
return mmr
}

func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
metadataRequest := reqBody.(*MetadataRequest)
metadataResponse := &MetadataResponse{}
metadataResponse := &MetadataResponse{
Version: metadataRequest.version(),
ControllerID: mmr.controllerID,
}
for addr, brokerID := range mmr.brokers {
metadataResponse.AddBroker(addr, brokerID)
}
Expand Down

0 comments on commit 5e8fd95

Please sign in to comment.