Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Option to disable NACK throttling for consumer group #274

Merged
merged 5 commits into from
Aug 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package metadata
import (
"encoding/json"
"fmt"
"reflect"
"regexp"
"strings"
"time"
Expand Down Expand Up @@ -162,6 +163,7 @@ const (
columnVisible = "visible"
columnZone = "zone"
columnZoneConfigs = "zone_configs"
columnOptions = "options"
)

const userOperationTTL = "2592000" // 30 days
Expand Down Expand Up @@ -589,6 +591,7 @@ func getUtilConsumerGroupDescription() *shared.ConsumerGroupDescription {
result.IsMultiZone = common.BoolPtr(false)
result.ActiveZone = common.StringPtr("")
result.ZoneConfigs = shared.ConsumerGroupDescription_ZoneConfigs_DEFAULT
result.Options = make(map[string]string, 0)

return result
}
Expand Down Expand Up @@ -1183,7 +1186,8 @@ const (
columnOwnerEmail + `: ?, ` +
columnIsMultiZone + `: ?, ` +
columnActiveZone + `: ?, ` +
columnZoneConfigs + `: ? }`
columnZoneConfigs + `: ?, ` +
columnOptions + `: ? }`

sqlInsertCGByUUID = `INSERT INTO ` + tableConsumerGroups +
`(` +
Expand Down Expand Up @@ -1214,7 +1218,8 @@ const (
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
` FROM ` + tableConsumerGroupsByName +
` WHERE ` + columnDestinationUUID + `=? and ` + columnName + `=?`

Expand All @@ -1232,7 +1237,8 @@ const (
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
` FROM ` + tableConsumerGroups

sqlGetCGByUUID = sqlGetCG + ` WHERE ` + columnUUID + `=?`
Expand All @@ -1251,7 +1257,8 @@ const (
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
` FROM ` + tableConsumerGroupsByName +
` WHERE ` + columnDestinationUUID + `=?`

Expand Down Expand Up @@ -1341,7 +1348,8 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
createRequest.GetOwnerEmail(),
createRequest.GetIsMultiZone(),
createRequest.GetActiveZone(),
marshalCgZoneConfigs(createRequest.GetZoneConfigs())).Exec()
marshalCgZoneConfigs(createRequest.GetZoneConfigs()),
createRequest.GetOptions()).Exec()

if err != nil {
return nil, &shared.InternalServiceError{
Expand All @@ -1367,7 +1375,8 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
createRequest.GetOwnerEmail(),
createRequest.GetIsMultiZone(),
createRequest.GetActiveZone(),
marshalCgZoneConfigs(createRequest.GetZoneConfigs()))
marshalCgZoneConfigs(createRequest.GetZoneConfigs()),
createRequest.GetOptions())

previous := make(map[string]interface{}) // We actually throw away the old values below, but passing nil causes a panic

Expand Down Expand Up @@ -1412,6 +1421,7 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
IsMultiZone: common.BoolPtr(createRequest.GetIsMultiZone()),
ActiveZone: common.StringPtr(createRequest.GetActiveZone()),
ZoneConfigs: createRequest.GetZoneConfigs(),
Options: createRequest.GetOptions(),
}, nil
}

Expand Down Expand Up @@ -1468,7 +1478,8 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
result.OwnerEmail,
result.IsMultiZone,
result.ActiveZone,
&zoneConfigsData); err != nil {
&zoneConfigsData,
&result.Options); err != nil {
if err == gocql.ErrNotFound {
return nil, &shared.EntityNotExistsError{
Message: fmt.Sprintf("ConsumerGroup %s of destinationUUID %s does not exist", cgName, dstUUID),
Expand Down Expand Up @@ -1539,7 +1550,8 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
result.OwnerEmail,
result.IsMultiZone,
result.ActiveZone,
&zoneConfigsData); err != nil {
&zoneConfigsData,
&result.Options); err != nil {
if err == gocql.ErrNotFound {
return nil, &shared.EntityNotExistsError{
Message: fmt.Sprintf("ConsumerGroup %s does not exist", *request.ConsumerGroupUUID),
Expand Down Expand Up @@ -1600,6 +1612,11 @@ func updateCGDescIfChanged(req *shared.UpdateConsumerGroupRequest, cgDesc *share
cgDesc.IsMultiZone = common.BoolPtr(true)
}

if req.IsSetOptions() && !reflect.DeepEqual(req.GetOptions(), cgDesc.GetOptions()) {
isChanged = true
cgDesc.Options = req.Options
}

return isChanged
}

Expand Down Expand Up @@ -1648,6 +1665,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
newCG.GetIsMultiZone(),
newCG.GetActiveZone(),
marshalCgZoneConfigs(newCG.GetZoneConfigs()),
newCG.GetOptions(),
// Query columns
newCG.GetConsumerGroupUUID())

Expand All @@ -1668,6 +1686,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
newCG.GetIsMultiZone(),
newCG.GetActiveZone(),
marshalCgZoneConfigs(newCG.GetZoneConfigs()),
newCG.GetOptions(),
// Query columns
newCG.GetDestinationUUID(),
newCG.GetConsumerGroupName())
Expand Down Expand Up @@ -1788,6 +1807,7 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
existingCG.GetIsMultiZone(),
existingCG.GetActiveZone(),
marshalCgZoneConfigs(existingCG.GetZoneConfigs()),
existingCG.GetOptions(),
defaultDeleteTTLSeconds)

batch.Query(sqlDeleteCGByName,
Expand Down Expand Up @@ -1891,7 +1911,8 @@ func (s *CassandraMetadataService) ListConsumerGroups(ctx thrift.Context, reques
cg.OwnerEmail,
cg.IsMultiZone,
cg.ActiveZone,
&zoneConfigsData) {
&zoneConfigsData,
&cg.Options) {

// Get a new item within limit
if cg.GetStatus() == shared.ConsumerGroupStatus_DELETED {
Expand Down Expand Up @@ -1954,7 +1975,8 @@ func (s *CassandraMetadataService) ListAllConsumerGroups(ctx thrift.Context, req
cg.OwnerEmail,
cg.IsMultiZone,
cg.ActiveZone,
&zoneConfigsData) {
&zoneConfigsData,
&cg.Options) {

cg.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
result.ConsumerGroups = append(result.ConsumerGroups, cg)
Expand Down
20 changes: 18 additions & 2 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ type CassandraSuite struct {
TestCluster
}

const testPageSize = 2
const (
testPageSize = 2
)

func TestCassandraSuite(t *testing.T) {
s := new(CassandraSuite)
Expand Down Expand Up @@ -1676,6 +1678,7 @@ func assertConsumerGroupsEqual(s *CassandraSuite, expected, got *shared.Consumer
s.Equal(expected.GetOwnerEmail(), got.GetOwnerEmail(), "Wrong OwnerEmail")
s.Equal(expected.GetDeadLetterQueueDestinationUUID(), got.GetDeadLetterQueueDestinationUUID(), "Wrong DeadLetterQueueDestinationUUID")
s.Equal(expected.GetActiveZone(), got.GetActiveZone(), "Wrong ActiveZone")
s.Equal(expected.GetOptions(), got.GetOptions(), "Wrong Options")
}

func (s *CassandraSuite) TestDeleteConsumerGroupDeletesDLQ() {
Expand Down Expand Up @@ -1753,6 +1756,9 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
Visible: common.BoolPtr(false),
}

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"

cgName := s.generateName("/foo/bar_consumer")

createReq := &shared.CreateConsumerGroupRequest{
Expand All @@ -1767,6 +1773,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
IsMultiZone: common.BoolPtr(true),
ActiveZone: common.StringPtr("zone1"),
ZoneConfigs: []*shared.ConsumerGroupZoneConfig{zoneConfig},
Options: options,
}

expectedCG := &shared.ConsumerGroupDescription{
Expand All @@ -1782,6 +1789,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
IsMultiZone: common.BoolPtr(createReq.GetIsMultiZone()),
ActiveZone: common.StringPtr(createReq.GetActiveZone()),
ZoneConfigs: createReq.GetZoneConfigs(),
Options: options,
}

expectedCGOrig := new(shared.ConsumerGroupDescription)
Expand Down Expand Up @@ -1827,6 +1835,8 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
assert.Nil(err, "ReadConsumerGroup failed")
assertConsumerGroupsEqual(s, expectedCG, gotCG)

options[common.FlagDisableNackThrottling] = "false"

readReq.ConsumerGroupUUID = common.StringPtr(gotCG.GetConsumerGroupUUID())
gotCG, err = s.client.ReadConsumerGroupByUUID(nil, readReq)
assert.Nil(err, "ReadConsumerGroupByUUID failed")
Expand All @@ -1840,6 +1850,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
SkipOlderMessagesSeconds: common.Int32Ptr(100),
DelaySeconds: common.Int32Ptr(100),
OwnerEmail: common.StringPtr("consumer_test@uber.com"),
Options: options,
}

if pass%2 == 0 {
Expand All @@ -1855,6 +1866,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
expectedCG.DelaySeconds = common.Int32Ptr(updateReq.GetDelaySeconds())
expectedCG.OwnerEmail = common.StringPtr(updateReq.GetOwnerEmail())
expectedCG.ActiveZone = common.StringPtr(updateReq.GetActiveZone())
expectedCG.Options = options

gotCG = nil
gotCG, err = s.client.UpdateConsumerGroup(nil, updateReq)
Expand Down Expand Up @@ -2099,6 +2111,9 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
dstUUID := dstInfo.GetDestinationUUID()
groupMap := make(map[string]string)

options := make(map[string]string)
options[common.FlagDisableNackThrottling] = "true"

for i := 0; i < 10; i++ {
name := s.generateName(fmt.Sprintf("foobar-consumer-%v", i))
var createReq *shared.CreateConsumerGroupRequest
Expand All @@ -2112,6 +2127,7 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
SkipOlderMessagesSeconds: common.Int32Ptr(60),
DelaySeconds: common.Int32Ptr(60),
OwnerEmail: common.StringPtr("consumer_test@uber.com"),
Options: options,
}

_, err = s.client.CreateConsumerGroup(nil, createReq)
Expand Down Expand Up @@ -2142,10 +2158,10 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
s.Equal(int32(60), gotCG.GetSkipOlderMessagesSeconds())
s.Equal(int32(60), gotCG.GetDelaySeconds())
s.Equal(string("consumer_test@uber.com"), gotCG.GetOwnerEmail())
s.Equal(options, gotCG.GetOptions())
delete(groupMap, gotCG.GetConsumerGroupName())
}
}

}
if len(listRes.GetNextPageToken()) == 0 {
break
Expand Down
4 changes: 3 additions & 1 deletion clients/metadata/schema/metadata.cql
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ CREATE TYPE destination (
checksum_option int,
is_multi_zone boolean,
zone_configs list<frozen<destination_zone_config>>,
schema_version int
schema_version int,
options map<text, text>,
);

CREATE TABLE destinations (
Expand Down Expand Up @@ -220,6 +221,7 @@ CREATE TYPE consumer_group (
is_multi_zone boolean,
active_zone text,
zone_configs list<frozen<consumer_group_zone_config>>,
options map<text, text>,
);

CREATE TABLE consumer_groups (
Expand Down
2 changes: 2 additions & 0 deletions clients/metadata/schema/v16/201708090000_options.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TYPE consumer_group ADD options map<text, text>;
ALTER TYPE destination ADD options map<text, text>;
8 changes: 8 additions & 0 deletions clients/metadata/schema/v16/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": 16,
"MinCompatibleVersion": 8,
"Description": "Add options for cg and destination",
"SchemaUpdateCqlFiles": [
"201708090000_options.cql"
]
}
8 changes: 8 additions & 0 deletions cmd/tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ func SetCommonCommands(
Name: "zone_config, zc",
Usage: "Zone configs for multi-zone CG. For each zone, specify \"Zone,PreferedActiveZone\"; ex: \"zone1,false\"",
},
cli.BoolFlag{
Name: common.FlagDisableNackThrottling,
Usage: "Disable nack throttling for consumer group",
},
},
Action: func(c *cli.Context) {
if authEnabled {
Expand Down Expand Up @@ -366,6 +370,10 @@ func SetCommonCommands(
Name: "zone_config, zc",
Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"",
},
cli.BoolFlag{
Name: common.FlagDisableNackThrottling,
Usage: "Disable nack throttling for consumer group",
},
},
Action: func(c *cli.Context) {
if authEnabled {
Expand Down
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ const (
// KafkaPhantomExtentStorehost is placeholder/phantom storehost uuid used for Kafka extents
KafkaPhantomExtentStorehost = "00000000-0000-0000-0000-000000000000"
)

const (
// FlagDisableNackThrottling is the flag string for disabling Nack throttling
FlagDisableNackThrottling = "disable_nack_throttling"
)
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions services/frontendhost/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func convertCreateCGRequestToInternal(createRequest *c.CreateConsumerGroupReques
internalCreateRequest.ZoneConfigs = append(internalCreateRequest.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg))
}
}
internalCreateRequest.Options = createRequest.GetOptions()

return internalCreateRequest, nil
}
Expand Down Expand Up @@ -427,6 +428,10 @@ func convertUpdateCGRequestToInternal(updateRequest *c.UpdateConsumerGroupReques
internalUpdateRequest.ZoneConfigs = append(internalUpdateRequest.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg))
}
}
if updateRequest.IsSetOptions() {
internalUpdateRequest.Options = updateRequest.GetOptions()

}
return internalUpdateRequest
}

Expand Down Expand Up @@ -492,6 +497,7 @@ func (h *Frontend) convertConsumerGroupFromInternal(ctx thrift.Context, _cgDesc
}
cgDesc.ZoneConfigs.ActiveZone = common.StringPtr(_cgDesc.GetActiveZone())
}
cgDesc.Options = _cgDesc.GetOptions()
return
}

Expand Down
Loading