diff --git a/clients/metadata/metadata_cassandra.go b/clients/metadata/metadata_cassandra.go index 5c28fb24..94f653b7 100644 --- a/clients/metadata/metadata_cassandra.go +++ b/clients/metadata/metadata_cassandra.go @@ -23,6 +23,7 @@ package metadata import ( "encoding/json" "fmt" + "reflect" "regexp" "strings" "time" @@ -162,6 +163,7 @@ const ( columnVisible = "visible" columnZone = "zone" columnZoneConfigs = "zone_configs" + columnOptions = "options" ) const userOperationTTL = "2592000" // 30 days @@ -588,6 +590,7 @@ func getUtilConsumerGroupDescription() *shared.ConsumerGroupDescription { result.IsMultiZone = common.BoolPtr(false) result.ActiveZone = common.StringPtr("") result.ZoneConfigs = shared.ConsumerGroupDescription_ZoneConfigs_DEFAULT + result.Options = make(map[string]bool, 0) return result } @@ -1182,7 +1185,8 @@ const ( columnOwnerEmail + `: ?, ` + columnIsMultiZone + `: ?, ` + columnActiveZone + `: ?, ` + - columnZoneConfigs + `: ? }` + columnZoneConfigs + `: ?, ` + + columnOptions + `: ? }` sqlInsertCGByUUID = `INSERT INTO ` + tableConsumerGroups + `(` + @@ -1213,7 +1217,8 @@ const ( columnConsumerGroup + `.` + columnOwnerEmail + "," + columnConsumerGroup + `.` + columnIsMultiZone + "," + columnConsumerGroup + `.` + columnActiveZone + "," + - columnConsumerGroup + `.` + columnZoneConfigs + + columnConsumerGroup + `.` + columnZoneConfigs + "," + + columnConsumerGroup + `.` + columnOptions + ` FROM ` + tableConsumerGroupsByName + ` WHERE ` + columnDestinationUUID + `=? and ` + columnName + `=?` @@ -1231,7 +1236,8 @@ const ( columnConsumerGroup + `.` + columnOwnerEmail + "," + columnConsumerGroup + `.` + columnIsMultiZone + "," + columnConsumerGroup + `.` + columnActiveZone + "," + - columnConsumerGroup + `.` + columnZoneConfigs + + columnConsumerGroup + `.` + columnZoneConfigs + "," + + columnConsumerGroup + `.` + columnOptions + ` FROM ` + tableConsumerGroups sqlGetCGByUUID = sqlGetCG + ` WHERE ` + columnUUID + `=?` @@ -1250,7 +1256,8 @@ const ( columnConsumerGroup + `.` + columnOwnerEmail + "," + columnConsumerGroup + `.` + columnIsMultiZone + "," + columnConsumerGroup + `.` + columnActiveZone + "," + - columnConsumerGroup + `.` + columnZoneConfigs + + columnConsumerGroup + `.` + columnZoneConfigs + "," + + columnConsumerGroup + `.` + columnOptions + ` FROM ` + tableConsumerGroupsByName + ` WHERE ` + columnDestinationUUID + `=?` @@ -1340,7 +1347,8 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r createRequest.GetOwnerEmail(), createRequest.GetIsMultiZone(), createRequest.GetActiveZone(), - marshalCgZoneConfigs(createRequest.GetZoneConfigs())).Exec() + marshalCgZoneConfigs(createRequest.GetZoneConfigs()), + createRequest.GetOptions()).Exec() if err != nil { return nil, &shared.InternalServiceError{ @@ -1366,7 +1374,8 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r createRequest.GetOwnerEmail(), createRequest.GetIsMultiZone(), createRequest.GetActiveZone(), - marshalCgZoneConfigs(createRequest.GetZoneConfigs())) + marshalCgZoneConfigs(createRequest.GetZoneConfigs()), + createRequest.GetOptions()) previous := make(map[string]interface{}) // We actually throw away the old values below, but passing nil causes a panic @@ -1411,6 +1420,7 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r IsMultiZone: common.BoolPtr(createRequest.GetIsMultiZone()), ActiveZone: common.StringPtr(createRequest.GetActiveZone()), ZoneConfigs: createRequest.GetZoneConfigs(), + Options: createRequest.GetOptions(), }, nil } @@ -1467,7 +1477,8 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg result.OwnerEmail, result.IsMultiZone, result.ActiveZone, - &zoneConfigsData); err != nil { + &zoneConfigsData, + &result.Options); err != nil { if err == gocql.ErrNotFound { return nil, &shared.EntityNotExistsError{ Message: fmt.Sprintf("ConsumerGroup %s of destinationUUID %s does not exist", cgName, dstUUID), @@ -1538,7 +1549,8 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r result.OwnerEmail, result.IsMultiZone, result.ActiveZone, - &zoneConfigsData); err != nil { + &zoneConfigsData, + &result.Options); err != nil { if err == gocql.ErrNotFound { return nil, &shared.EntityNotExistsError{ Message: fmt.Sprintf("ConsumerGroup %s does not exist", *request.ConsumerGroupUUID), @@ -1599,6 +1611,11 @@ func updateCGDescIfChanged(req *shared.UpdateConsumerGroupRequest, cgDesc *share cgDesc.IsMultiZone = common.BoolPtr(true) } + if req.IsSetOptions() && !reflect.DeepEqual(req.GetOptions(), cgDesc.GetOptions()) { + isChanged = true + cgDesc.Options = req.Options + } + return isChanged } @@ -1647,6 +1664,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque newCG.GetIsMultiZone(), newCG.GetActiveZone(), marshalCgZoneConfigs(newCG.GetZoneConfigs()), + newCG.GetOptions(), // Query columns newCG.GetConsumerGroupUUID()) @@ -1667,6 +1685,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque newCG.GetIsMultiZone(), newCG.GetActiveZone(), marshalCgZoneConfigs(newCG.GetZoneConfigs()), + newCG.GetOptions(), // Query columns newCG.GetDestinationUUID(), newCG.GetConsumerGroupName()) @@ -1787,6 +1806,7 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque existingCG.GetIsMultiZone(), existingCG.GetActiveZone(), marshalCgZoneConfigs(existingCG.GetZoneConfigs()), + existingCG.GetOptions(), defaultDeleteTTLSeconds) batch.Query(sqlDeleteCGByName, @@ -1890,7 +1910,8 @@ func (s *CassandraMetadataService) ListConsumerGroups(ctx thrift.Context, reques cg.OwnerEmail, cg.IsMultiZone, cg.ActiveZone, - &zoneConfigsData) { + &zoneConfigsData, + &cg.Options) { // Get a new item within limit if cg.GetStatus() == shared.ConsumerGroupStatus_DELETED { @@ -1953,7 +1974,8 @@ func (s *CassandraMetadataService) ListAllConsumerGroups(ctx thrift.Context, req cg.OwnerEmail, cg.IsMultiZone, cg.ActiveZone, - &zoneConfigsData) { + &zoneConfigsData, + &cg.Options) { cg.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData) result.ConsumerGroups = append(result.ConsumerGroups, cg) diff --git a/clients/metadata/metadata_cassandra_test.go b/clients/metadata/metadata_cassandra_test.go index 03a647c3..c3e87d5b 100644 --- a/clients/metadata/metadata_cassandra_test.go +++ b/clients/metadata/metadata_cassandra_test.go @@ -52,7 +52,10 @@ type CassandraSuite struct { TestCluster } -const testPageSize = 2 +const ( + testPageSize = 2 + FlagDisableNackThrottling = "disable_nack_throttling" +) func TestCassandraSuite(t *testing.T) { s := new(CassandraSuite) @@ -1676,6 +1679,7 @@ func assertConsumerGroupsEqual(s *CassandraSuite, expected, got *shared.Consumer s.Equal(expected.GetOwnerEmail(), got.GetOwnerEmail(), "Wrong OwnerEmail") s.Equal(expected.GetDeadLetterQueueDestinationUUID(), got.GetDeadLetterQueueDestinationUUID(), "Wrong DeadLetterQueueDestinationUUID") s.Equal(expected.GetActiveZone(), got.GetActiveZone(), "Wrong ActiveZone") + s.Equal(expected.GetOptions(), got.GetOptions(), "Wrong Options") } func (s *CassandraSuite) TestDeleteConsumerGroupDeletesDLQ() { @@ -1753,6 +1757,9 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() { Visible: common.BoolPtr(false), } + options := make(map[string]bool) + options[FlagDisableNackThrottling] = true + cgName := s.generateName("/foo/bar_consumer") createReq := &shared.CreateConsumerGroupRequest{ @@ -1767,6 +1774,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() { IsMultiZone: common.BoolPtr(true), ActiveZone: common.StringPtr("zone1"), ZoneConfigs: []*shared.ConsumerGroupZoneConfig{zoneConfig}, + Options: options, } expectedCG := &shared.ConsumerGroupDescription{ @@ -1782,6 +1790,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() { IsMultiZone: common.BoolPtr(createReq.GetIsMultiZone()), ActiveZone: common.StringPtr(createReq.GetActiveZone()), ZoneConfigs: createReq.GetZoneConfigs(), + Options: options, } expectedCGOrig := new(shared.ConsumerGroupDescription) @@ -1827,6 +1836,8 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() { assert.Nil(err, "ReadConsumerGroup failed") assertConsumerGroupsEqual(s, expectedCG, gotCG) + options[FlagDisableNackThrottling] = false + readReq.ConsumerGroupUUID = common.StringPtr(gotCG.GetConsumerGroupUUID()) gotCG, err = s.client.ReadConsumerGroupByUUID(nil, readReq) assert.Nil(err, "ReadConsumerGroupByUUID failed") @@ -1840,6 +1851,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() { SkipOlderMessagesSeconds: common.Int32Ptr(100), DelaySeconds: common.Int32Ptr(100), OwnerEmail: common.StringPtr("consumer_test@uber.com"), + Options: options, } if pass%2 == 0 { @@ -1855,6 +1867,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() { expectedCG.DelaySeconds = common.Int32Ptr(updateReq.GetDelaySeconds()) expectedCG.OwnerEmail = common.StringPtr(updateReq.GetOwnerEmail()) expectedCG.ActiveZone = common.StringPtr(updateReq.GetActiveZone()) + expectedCG.Options = options gotCG = nil gotCG, err = s.client.UpdateConsumerGroup(nil, updateReq) @@ -2099,6 +2112,9 @@ func (s *CassandraSuite) TestListAllConsumerGroups() { dstUUID := dstInfo.GetDestinationUUID() groupMap := make(map[string]string) + options := make(map[string]bool) + options[FlagDisableNackThrottling] = true + for i := 0; i < 10; i++ { name := s.generateName(fmt.Sprintf("foobar-consumer-%v", i)) var createReq *shared.CreateConsumerGroupRequest @@ -2112,6 +2128,7 @@ func (s *CassandraSuite) TestListAllConsumerGroups() { SkipOlderMessagesSeconds: common.Int32Ptr(60), DelaySeconds: common.Int32Ptr(60), OwnerEmail: common.StringPtr("consumer_test@uber.com"), + Options: options, } _, err = s.client.CreateConsumerGroup(nil, createReq) @@ -2142,10 +2159,10 @@ func (s *CassandraSuite) TestListAllConsumerGroups() { s.Equal(int32(60), gotCG.GetSkipOlderMessagesSeconds()) s.Equal(int32(60), gotCG.GetDelaySeconds()) s.Equal(string("consumer_test@uber.com"), gotCG.GetOwnerEmail()) + s.Equal(options, gotCG.GetOptions()) delete(groupMap, gotCG.GetConsumerGroupName()) } } - } if len(listRes.GetNextPageToken()) == 0 { break diff --git a/clients/metadata/schema/metadata.cql b/clients/metadata/schema/metadata.cql index 11653730..7a12d62d 100644 --- a/clients/metadata/schema/metadata.cql +++ b/clients/metadata/schema/metadata.cql @@ -82,7 +82,8 @@ CREATE TYPE destination ( checksum_option int, is_multi_zone boolean, zone_configs list>, - schema_version int + schema_version int, + options map, ); CREATE TABLE destinations ( @@ -220,6 +221,7 @@ CREATE TYPE consumer_group ( is_multi_zone boolean, active_zone text, zone_configs list>, + options map, ); CREATE TABLE consumer_groups ( diff --git a/clients/metadata/schema/v16/201708090000_options.cql b/clients/metadata/schema/v16/201708090000_options.cql new file mode 100644 index 00000000..64429e1b --- /dev/null +++ b/clients/metadata/schema/v16/201708090000_options.cql @@ -0,0 +1,2 @@ +ALTER TYPE consumer_group ADD options map; +ALTER TYPE destination ADD options map; diff --git a/clients/metadata/schema/v16/manifest.json b/clients/metadata/schema/v16/manifest.json new file mode 100644 index 00000000..f003cb9e --- /dev/null +++ b/clients/metadata/schema/v16/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": 16, + "MinCompatibleVersion": 8, + "Description": "Add options for cg and destination", + "SchemaUpdateCqlFiles": [ + "201708090000_options.cql" + ] +} diff --git a/cmd/tools/common/lib.go b/cmd/tools/common/lib.go index 5a8c41db..06052457 100644 --- a/cmd/tools/common/lib.go +++ b/cmd/tools/common/lib.go @@ -220,6 +220,10 @@ func SetCommonCommands( Name: "zone_config, zc", Usage: "Zone configs for multi-zone CG. For each zone, specify \"Zone,PreferedActiveZone\"; ex: \"zone1,false\"", }, + cli.BoolFlag{ + Name: toolscommon.FlagDisableNackThrottling, + Usage: "Disable nack throttling for consumer group", + }, }, Action: func(c *cli.Context) { if authEnabled { @@ -366,6 +370,10 @@ func SetCommonCommands( Name: "zone_config, zc", Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"", }, + cli.BoolFlag{ + Name: toolscommon.FlagDisableNackThrottling, + Usage: "Disable nack throttling for consumer group", + }, }, Action: func(c *cli.Context) { if authEnabled { diff --git a/glide.lock b/glide.lock index eae8dd67..b2177719 100644 --- a/glide.lock +++ b/glide.lock @@ -134,7 +134,7 @@ imports: - common/websocket - stream - name: github.com/uber/cherami-thrift - version: 2cb0e2eeb6570800a2dd86544909bf2693f50e7b + version: 3fe5d05728858405b75cd8fc6e1ed7388c5de86e subpackages: - .generated/go/admin - .generated/go/cherami diff --git a/services/frontendhost/frontend.go b/services/frontendhost/frontend.go index 8b90c5e6..21bc8ead 100644 --- a/services/frontendhost/frontend.go +++ b/services/frontendhost/frontend.go @@ -391,6 +391,7 @@ func convertCreateCGRequestToInternal(createRequest *c.CreateConsumerGroupReques internalCreateRequest.ZoneConfigs = append(internalCreateRequest.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg)) } } + internalCreateRequest.Options = createRequest.GetOptions() return internalCreateRequest, nil } @@ -427,6 +428,10 @@ func convertUpdateCGRequestToInternal(updateRequest *c.UpdateConsumerGroupReques internalUpdateRequest.ZoneConfigs = append(internalUpdateRequest.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg)) } } + if updateRequest.IsSetOptions() { + internalUpdateRequest.Options = updateRequest.GetOptions() + + } return internalUpdateRequest } @@ -492,6 +497,7 @@ func (h *Frontend) convertConsumerGroupFromInternal(ctx thrift.Context, _cgDesc } cgDesc.ZoneConfigs.ActiveZone = common.StringPtr(_cgDesc.GetActiveZone()) } + cgDesc.Options = _cgDesc.GetOptions() return } diff --git a/services/frontendhost/frontend_test.go b/services/frontendhost/frontend_test.go index f7b77c22..74d9341b 100644 --- a/services/frontendhost/frontend_test.go +++ b/services/frontendhost/frontend_test.go @@ -35,6 +35,7 @@ import ( mockcommon "github.com/uber/cherami-server/test/mocks/common" mockctrl "github.com/uber/cherami-server/test/mocks/controllerhost" mockmeta "github.com/uber/cherami-server/test/mocks/metadata" + toolscommon "github.com/uber/cherami-server/tools/common" c "github.com/uber/cherami-thrift/.generated/go/cherami" "github.com/uber/cherami-thrift/.generated/go/controller" "github.com/uber/cherami-thrift/.generated/go/shared" @@ -122,6 +123,7 @@ func cgCreateRequestToDesc(createRequest *c.CreateConsumerGroupRequest) *shared. cgDesc.ZoneConfigs = append(cgDesc.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg)) } } + cgDesc.Options = createRequest.GetOptions() return cgDesc } @@ -136,6 +138,7 @@ func cgUpdateRequestToDesc(updateRequest *c.UpdateConsumerGroupRequest) *shared. cgDesc.SkipOlderMessagesSeconds = common.Int32Ptr(updateRequest.GetSkipOlderMessagesInSeconds()) cgDesc.DelaySeconds = common.Int32Ptr(updateRequest.GetDelaySeconds()) cgDesc.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail()) + cgDesc.Options = updateRequest.GetOptions() return cgDesc } @@ -825,6 +828,9 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() { testCG := s.generateKey("/bar/CGName") frontendHost, ctx := s.utilGetContextAndFrontend() + options := make(map[string]bool) + options[toolscommon.FlagDisableNackThrottling] = true + req := c.NewCreateConsumerGroupRequest() req.DestinationPath = common.StringPtr(testPath) req.ConsumerGroupName = common.StringPtr(testCG) @@ -838,6 +844,8 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() { }, }, } + req.Options = options + cgDesc := cgCreateRequestToDesc(req) frontendHost.writeCacheDestinationPathForUUID(destinationUUID(cgDesc.GetDestinationUUID()), testPath) @@ -858,6 +866,7 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() { s.Equal(createReq.GetIsMultiZone(), req.GetIsMultiZone()) s.Equal(len(createReq.GetZoneConfigs()), len(req.GetZoneConfigs().GetConfigs())) s.Equal(createReq.GetZoneConfigs()[0].GetVisible(), req.GetZoneConfigs().GetConfigs()[0].GetVisible()) + s.Equal(createReq.GetOptions(), req.GetOptions()) }) cgd, err := frontendHost.CreateConsumerGroup(ctx, req) @@ -1124,6 +1133,9 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() { testPath := s.generateKey("/foo/bax") frontendHost, ctx := s.utilGetContextAndFrontend() + options := make(map[string]bool) + options[toolscommon.FlagDisableNackThrottling] = true + req := new(c.UpdateConsumerGroupRequest) req.DestinationPath = common.StringPtr(testPath) req.ConsumerGroupName = common.StringPtr(s.generateKey("/CG/Name")) @@ -1133,6 +1145,8 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() { req.DelaySeconds = common.Int32Ptr(5) req.Status = c.ConsumerGroupStatusPtr(c.ConsumerGroupStatus_DISABLED) req.OwnerEmail = common.StringPtr("consumer_front_test@uber.com") + req.Options = options + newCGDesc := cgUpdateRequestToDesc(req) frontendHost.writeCacheDestinationPathForUUID(destinationUUID(newCGDesc.GetDestinationUUID()), testPath) s.mockController.On("UpdateConsumerGroup", mock.Anything, mock.Anything).Return(newCGDesc, nil) @@ -1149,6 +1163,7 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() { s.Equal(cgd.DelaySeconds, req.DelaySeconds) s.Equal(cgd.Status, req.Status) s.Equal(cgd.OwnerEmail, req.OwnerEmail) + s.Equal(cgd.Options, req.Options) } } diff --git a/services/outputhost/consconnection.go b/services/outputhost/consconnection.go index 67f5108e..84a523cc 100644 --- a/services/outputhost/consconnection.go +++ b/services/outputhost/consconnection.go @@ -175,7 +175,7 @@ func (conn *consConnection) sendCreditsToWritePump(localCredits *int32) { // readCreditsStream is the stream which keeps reading the credits from the client func (conn *consConnection) readCreditsStream() { - // start ticker to send accumulated credits to the write pump + // start ticker to send accumulated credits to the write pump // This is needed because imagine we are not able to send credits // received from the client immediately. In that case we need to // accumulate those credits and make sure we send those out to the diff --git a/services/outputhost/messagecache.go b/services/outputhost/messagecache.go index 35d4a549..b652ff08 100644 --- a/services/outputhost/messagecache.go +++ b/services/outputhost/messagecache.go @@ -32,6 +32,7 @@ import ( "github.com/uber/cherami-server/common" "github.com/uber/cherami-server/common/metrics" "github.com/uber/cherami-server/services/outputhost/load" + toolscommon "github.com/uber/cherami-server/tools/common" "github.com/uber/cherami-thrift/.generated/go/cherami" "github.com/uber/cherami-thrift/.generated/go/shared" ) @@ -959,7 +960,7 @@ func (msgCache *cgMsgCache) handleNack(ackID timestampedAckID, lclLg bark.Logger // updateConn is the utility routine to notify the connection // to either throttle up or throttle down. // If we get a NACK on this connection, we ask the connection to -// slow down (throttle up) +// slow down (throttle up), except it's disabled explicitly // If we get an ACK on this connection and we asked it to throttle // earlier, we ask the connection to stop throttling (throttle down!). func (msgCache *cgMsgCache) updateConn(connID int, event msgEvent, badConns map[int]int) { @@ -973,6 +974,14 @@ func (msgCache *cgMsgCache) updateConn(connID int, event msgEvent, badConns map[ delete(badConns, connID) } } else { + if event == eventNACK { + // if NACK throttling is disabled for this cg, then no-op + throttlingDisabled, ok := msgCache.cgCache.cachedCGDesc.Options[toolscommon.FlagDisableNackThrottling] + if ok && throttlingDisabled { // no-op + return + } + } + // make sure to throttle the appropriate connection and update the // bad connections map to keep track of the number of nacks msgCache.notifier.Notify(connID, NotifyMsg{notifyType: ThrottleUp}) diff --git a/services/replicator/metadataReconciler.go b/services/replicator/metadataReconciler.go index 1d038a7b..a8dec244 100644 --- a/services/replicator/metadataReconciler.go +++ b/services/replicator/metadataReconciler.go @@ -22,6 +22,7 @@ package replicator import ( "errors" + "reflect" "strings" "sync/atomic" "time" @@ -358,6 +359,7 @@ func (r *metadataReconciler) reconcileCg(localCgs []*shared.ConsumerGroupDescrip IsMultiZone: common.BoolPtr(remoteCg.GetIsMultiZone()), ActiveZone: common.StringPtr(remoteCg.GetActiveZone()), ZoneConfigs: remoteCg.GetZoneConfigs(), + Options: remoteCg.Options, }, ConsumerGroupUUID: common.StringPtr(remoteCg.GetConsumerGroupUUID()), } @@ -418,6 +420,10 @@ func (r *metadataReconciler) compareAndUpdateCg(remoteCg *shared.ConsumerGroupDe updateRequest.ZoneConfigs = remoteCg.GetZoneConfigs() cgUpdated = true } + if !reflect.DeepEqual(localCg.GetOptions(), remoteCg.GetOptions()) { + updateRequest.Options = remoteCg.GetOptions() + cgUpdated = true + } if cgUpdated { logger.Info(`Found cg gets updated in remote but not in local`) diff --git a/tools/common/lib.go b/tools/common/lib.go index debfaeaf..016c71dc 100644 --- a/tools/common/lib.go +++ b/tools/common/lib.go @@ -112,6 +112,9 @@ const ( // Kafka prefix is a required prefix for all Kafka type destinations and consumer groups kafkaPrefix = `/kafka_` + + // FlagDisableNackThrottling is the flag string for disabling Nack throttling + FlagDisableNackThrottling = "disable_nack_throttling" ) const ( @@ -502,6 +505,12 @@ func CreateConsumerGroupSecure( zoneConfigs := getCgZoneConfigs(c, mClient, cliHelper, path) isMultiZone := len(zoneConfigs.GetConfigs()) > 0 + options := make(map[string]bool) + disableNackThrottling := string(c.String(FlagDisableNackThrottling)) + if disableNackThrottling == "true" { + options[FlagDisableNackThrottling] = true + } + desc, err := cClient.CreateConsumerGroup(&cherami.CreateConsumerGroupRequest{ DestinationPath: &path, ConsumerGroupName: &name, @@ -513,6 +522,7 @@ func CreateConsumerGroupSecure( OwnerEmail: &ownerEmail, IsMultiZone: &isMultiZone, ZoneConfigs: &zoneConfigs, + Options: options, }) ExitIfError(err) @@ -626,6 +636,7 @@ func UpdateConsumerGroupSecure( OwnerEmail: getIfSetString(c, `owner_email`, &setCount), ActiveZone: getIfSetString(c, `active_zone`, &setCount), ZoneConfigs: getIfSetCgZoneConfig(c, mClient, cliHelper, path, &setCount), + Options: getIfSetOptions(c, &setCount), } if c.IsSet(`status`) { @@ -1081,6 +1092,7 @@ type cgJSONOutputFields struct { IsMultiZone bool `json:"is_multi_zone"` ZoneConfigs []*shared.ConsumerGroupZoneConfig `json:"zone_Configs"` ActiveZone string `json:"active_zone"` + Options map[string]bool `json:"options"` } func printCG(cg *shared.ConsumerGroupDescription) { @@ -1099,6 +1111,7 @@ func printCG(cg *shared.ConsumerGroupDescription) { IsMultiZone: cg.GetIsMultiZone(), ZoneConfigs: cg.GetZoneConfigs(), ActiveZone: cg.GetActiveZone(), + Options: cg.GetOptions(), } outputStr, _ := json.Marshal(output) fmt.Fprintln(os.Stdout, string(outputStr)) @@ -1630,6 +1643,21 @@ func getIfSetCgZoneConfig(c *cli.Context, mClient mcli.Client, cliHelper common. return } +func getIfSetOptions(c *cli.Context, setCount *int) (options map[string]bool) { + if c.IsSet(FlagDisableNackThrottling) { + disableNackThrottling := false + if string(c.String(FlagDisableNackThrottling)) == "true" { + disableNackThrottling = true + } + + options = make(map[string]bool) + options[FlagDisableNackThrottling] = disableNackThrottling + *setCount++ + return options + } + return +} + type storeClientCache struct { mClient mcli.Client cache map[string]*storehost.StoreClientImpl