Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
imjustfly authored Mar 16, 2018
2 parents e06b9a2 + 102a489 commit 8a87ca8
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ language: go
go:
- 1.8.x
- 1.9.x
- 1.10.x

env:
global:
Expand All @@ -11,7 +12,6 @@ env:
- KAFKA_HOSTNAME=localhost
- DEBUG=true
matrix:
- KAFKA_VERSION=0.10.2.1
- KAFKA_VERSION=0.11.0.2
- KAFKA_VERSION=1.0.0

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ You might also want to look at the [Frequently Asked Questions](https://github.c
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
the two latest stable releases of Kafka and Go, and we provide a two month
grace period for older releases. This means we currently officially support
Go 1.9 and 1.8, and Kafka 1.0 through 0.10, although older releases are
Go 1.8 through 1.10, and Kafka 0.11 through 1.0, although older releases are
still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service.
Expand Down
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,17 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse,
return response, nil
}

func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
response := new(CreatePartitionsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
response := new(CreateTopicsResponse)

Expand Down
9 changes: 7 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)

switch err.(type) {
case nil:
allKnownMetaData := len(topics) == 0
// valid response, use it
shouldRetry, err := client.updateMetadata(response)
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
if shouldRetry {
Logger.Println("client/metadata found some partitions to be leaderless")
return retry(err) // note: err can be nil
Expand All @@ -711,7 +712,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
}

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err error) {
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
client.lock.Lock()
defer client.lock.Unlock()

Expand All @@ -725,6 +726,10 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er

client.controllerID = data.ControllerID

if allKnownMetaData {
client.metadata = make(map[string]map[int32]*PartitionMetadata)
client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
}
for _, topic := range data.Topics {
delete(client.metadata, topic.Name)
delete(client.cachedPartitionsResults, topic.Name)
Expand Down
19 changes: 1 addition & 18 deletions message_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package sarama

import (
"runtime"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -31,17 +29,6 @@ var (
0xFF, 0xFF, 0xFF, 0xFF} // value

emptyGzipMessage = []byte{
97, 79, 149, 90, //CRC
0x00, // magic version byte
0x01, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
// value
0x00, 0x00, 0x00, 0x17,
0x1f, 0x8b,
0x08,
0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}

emptyGzipMessage18 = []byte{
132, 99, 80, 148, //CRC
0x00, // magic version byte
0x01, // attribute flags
Expand Down Expand Up @@ -107,11 +94,7 @@ func TestMessageEncoding(t *testing.T) {

message.Value = []byte{}
message.Codec = CompressionGZIP
if strings.HasPrefix(runtime.Version(), "go1.8") || strings.HasPrefix(runtime.Version(), "go1.9") {
testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
} else {
testEncodable(t, "empty gzip", &message, emptyGzipMessage)
}
testEncodable(t, "empty gzip", &message, emptyGzipMessage)

message.Value = []byte{}
message.Codec = CompressionLZ4
Expand Down

0 comments on commit 8a87ca8

Please sign in to comment.