Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Option to disable NACK throttling for consumer group
Browse files Browse the repository at this point in the history
  • Loading branch information
kobeyang committed Aug 14, 2017
1 parent 1341daf commit 3f3b84a
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 16 deletions.
42 changes: 32 additions & 10 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package metadata
import (
"encoding/json"
"fmt"
"reflect"
"regexp"
"strings"
"time"
Expand Down Expand Up @@ -162,6 +163,7 @@ const (
columnVisible = "visible"
columnZone = "zone"
columnZoneConfigs = "zone_configs"
columnOptions = "options"
)

const userOperationTTL = "2592000" // 30 days
Expand Down Expand Up @@ -588,6 +590,7 @@ func getUtilConsumerGroupDescription() *shared.ConsumerGroupDescription {
result.IsMultiZone = common.BoolPtr(false)
result.ActiveZone = common.StringPtr("")
result.ZoneConfigs = shared.ConsumerGroupDescription_ZoneConfigs_DEFAULT
result.Options = make(map[string]bool, 0)

return result
}
Expand Down Expand Up @@ -1182,7 +1185,8 @@ const (
columnOwnerEmail + `: ?, ` +
columnIsMultiZone + `: ?, ` +
columnActiveZone + `: ?, ` +
columnZoneConfigs + `: ? }`
columnZoneConfigs + `: ?, ` +
columnOptions + `: ? }`

sqlInsertCGByUUID = `INSERT INTO ` + tableConsumerGroups +
`(` +
Expand Down Expand Up @@ -1213,7 +1217,8 @@ const (
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
` FROM ` + tableConsumerGroupsByName +
` WHERE ` + columnDestinationUUID + `=? and ` + columnName + `=?`

Expand All @@ -1231,7 +1236,8 @@ const (
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
` FROM ` + tableConsumerGroups

sqlGetCGByUUID = sqlGetCG + ` WHERE ` + columnUUID + `=?`
Expand All @@ -1250,7 +1256,8 @@ const (
columnConsumerGroup + `.` + columnOwnerEmail + "," +
columnConsumerGroup + `.` + columnIsMultiZone + "," +
columnConsumerGroup + `.` + columnActiveZone + "," +
columnConsumerGroup + `.` + columnZoneConfigs +
columnConsumerGroup + `.` + columnZoneConfigs + "," +
columnConsumerGroup + `.` + columnOptions +
` FROM ` + tableConsumerGroupsByName +
` WHERE ` + columnDestinationUUID + `=?`

Expand Down Expand Up @@ -1340,7 +1347,8 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
createRequest.GetOwnerEmail(),
createRequest.GetIsMultiZone(),
createRequest.GetActiveZone(),
marshalCgZoneConfigs(createRequest.GetZoneConfigs())).Exec()
marshalCgZoneConfigs(createRequest.GetZoneConfigs()),
createRequest.GetOptions()).Exec()

if err != nil {
return nil, &shared.InternalServiceError{
Expand All @@ -1366,7 +1374,8 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
createRequest.GetOwnerEmail(),
createRequest.GetIsMultiZone(),
createRequest.GetActiveZone(),
marshalCgZoneConfigs(createRequest.GetZoneConfigs()))
marshalCgZoneConfigs(createRequest.GetZoneConfigs()),
createRequest.GetOptions())

previous := make(map[string]interface{}) // We actually throw away the old values below, but passing nil causes a panic

Expand Down Expand Up @@ -1411,6 +1420,7 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
IsMultiZone: common.BoolPtr(createRequest.GetIsMultiZone()),
ActiveZone: common.StringPtr(createRequest.GetActiveZone()),
ZoneConfigs: createRequest.GetZoneConfigs(),
Options: createRequest.GetOptions(),
}, nil
}

Expand Down Expand Up @@ -1467,7 +1477,8 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
result.OwnerEmail,
result.IsMultiZone,
result.ActiveZone,
&zoneConfigsData); err != nil {
&zoneConfigsData,
&result.Options); err != nil {
if err == gocql.ErrNotFound {
return nil, &shared.EntityNotExistsError{
Message: fmt.Sprintf("ConsumerGroup %s of destinationUUID %s does not exist", cgName, dstUUID),
Expand Down Expand Up @@ -1538,7 +1549,8 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
result.OwnerEmail,
result.IsMultiZone,
result.ActiveZone,
&zoneConfigsData); err != nil {
&zoneConfigsData,
&result.Options); err != nil {
if err == gocql.ErrNotFound {
return nil, &shared.EntityNotExistsError{
Message: fmt.Sprintf("ConsumerGroup %s does not exist", *request.ConsumerGroupUUID),
Expand Down Expand Up @@ -1599,6 +1611,11 @@ func updateCGDescIfChanged(req *shared.UpdateConsumerGroupRequest, cgDesc *share
cgDesc.IsMultiZone = common.BoolPtr(true)
}

if req.IsSetOptions() && !reflect.DeepEqual(req.GetOptions(), cgDesc.GetOptions()) {
isChanged = true
cgDesc.Options = req.Options
}

return isChanged
}

Expand Down Expand Up @@ -1647,6 +1664,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
newCG.GetIsMultiZone(),
newCG.GetActiveZone(),
marshalCgZoneConfigs(newCG.GetZoneConfigs()),
newCG.GetOptions(),
// Query columns
newCG.GetConsumerGroupUUID())

Expand All @@ -1667,6 +1685,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
newCG.GetIsMultiZone(),
newCG.GetActiveZone(),
marshalCgZoneConfigs(newCG.GetZoneConfigs()),
newCG.GetOptions(),
// Query columns
newCG.GetDestinationUUID(),
newCG.GetConsumerGroupName())
Expand Down Expand Up @@ -1787,6 +1806,7 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
existingCG.GetIsMultiZone(),
existingCG.GetActiveZone(),
marshalCgZoneConfigs(existingCG.GetZoneConfigs()),
existingCG.GetOptions(),
defaultDeleteTTLSeconds)

batch.Query(sqlDeleteCGByName,
Expand Down Expand Up @@ -1890,7 +1910,8 @@ func (s *CassandraMetadataService) ListConsumerGroups(ctx thrift.Context, reques
cg.OwnerEmail,
cg.IsMultiZone,
cg.ActiveZone,
&zoneConfigsData) {
&zoneConfigsData,
&cg.Options) {

// Get a new item within limit
if cg.GetStatus() == shared.ConsumerGroupStatus_DELETED {
Expand Down Expand Up @@ -1953,7 +1974,8 @@ func (s *CassandraMetadataService) ListAllConsumerGroups(ctx thrift.Context, req
cg.OwnerEmail,
cg.IsMultiZone,
cg.ActiveZone,
&zoneConfigsData) {
&zoneConfigsData,
&cg.Options) {

cg.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
result.ConsumerGroups = append(result.ConsumerGroups, cg)
Expand Down
21 changes: 19 additions & 2 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type CassandraSuite struct {
TestCluster
}

const testPageSize = 2
const (
testPageSize = 2
FlagDisableNackThrottling = "disable_nack_throttling"
)

func TestCassandraSuite(t *testing.T) {
s := new(CassandraSuite)
Expand Down Expand Up @@ -1676,6 +1679,7 @@ func assertConsumerGroupsEqual(s *CassandraSuite, expected, got *shared.Consumer
s.Equal(expected.GetOwnerEmail(), got.GetOwnerEmail(), "Wrong OwnerEmail")
s.Equal(expected.GetDeadLetterQueueDestinationUUID(), got.GetDeadLetterQueueDestinationUUID(), "Wrong DeadLetterQueueDestinationUUID")
s.Equal(expected.GetActiveZone(), got.GetActiveZone(), "Wrong ActiveZone")
s.Equal(expected.GetOptions(), got.GetOptions(), "Wrong Options")
}

func (s *CassandraSuite) TestDeleteConsumerGroupDeletesDLQ() {
Expand Down Expand Up @@ -1753,6 +1757,9 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
Visible: common.BoolPtr(false),
}

options := make(map[string]bool)
options[FlagDisableNackThrottling] = true

cgName := s.generateName("/foo/bar_consumer")

createReq := &shared.CreateConsumerGroupRequest{
Expand All @@ -1767,6 +1774,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
IsMultiZone: common.BoolPtr(true),
ActiveZone: common.StringPtr("zone1"),
ZoneConfigs: []*shared.ConsumerGroupZoneConfig{zoneConfig},
Options: options,
}

expectedCG := &shared.ConsumerGroupDescription{
Expand All @@ -1782,6 +1790,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
IsMultiZone: common.BoolPtr(createReq.GetIsMultiZone()),
ActiveZone: common.StringPtr(createReq.GetActiveZone()),
ZoneConfigs: createReq.GetZoneConfigs(),
Options: options,
}

expectedCGOrig := new(shared.ConsumerGroupDescription)
Expand Down Expand Up @@ -1827,6 +1836,8 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
assert.Nil(err, "ReadConsumerGroup failed")
assertConsumerGroupsEqual(s, expectedCG, gotCG)

options[FlagDisableNackThrottling] = false

readReq.ConsumerGroupUUID = common.StringPtr(gotCG.GetConsumerGroupUUID())
gotCG, err = s.client.ReadConsumerGroupByUUID(nil, readReq)
assert.Nil(err, "ReadConsumerGroupByUUID failed")
Expand All @@ -1840,6 +1851,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
SkipOlderMessagesSeconds: common.Int32Ptr(100),
DelaySeconds: common.Int32Ptr(100),
OwnerEmail: common.StringPtr("consumer_test@uber.com"),
Options: options,
}

if pass%2 == 0 {
Expand All @@ -1855,6 +1867,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
expectedCG.DelaySeconds = common.Int32Ptr(updateReq.GetDelaySeconds())
expectedCG.OwnerEmail = common.StringPtr(updateReq.GetOwnerEmail())
expectedCG.ActiveZone = common.StringPtr(updateReq.GetActiveZone())
expectedCG.Options = options

gotCG = nil
gotCG, err = s.client.UpdateConsumerGroup(nil, updateReq)
Expand Down Expand Up @@ -2099,6 +2112,9 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
dstUUID := dstInfo.GetDestinationUUID()
groupMap := make(map[string]string)

options := make(map[string]bool)
options[FlagDisableNackThrottling] = true

for i := 0; i < 10; i++ {
name := s.generateName(fmt.Sprintf("foobar-consumer-%v", i))
var createReq *shared.CreateConsumerGroupRequest
Expand All @@ -2112,6 +2128,7 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
SkipOlderMessagesSeconds: common.Int32Ptr(60),
DelaySeconds: common.Int32Ptr(60),
OwnerEmail: common.StringPtr("consumer_test@uber.com"),
Options: options,
}

_, err = s.client.CreateConsumerGroup(nil, createReq)
Expand Down Expand Up @@ -2142,10 +2159,10 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
s.Equal(int32(60), gotCG.GetSkipOlderMessagesSeconds())
s.Equal(int32(60), gotCG.GetDelaySeconds())
s.Equal(string("consumer_test@uber.com"), gotCG.GetOwnerEmail())
s.Equal(options, gotCG.GetOptions())
delete(groupMap, gotCG.GetConsumerGroupName())
}
}

}
if len(listRes.GetNextPageToken()) == 0 {
break
Expand Down
4 changes: 3 additions & 1 deletion clients/metadata/schema/metadata.cql
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ CREATE TYPE destination (
checksum_option int,
is_multi_zone boolean,
zone_configs list<frozen<destination_zone_config>>,
schema_version int
schema_version int,
options map<text, boolean>,
);

CREATE TABLE destinations (
Expand Down Expand Up @@ -220,6 +221,7 @@ CREATE TYPE consumer_group (
is_multi_zone boolean,
active_zone text,
zone_configs list<frozen<consumer_group_zone_config>>,
options map<text, boolean>,
);

CREATE TABLE consumer_groups (
Expand Down
2 changes: 2 additions & 0 deletions clients/metadata/schema/v16/201708090000_options.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TYPE consumer_group ADD options map<text, boolean>;
ALTER TYPE destination ADD options map<text, boolean>;
8 changes: 8 additions & 0 deletions clients/metadata/schema/v16/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": 16,
"MinCompatibleVersion": 8,
"Description": "Add options for cg and destination",
"SchemaUpdateCqlFiles": [
"201708090000_options.cql"
]
}
8 changes: 8 additions & 0 deletions cmd/tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ func SetCommonCommands(
Name: "zone_config, zc",
Usage: "Zone configs for multi-zone CG. For each zone, specify \"Zone,PreferedActiveZone\"; ex: \"zone1,false\"",
},
cli.BoolFlag{
Name: toolscommon.FlagDisableNackThrottling,
Usage: "Disable nack throttling for consumer group",
},
},
Action: func(c *cli.Context) {
if authEnabled {
Expand Down Expand Up @@ -366,6 +370,10 @@ func SetCommonCommands(
Name: "zone_config, zc",
Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"",
},
cli.BoolFlag{
Name: toolscommon.FlagDisableNackThrottling,
Usage: "Disable nack throttling for consumer group",
},
},
Action: func(c *cli.Context) {
if authEnabled {
Expand Down
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions services/frontendhost/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func convertCreateCGRequestToInternal(createRequest *c.CreateConsumerGroupReques
internalCreateRequest.ZoneConfigs = append(internalCreateRequest.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg))
}
}
internalCreateRequest.Options = createRequest.GetOptions()

return internalCreateRequest, nil
}
Expand Down Expand Up @@ -427,6 +428,10 @@ func convertUpdateCGRequestToInternal(updateRequest *c.UpdateConsumerGroupReques
internalUpdateRequest.ZoneConfigs = append(internalUpdateRequest.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg))
}
}
if updateRequest.IsSetOptions() {
internalUpdateRequest.Options = updateRequest.GetOptions()

}
return internalUpdateRequest
}

Expand Down Expand Up @@ -492,6 +497,7 @@ func (h *Frontend) convertConsumerGroupFromInternal(ctx thrift.Context, _cgDesc
}
cgDesc.ZoneConfigs.ActiveZone = common.StringPtr(_cgDesc.GetActiveZone())
}
cgDesc.Options = _cgDesc.GetOptions()
return
}

Expand Down
Loading

0 comments on commit 3f3b84a

Please sign in to comment.