diff --git a/client.go b/client.go index f5563e938..019cb4373 100644 --- a/client.go +++ b/client.go @@ -17,6 +17,9 @@ type Client interface { // altered after it has been created. Config() *Config + // Controller returns the cluster controller broker. + Controller() (*Broker, error) + // Brokers returns the current set of active brokers as retrieved from cluster metadata. Brokers() []*Broker @@ -97,6 +100,7 @@ type client struct { seedBrokers []*Broker deadSeeds []*Broker + controllerID int32 // cluster controller broker id brokers map[int32]*Broker // maps broker ids to brokers metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs @@ -379,6 +383,27 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in return offset, err } +func (client *client) Controller() (*Broker, error) { + if client.Closed() { + return nil, ErrClosedClient + } + + controller := client.cachedController() + if controller == nil { + if err := client.refreshMetadata(); err != nil { + return nil, err + } + controller = client.cachedController() + } + + if controller == nil { + return nil, ErrControllerNotAvailable + } + + _ = controller.Open(client.conf) + return controller, nil +} + func (client *client) Coordinator(consumerGroup string) (*Broker, error) { if client.Closed() { return nil, ErrClosedClient @@ -607,20 +632,7 @@ func (client *client) backgroundMetadataUpdater() { for { select { case <-ticker.C: - topics := []string{} - if !client.conf.Metadata.Full { - if specificTopics, err := client.Topics(); err != nil { - Logger.Println("Client background metadata topic load:", err) - break - } else if len(specificTopics) == 0 { - Logger.Println("Client background metadata update: no specific topics to update") - break - } else { - topics = specificTopics - } - } - - if err := client.RefreshMetadata(topics...); err != nil { + if err := client.refreshMetadata(); err != nil { Logger.Println("Client background metadata update:", err) } case <-client.closer: @@ -629,6 +641,26 @@ func (client *client) backgroundMetadataUpdater() { } } +func (client *client) refreshMetadata() error { + topics := []string{} + + if !client.conf.Metadata.Full { + if specificTopics, err := client.Topics(); err != nil { + return err + } else if len(specificTopics) == 0 { + return ErrNoTopicsToUpdateMetadata + } else { + topics = specificTopics + } + } + + if err := client.RefreshMetadata(topics...); err != nil { + return err + } + + return nil +} + func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error { retry := func(err error) error { if attemptsRemaining > 0 { @@ -645,7 +677,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) } else { Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr) } - response, err := broker.GetMetadata(&MetadataRequest{Topics: topics}) + + req := &MetadataRequest{Topics: topics} + if client.conf.Version.IsAtLeast(V0_10_0_0) { + req.Version = 1 + } + response, err := broker.GetMetadata(req) switch err.(type) { case nil: @@ -686,6 +723,9 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo for _, broker := range data.Brokers { client.registerBroker(broker) } + + client.controllerID = data.ControllerID + if allKnownMetaData { client.metadata = make(map[string]map[int32]*PartitionMetadata) client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32) @@ -739,6 +779,13 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker { return nil } +func (client *client) cachedController() *Broker { + client.lock.RLock() + defer client.lock.RUnlock() + + return client.brokers[client.controllerID] +} + func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) { retry := func(err error) (*FindCoordinatorResponse, error) { if attemptsRemaining > 0 { diff --git a/client_test.go b/client_test.go index 2e1198d27..fc255a730 100644 --- a/client_test.go +++ b/client_test.go @@ -444,6 +444,47 @@ func TestClientResurrectDeadSeeds(t *testing.T) { safeClose(t, c) } +func TestClientController(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + controllerBroker := NewMockBroker(t, 2) + defer controllerBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(controllerBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). + SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()), + }) + + cfg := NewConfig() + + // test kafka version greater than 0.10.0.0 + cfg.Version = V0_10_0_0 + client1, err := NewClient([]string{seedBroker.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, client1) + broker, err := client1.Controller() + if err != nil { + t.Fatal(err) + } + if broker.Addr() != controllerBroker.Addr() { + t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr()) + } + + // test kafka version earlier than 0.10.0.0 + cfg.Version = V0_9_0_1 + client2, err := NewClient([]string{seedBroker.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, client2) + if _, err = client2.Controller(); err != ErrControllerNotAvailable { + t.Errorf("Expected Contoller() to return %s, found %s", ErrControllerNotAvailable, err) + } +} func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { seedBroker := NewMockBroker(t, 1) staleCoordinator := NewMockBroker(t, 2) diff --git a/errors.go b/errors.go index 54f431a4a..916098c54 100644 --- a/errors.go +++ b/errors.go @@ -41,6 +41,14 @@ var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetc // a RecordBatch. var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch") +// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version +// is lower than 0.10.0.0. +var ErrControllerNotAvailable = errors.New("kafka: controller is not avaiable") + +// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update +// the metadata. +var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata") + // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that. type PacketEncodingError struct { diff --git a/mockresponses.go b/mockresponses.go index e12849f54..5541d32ec 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -68,9 +68,10 @@ func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) { // MockMetadataResponse is a `MetadataResponse` builder. type MockMetadataResponse struct { - leaders map[string]map[int32]int32 - brokers map[string]int32 - t TestReporter + controllerID int32 + leaders map[string]map[int32]int32 + brokers map[string]int32 + t TestReporter } func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse { @@ -96,9 +97,17 @@ func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMet return mmr } +func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse { + mmr.controllerID = brokerID + return mmr +} + func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder { metadataRequest := reqBody.(*MetadataRequest) - metadataResponse := &MetadataResponse{} + metadataResponse := &MetadataResponse{ + Version: metadataRequest.version(), + ControllerID: mmr.controllerID, + } for addr, brokerID := range mmr.brokers { metadataResponse.AddBroker(addr, brokerID) }