From bbdeda9fd2d563768555a1a1230bb762a6fe4fca Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Sat, 17 Mar 2018 18:25:02 +0000 Subject: [PATCH] Added support for Metadata Request/Response up to v5 --- metadata_request.go | 26 ++++++++++--- metadata_request_test.go | 32 ++++++++++++++++ metadata_response.go | 74 +++++++++++++++++++++++++++++-------- metadata_response_test.go | 78 +++++++++++++++++++++++++++++++++++++-- 4 files changed, 186 insertions(+), 24 deletions(-) diff --git a/metadata_request.go b/metadata_request.go index 6cef61cc8..48adfa28c 100644 --- a/metadata_request.go +++ b/metadata_request.go @@ -1,12 +1,13 @@ package sarama type MetadataRequest struct { - Version int16 - Topics []string + Version int16 + Topics []string + AllowAutoTopicCreation bool } func (r *MetadataRequest) encode(pe packetEncoder) error { - if r.Version < 0 || r.Version > 1 { + if r.Version < 0 || r.Version > 5 { return PacketEncodingError{"invalid or unsupported MetadataRequest version field"} } if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 { @@ -24,6 +25,9 @@ func (r *MetadataRequest) encode(pe packetEncoder) error { } else { pe.putInt32(-1) } + if r.Version > 3 { + pe.putBool(r.AllowAutoTopicCreation) + } return nil } @@ -49,9 +53,15 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) error { } r.Topics[i] = topic } - return nil } - + if r.Version > 3 { + autoCreation, err := pd.getBool() + if err != nil { + return err + } + r.AllowAutoTopicCreation = autoCreation + } + return nil } func (r *MetadataRequest) key() int16 { @@ -66,6 +76,12 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: return V0_10_0_0 + case 2: + return V0_10_1_0 + case 3, 4: + return V0_11_0_0 + case 5: + return V1_0_0_0 default: return MinVersion } diff --git a/metadata_request_test.go b/metadata_request_test.go index 11be33132..727e48a2c 100644 --- a/metadata_request_test.go +++ b/metadata_request_test.go @@ -18,6 +18,9 @@ var ( metadataRequestNoTopicsV1 = []byte{ 0xff, 0xff, 0xff, 0xff} + + metadataRequestAutoCreateV4 = append(metadataRequestOneTopicV0, byte(1)) + metadataRequestNoAutoCreateV4 = append(metadataRequestOneTopicV0, byte(0)) ) func TestMetadataRequestV0(t *testing.T) { @@ -42,3 +45,32 @@ func TestMetadataRequestV1(t *testing.T) { request.Topics = []string{"foo", "bar", "baz"} testRequest(t, "three topics", request, metadataRequestThreeTopicsV0) } + +func TestMetadataRequestV2(t *testing.T) { + request := new(MetadataRequest) + request.Version = 2 + testRequest(t, "no topics", request, metadataRequestNoTopicsV1) + + request.Topics = []string{"topic1"} + testRequest(t, "one topic", request, metadataRequestOneTopicV0) +} + +func TestMetadataRequestV3(t *testing.T) { + request := new(MetadataRequest) + request.Version = 3 + testRequest(t, "no topics", request, metadataRequestNoTopicsV1) + + request.Topics = []string{"topic1"} + testRequest(t, "one topic", request, metadataRequestOneTopicV0) +} + +func TestMetadataRequestV4(t *testing.T) { + request := new(MetadataRequest) + request.Version = 4 + request.Topics = []string{"topic1"} + request.AllowAutoTopicCreation = true + testRequest(t, "one topic", request, metadataRequestAutoCreateV4) + + request.AllowAutoTopicCreation = false + testRequest(t, "one topic", request, metadataRequestNoAutoCreateV4) +} diff --git a/metadata_response.go b/metadata_response.go index 8190e8521..bf8a67bbc 100644 --- a/metadata_response.go +++ b/metadata_response.go @@ -1,14 +1,15 @@ package sarama type PartitionMetadata struct { - Err KError - ID int32 - Leader int32 - Replicas []int32 - Isr []int32 + Err KError + ID int32 + Leader int32 + Replicas []int32 + Isr []int32 + OfflineReplicas []int32 } -func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) { +func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err @@ -35,10 +36,17 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) { return err } + if version >= 5 { + pm.OfflineReplicas, err = pd.getInt32Array() + if err != nil { + return err + } + } + return nil } -func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) { +func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(int16(pm.Err)) pe.putInt32(pm.ID) pe.putInt32(pm.Leader) @@ -53,6 +61,13 @@ func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) { return err } + if version >= 5 { + err = pe.putInt32Array(pm.OfflineReplicas) + if err != nil { + return err + } + } + return nil } @@ -89,7 +104,7 @@ func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) { tm.Partitions = make([]*PartitionMetadata, n) for i := 0; i < n; i++ { tm.Partitions[i] = new(PartitionMetadata) - err = tm.Partitions[i].decode(pd) + err = tm.Partitions[i].decode(pd, version) if err != nil { return err } @@ -116,7 +131,7 @@ func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) { } for _, pm := range tm.Partitions { - err = pm.encode(pe) + err = pm.encode(pe, version) if err != nil { return err } @@ -126,13 +141,24 @@ func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) { } type MetadataResponse struct { - Version int16 - Brokers []*Broker - ControllerID int32 - Topics []*TopicMetadata + Version int16 + ThrottleTimeMs int32 + Brokers []*Broker + ClusterID *string + ControllerID int32 + Topics []*TopicMetadata } func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if version >= 3 { + r.ThrottleTimeMs, err = pd.getInt32() + if err != nil { + return err + } + } + n, err := pd.getArrayLength() if err != nil { return err @@ -147,6 +173,13 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { } } + if version >= 2 { + r.ClusterID, err = pd.getNullableString() + if err != nil { + return err + } + } + if version >= 1 { r.ControllerID, err = pd.getInt32() if err != nil { @@ -208,11 +241,22 @@ func (r *MetadataResponse) key() int16 { } func (r *MetadataResponse) version() int16 { - return 0 + return r.Version } func (r *MetadataResponse) requiredVersion() KafkaVersion { - return MinVersion + switch r.Version { + case 1: + return V0_10_0_0 + case 2: + return V0_10_1_0 + case 3, 4: + return V0_11_0_0 + case 5: + return V1_0_0_0 + default: + return MinVersion + } } // testing API diff --git a/metadata_response_test.go b/metadata_response_test.go index a1a53a597..04a4ce7fc 100644 --- a/metadata_response_test.go +++ b/metadata_response_test.go @@ -75,6 +75,31 @@ var ( 0x00, 0x03, 'b', 'a', 'r', 0x01, 0x00, 0x00, 0x00, 0x00} + + noBrokersNoTopicsWithThrottleTimeAndClusterIDV3 = []byte{ + 0x00, 0x00, 0x00, 0x10, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00} + + noBrokersOneTopicWithOfflineReplicasV5 = []byte{ + 0x00, 0x00, 0x00, 0x05, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd', + 0x00, 0x00, 0x00, 0x02, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, + 0x00, 0x03, 'f', 'o', 'o', + 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x04, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x07, + 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, + } ) func TestEmptyMetadataResponseV0(t *testing.T) { @@ -206,15 +231,60 @@ func TestMetadataResponseWithTopicsV1(t *testing.T) { t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!") } if response.ControllerID != 4 { - t.Error("Decoding produced", len(response.Brokers), "should have been 4!") + t.Error("Decoding produced", response.ControllerID, "should have been 4!") } if len(response.Topics) != 2 { - t.Error("Decoding produced", len(response.Brokers), "topics where there were 2!") + t.Error("Decoding produced", len(response.Topics), "topics where there were 2!") } if response.Topics[0].IsInternal { - t.Error("Decoding produced", response.ControllerID, "topic0 should have been false!") + t.Error("Decoding produced", response.Topics[0], "topic0 should have been false!") } if !response.Topics[1].IsInternal { - t.Error("Decoding produced", response.ControllerID, "topic1 should have been true!") + t.Error("Decoding produced", response.Topics[1], "topic1 should have been true!") + } +} + +func TestMetadataResponseWithThrottleTime(t *testing.T) { + response := MetadataResponse{} + + testVersionDecodable(t, "no topics, no brokers, throttle time and cluster Id V3", &response, noBrokersNoTopicsWithThrottleTimeAndClusterIDV3, 3) + if response.ThrottleTimeMs != int32(16) { + t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 16!") + } + if len(response.Brokers) != 0 { + t.Error("Decoding produced", response.Brokers, "should have been 0!") + } + if response.ControllerID != int32(1) { + t.Error("Decoding produced", response.ControllerID, "should have been 1!") + } + if *response.ClusterID != "clusterId" { + t.Error("Decoding produced", response.ClusterID, "should have been clusterId!") + } + if len(response.Topics) != 0 { + t.Error("Decoding produced", len(response.Topics), "should have been 0!") + } +} + +func TestMetadataResponseWithOfflineReplicasV5(t *testing.T) { + response := MetadataResponse{} + + testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, noBrokersOneTopicWithOfflineReplicasV5, 5) + if response.ThrottleTimeMs != int32(5) { + t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 5!") + } + if len(response.Brokers) != 0 { + t.Error("Decoding produced", response.Brokers, "should have been 0!") + } + if response.ControllerID != int32(2) { + t.Error("Decoding produced", response.ControllerID, "should have been 21!") + } + if *response.ClusterID != "clusterId" { + t.Error("Decoding produced", response.ClusterID, "should have been clusterId!") + } + if len(response.Topics) != 1 { + t.Error("Decoding produced", len(response.Topics), "should have been 1!") + } + if len(response.Topics[0].Partitions[0].OfflineReplicas) != 1 { + t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 1!") } }