Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for Metadata Request/Response up to v5 #1069

Merged
merged 1 commit into from
Apr 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions metadata_request.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package sarama

type MetadataRequest struct {
Version int16
Topics []string
Version int16
Topics []string
AllowAutoTopicCreation bool
}

func (r *MetadataRequest) encode(pe packetEncoder) error {
if r.Version < 0 || r.Version > 1 {
if r.Version < 0 || r.Version > 5 {
return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
}
if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 {
Expand All @@ -24,6 +25,9 @@ func (r *MetadataRequest) encode(pe packetEncoder) error {
} else {
pe.putInt32(-1)
}
if r.Version > 3 {
pe.putBool(r.AllowAutoTopicCreation)
}
return nil
}

Expand All @@ -49,9 +53,15 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
}
r.Topics[i] = topic
}
return nil
}

if r.Version > 3 {
autoCreation, err := pd.getBool()
if err != nil {
return err
}
r.AllowAutoTopicCreation = autoCreation
}
return nil
}

func (r *MetadataRequest) key() int16 {
Expand All @@ -66,6 +76,12 @@ func (r *MetadataRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_10_0_0
case 2:
return V0_10_1_0
case 3, 4:
return V0_11_0_0
case 5:
return V1_0_0_0
default:
return MinVersion
}
Expand Down
32 changes: 32 additions & 0 deletions metadata_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ var (

metadataRequestNoTopicsV1 = []byte{
0xff, 0xff, 0xff, 0xff}

metadataRequestAutoCreateV4 = append(metadataRequestOneTopicV0, byte(1))
metadataRequestNoAutoCreateV4 = append(metadataRequestOneTopicV0, byte(0))
)

func TestMetadataRequestV0(t *testing.T) {
Expand All @@ -42,3 +45,32 @@ func TestMetadataRequestV1(t *testing.T) {
request.Topics = []string{"foo", "bar", "baz"}
testRequest(t, "three topics", request, metadataRequestThreeTopicsV0)
}

func TestMetadataRequestV2(t *testing.T) {
request := new(MetadataRequest)
request.Version = 2
testRequest(t, "no topics", request, metadataRequestNoTopicsV1)

request.Topics = []string{"topic1"}
testRequest(t, "one topic", request, metadataRequestOneTopicV0)
}

func TestMetadataRequestV3(t *testing.T) {
request := new(MetadataRequest)
request.Version = 3
testRequest(t, "no topics", request, metadataRequestNoTopicsV1)

request.Topics = []string{"topic1"}
testRequest(t, "one topic", request, metadataRequestOneTopicV0)
}

func TestMetadataRequestV4(t *testing.T) {
request := new(MetadataRequest)
request.Version = 4
request.Topics = []string{"topic1"}
request.AllowAutoTopicCreation = true
testRequest(t, "one topic", request, metadataRequestAutoCreateV4)

request.AllowAutoTopicCreation = false
testRequest(t, "one topic", request, metadataRequestNoAutoCreateV4)
}
74 changes: 59 additions & 15 deletions metadata_response.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package sarama

type PartitionMetadata struct {
Err KError
ID int32
Leader int32
Replicas []int32
Isr []int32
Err KError
ID int32
Leader int32
Replicas []int32
Isr []int32
OfflineReplicas []int32
}

func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
Expand All @@ -35,10 +36,17 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
return err
}

if version >= 5 {
pm.OfflineReplicas, err = pd.getInt32Array()
if err != nil {
return err
}
}

return nil
}

func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(int16(pm.Err))
pe.putInt32(pm.ID)
pe.putInt32(pm.Leader)
Expand All @@ -53,6 +61,13 @@ func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) {
return err
}

if version >= 5 {
err = pe.putInt32Array(pm.OfflineReplicas)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -89,7 +104,7 @@ func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
tm.Partitions = make([]*PartitionMetadata, n)
for i := 0; i < n; i++ {
tm.Partitions[i] = new(PartitionMetadata)
err = tm.Partitions[i].decode(pd)
err = tm.Partitions[i].decode(pd, version)
if err != nil {
return err
}
Expand All @@ -116,7 +131,7 @@ func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
}

for _, pm := range tm.Partitions {
err = pm.encode(pe)
err = pm.encode(pe, version)
if err != nil {
return err
}
Expand All @@ -126,13 +141,24 @@ func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
}

type MetadataResponse struct {
Version int16
Brokers []*Broker
ControllerID int32
Topics []*TopicMetadata
Version int16
ThrottleTimeMs int32
Brokers []*Broker
ClusterID *string
ControllerID int32
Topics []*TopicMetadata
}

func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version

if version >= 3 {
r.ThrottleTimeMs, err = pd.getInt32()
if err != nil {
return err
}
}

n, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -147,6 +173,13 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
}
}

if version >= 2 {
r.ClusterID, err = pd.getNullableString()
if err != nil {
return err
}
}

if version >= 1 {
r.ControllerID, err = pd.getInt32()
if err != nil {
Expand Down Expand Up @@ -208,11 +241,22 @@ func (r *MetadataResponse) key() int16 {
}

func (r *MetadataResponse) version() int16 {
return 0
return r.Version
}

func (r *MetadataResponse) requiredVersion() KafkaVersion {
return MinVersion
switch r.Version {
case 1:
return V0_10_0_0
case 2:
return V0_10_1_0
case 3, 4:
return V0_11_0_0
case 5:
return V1_0_0_0
default:
return MinVersion
}
}

// testing API
Expand Down
78 changes: 74 additions & 4 deletions metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,31 @@ var (
0x00, 0x03, 'b', 'a', 'r',
0x01,
0x00, 0x00, 0x00, 0x00}

noBrokersNoTopicsWithThrottleTimeAndClusterIDV3 = []byte{
0x00, 0x00, 0x00, 0x10,
0x00, 0x00, 0x00, 0x00,
0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00}

noBrokersOneTopicWithOfflineReplicasV5 = []byte{
0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x09, 'c', 'l', 'u', 's', 't', 'e', 'r', 'I', 'd',
0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x03, 'f', 'o', 'o',
0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x04,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x07,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03,
}
)

func TestEmptyMetadataResponseV0(t *testing.T) {
Expand Down Expand Up @@ -206,15 +231,60 @@ func TestMetadataResponseWithTopicsV1(t *testing.T) {
t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
}
if response.ControllerID != 4 {
t.Error("Decoding produced", len(response.Brokers), "should have been 4!")
t.Error("Decoding produced", response.ControllerID, "should have been 4!")
}
if len(response.Topics) != 2 {
t.Error("Decoding produced", len(response.Brokers), "topics where there were 2!")
t.Error("Decoding produced", len(response.Topics), "topics where there were 2!")
}
if response.Topics[0].IsInternal {
t.Error("Decoding produced", response.ControllerID, "topic0 should have been false!")
t.Error("Decoding produced", response.Topics[0], "topic0 should have been false!")
}
if !response.Topics[1].IsInternal {
t.Error("Decoding produced", response.ControllerID, "topic1 should have been true!")
t.Error("Decoding produced", response.Topics[1], "topic1 should have been true!")
}
}

func TestMetadataResponseWithThrottleTime(t *testing.T) {
response := MetadataResponse{}

testVersionDecodable(t, "no topics, no brokers, throttle time and cluster Id V3", &response, noBrokersNoTopicsWithThrottleTimeAndClusterIDV3, 3)
if response.ThrottleTimeMs != int32(16) {
t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 16!")
}
if len(response.Brokers) != 0 {
t.Error("Decoding produced", response.Brokers, "should have been 0!")
}
if response.ControllerID != int32(1) {
t.Error("Decoding produced", response.ControllerID, "should have been 1!")
}
if *response.ClusterID != "clusterId" {
t.Error("Decoding produced", response.ClusterID, "should have been clusterId!")
}
if len(response.Topics) != 0 {
t.Error("Decoding produced", len(response.Topics), "should have been 0!")
}
}

func TestMetadataResponseWithOfflineReplicasV5(t *testing.T) {
response := MetadataResponse{}

testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, noBrokersOneTopicWithOfflineReplicasV5, 5)
if response.ThrottleTimeMs != int32(5) {
t.Error("Decoding produced", response.ThrottleTimeMs, "should have been 5!")
}
if len(response.Brokers) != 0 {
t.Error("Decoding produced", response.Brokers, "should have been 0!")
}
if response.ControllerID != int32(2) {
t.Error("Decoding produced", response.ControllerID, "should have been 21!")
}
if *response.ClusterID != "clusterId" {
t.Error("Decoding produced", response.ClusterID, "should have been clusterId!")
}
if len(response.Topics) != 1 {
t.Error("Decoding produced", len(response.Topics), "should have been 1!")
}
if len(response.Topics[0].Partitions[0].OfflineReplicas) != 1 {
t.Error("Decoding produced", len(response.Topics[0].Partitions[0].OfflineReplicas), "should have been 1!")
}
}