diff --git a/clients/metadata/metadata_cassandra.go b/clients/metadata/metadata_cassandra.go
index 30ed2750..1cee612a 100644
--- a/clients/metadata/metadata_cassandra.go
+++ b/clients/metadata/metadata_cassandra.go
@@ -23,6 +23,7 @@ package metadata
 import (
 	"encoding/json"
 	"fmt"
+	"reflect"
 	"regexp"
 	"strings"
 	"time"
@@ -162,6 +163,7 @@ const (
 	columnVisible     = "visible"
 	columnZone        = "zone"
 	columnZoneConfigs = "zone_configs"
+	columnOptions     = "options"
 )
 
 const userOperationTTL = "2592000" // 30 days
@@ -589,6 +591,7 @@ func getUtilConsumerGroupDescription() *shared.ConsumerGroupDescription {
 	result.IsMultiZone = common.BoolPtr(false)
 	result.ActiveZone = common.StringPtr("")
 	result.ZoneConfigs = shared.ConsumerGroupDescription_ZoneConfigs_DEFAULT
+	result.Options = make(map[string]string, 0)
 	return result
 }
 
@@ -1183,7 +1186,8 @@ const (
 		columnOwnerEmail + `: ?, ` +
 		columnIsMultiZone + `: ?, ` +
 		columnActiveZone + `: ?, ` +
-		columnZoneConfigs + `: ? }`
+		columnZoneConfigs + `: ?, ` +
+		columnOptions + `: ? }`
 
 	sqlInsertCGByUUID = `INSERT INTO ` + tableConsumerGroups +
 		`(` +
@@ -1214,7 +1218,8 @@ const (
 		columnConsumerGroup + `.` + columnOwnerEmail + "," +
 		columnConsumerGroup + `.` + columnIsMultiZone + "," +
 		columnConsumerGroup + `.` + columnActiveZone + "," +
-		columnConsumerGroup + `.` + columnZoneConfigs +
+		columnConsumerGroup + `.` + columnZoneConfigs + "," +
+		columnConsumerGroup + `.` + columnOptions +
 		` FROM ` + tableConsumerGroupsByName +
 		` WHERE ` + columnDestinationUUID + `=? and ` + columnName + `=?`
 
@@ -1232,7 +1237,8 @@ const (
 		columnConsumerGroup + `.` + columnOwnerEmail + "," +
 		columnConsumerGroup + `.` + columnIsMultiZone + "," +
 		columnConsumerGroup + `.` + columnActiveZone + "," +
-		columnConsumerGroup + `.` + columnZoneConfigs +
+		columnConsumerGroup + `.` + columnZoneConfigs + "," +
+		columnConsumerGroup + `.` + columnOptions +
 		` FROM ` + tableConsumerGroups
 
 	sqlGetCGByUUID = sqlGetCG + ` WHERE ` + columnUUID + `=?`
@@ -1251,7 +1257,8 @@ const (
 		columnConsumerGroup + `.` + columnOwnerEmail + "," +
 		columnConsumerGroup + `.` + columnIsMultiZone + "," +
 		columnConsumerGroup + `.` + columnActiveZone + "," +
-		columnConsumerGroup + `.` + columnZoneConfigs +
+		columnConsumerGroup + `.` + columnZoneConfigs + "," +
+		columnConsumerGroup + `.` + columnOptions +
 		` FROM ` + tableConsumerGroupsByName +
 		` WHERE ` + columnDestinationUUID + `=?`
 
@@ -1341,7 +1348,8 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
 		createRequest.GetOwnerEmail(),
 		createRequest.GetIsMultiZone(),
 		createRequest.GetActiveZone(),
-		marshalCgZoneConfigs(createRequest.GetZoneConfigs())).Exec()
+		marshalCgZoneConfigs(createRequest.GetZoneConfigs()),
+		createRequest.GetOptions()).Exec()
 
 	if err != nil {
 		return nil, &shared.InternalServiceError{
@@ -1367,7 +1375,8 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
 		createRequest.GetOwnerEmail(),
 		createRequest.GetIsMultiZone(),
 		createRequest.GetActiveZone(),
-		marshalCgZoneConfigs(createRequest.GetZoneConfigs()))
+		marshalCgZoneConfigs(createRequest.GetZoneConfigs()),
+		createRequest.GetOptions())
 
 	previous := make(map[string]interface{}) // We actually throw away the old values below, but passing nil causes a panic
@@ -1412,6 +1421,7 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
 		IsMultiZone: common.BoolPtr(createRequest.GetIsMultiZone()),
 		ActiveZone:  common.StringPtr(createRequest.GetActiveZone()),
 		ZoneConfigs: createRequest.GetZoneConfigs(),
+		Options:     createRequest.GetOptions(),
 	}, nil
 }
 
@@ -1468,7 +1478,8 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
 		result.OwnerEmail,
 		result.IsMultiZone,
 		result.ActiveZone,
-		&zoneConfigsData); err != nil {
+		&zoneConfigsData,
+		&result.Options); err != nil {
 		if err == gocql.ErrNotFound {
 			return nil, &shared.EntityNotExistsError{
 				Message: fmt.Sprintf("ConsumerGroup %s of destinationUUID %s does not exist", cgName, dstUUID),
@@ -1539,7 +1550,8 @@ func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, r
 		result.OwnerEmail,
 		result.IsMultiZone,
 		result.ActiveZone,
-		&zoneConfigsData); err != nil {
+		&zoneConfigsData,
+		&result.Options); err != nil {
 		if err == gocql.ErrNotFound {
 			return nil, &shared.EntityNotExistsError{
 				Message: fmt.Sprintf("ConsumerGroup %s does not exist", *request.ConsumerGroupUUID),
@@ -1600,6 +1612,11 @@ func updateCGDescIfChanged(req *shared.UpdateConsumerGroupRequest, cgDesc *share
 		cgDesc.IsMultiZone = common.BoolPtr(true)
 	}
 
+	if req.IsSetOptions() && !reflect.DeepEqual(req.GetOptions(), cgDesc.GetOptions()) {
+		isChanged = true
+		cgDesc.Options = req.Options
+	}
+
 	return isChanged
 }
 
@@ -1648,6 +1665,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
 		newCG.GetIsMultiZone(),
 		newCG.GetActiveZone(),
 		marshalCgZoneConfigs(newCG.GetZoneConfigs()),
+		newCG.GetOptions(),
 		// Query columns
 		newCG.GetConsumerGroupUUID())
 
@@ -1668,6 +1686,7 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
 		newCG.GetIsMultiZone(),
 		newCG.GetActiveZone(),
 		marshalCgZoneConfigs(newCG.GetZoneConfigs()),
+		newCG.GetOptions(),
 		// Query columns
 		newCG.GetDestinationUUID(),
 		newCG.GetConsumerGroupName())
@@ -1788,6 +1807,7 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
 		existingCG.GetIsMultiZone(),
 		existingCG.GetActiveZone(),
 		marshalCgZoneConfigs(existingCG.GetZoneConfigs()),
+		existingCG.GetOptions(),
 		defaultDeleteTTLSeconds)
 
 	batch.Query(sqlDeleteCGByName,
@@ -1891,7 +1911,8 @@ func (s *CassandraMetadataService) ListConsumerGroups(ctx thrift.Context, reques
 		cg.OwnerEmail,
 		cg.IsMultiZone,
 		cg.ActiveZone,
-		&zoneConfigsData) {
+		&zoneConfigsData,
+		&cg.Options) {
 
 		// Get a new item within limit
 		if cg.GetStatus() == shared.ConsumerGroupStatus_DELETED {
@@ -1954,7 +1975,8 @@ func (s *CassandraMetadataService) ListAllConsumerGroups(ctx thrift.Context, req
 		cg.OwnerEmail,
 		cg.IsMultiZone,
 		cg.ActiveZone,
-		&zoneConfigsData) {
+		&zoneConfigsData,
+		&cg.Options) {
 
 		cg.ZoneConfigs = unmarshalCgZoneConfigs(zoneConfigsData)
 		result.ConsumerGroups = append(result.ConsumerGroups, cg)
diff --git a/clients/metadata/metadata_cassandra_test.go b/clients/metadata/metadata_cassandra_test.go
index 03a647c3..bb4e345e 100644
--- a/clients/metadata/metadata_cassandra_test.go
+++ b/clients/metadata/metadata_cassandra_test.go
@@ -52,7 +52,9 @@ type CassandraSuite struct {
 	TestCluster
 }
 
-const testPageSize = 2
+const (
+	testPageSize = 2
+)
 
 func TestCassandraSuite(t *testing.T) {
 	s := new(CassandraSuite)
@@ -1676,6 +1678,7 @@ func assertConsumerGroupsEqual(s *CassandraSuite, expected, got *shared.Consumer
 	s.Equal(expected.GetOwnerEmail(), got.GetOwnerEmail(), "Wrong OwnerEmail")
 	s.Equal(expected.GetDeadLetterQueueDestinationUUID(), got.GetDeadLetterQueueDestinationUUID(), "Wrong DeadLetterQueueDestinationUUID")
 	s.Equal(expected.GetActiveZone(), got.GetActiveZone(), "Wrong ActiveZone")
+	s.Equal(expected.GetOptions(), got.GetOptions(), "Wrong Options")
 }
 
 func (s *CassandraSuite) TestDeleteConsumerGroupDeletesDLQ() {
@@ -1753,6 +1756,9 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
 		Visible: common.BoolPtr(false),
 	}
 
+	options := make(map[string]string)
+	options[common.FlagDisableNackThrottling] = "true"
+
 	cgName := s.generateName("/foo/bar_consumer")
 
 	createReq := &shared.CreateConsumerGroupRequest{
@@ -1767,6 +1773,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
 		IsMultiZone: common.BoolPtr(true),
 		ActiveZone:  common.StringPtr("zone1"),
 		ZoneConfigs: []*shared.ConsumerGroupZoneConfig{zoneConfig},
+		Options:     options,
 	}
 
 	expectedCG := &shared.ConsumerGroupDescription{
@@ -1782,6 +1789,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
 		IsMultiZone: common.BoolPtr(createReq.GetIsMultiZone()),
 		ActiveZone:  common.StringPtr(createReq.GetActiveZone()),
 		ZoneConfigs: createReq.GetZoneConfigs(),
+		Options:     options,
 	}
 
 	expectedCGOrig := new(shared.ConsumerGroupDescription)
@@ -1827,6 +1835,8 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
 		assert.Nil(err, "ReadConsumerGroup failed")
 		assertConsumerGroupsEqual(s, expectedCG, gotCG)
 
+		options[common.FlagDisableNackThrottling] = "false"
+
 		readReq.ConsumerGroupUUID = common.StringPtr(gotCG.GetConsumerGroupUUID())
 		gotCG, err = s.client.ReadConsumerGroupByUUID(nil, readReq)
 		assert.Nil(err, "ReadConsumerGroupByUUID failed")
@@ -1840,6 +1850,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
 			SkipOlderMessagesSeconds: common.Int32Ptr(100),
 			DelaySeconds:             common.Int32Ptr(100),
 			OwnerEmail:               common.StringPtr("consumer_test@uber.com"),
+			Options:                  options,
 		}
 
 		if pass%2 == 0 {
@@ -1855,6 +1866,7 @@ func (s *CassandraSuite) TestConsumerGroupCRUD() {
 		expectedCG.DelaySeconds = common.Int32Ptr(updateReq.GetDelaySeconds())
 		expectedCG.OwnerEmail = common.StringPtr(updateReq.GetOwnerEmail())
 		expectedCG.ActiveZone = common.StringPtr(updateReq.GetActiveZone())
+		expectedCG.Options = options
 
 		gotCG = nil
 		gotCG, err = s.client.UpdateConsumerGroup(nil, updateReq)
@@ -2099,6 +2111,9 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
 	dstUUID := dstInfo.GetDestinationUUID()
 	groupMap := make(map[string]string)
 
+	options := make(map[string]string)
+	options[common.FlagDisableNackThrottling] = "true"
+
 	for i := 0; i < 10; i++ {
 		name := s.generateName(fmt.Sprintf("foobar-consumer-%v", i))
 		var createReq *shared.CreateConsumerGroupRequest
@@ -2112,6 +2127,7 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
 			SkipOlderMessagesSeconds: common.Int32Ptr(60),
 			DelaySeconds:             common.Int32Ptr(60),
 			OwnerEmail:               common.StringPtr("consumer_test@uber.com"),
+			Options:                  options,
 		}
 
 		_, err = s.client.CreateConsumerGroup(nil, createReq)
@@ -2142,10 +2158,10 @@ func (s *CassandraSuite) TestListAllConsumerGroups() {
 				s.Equal(int32(60), gotCG.GetSkipOlderMessagesSeconds())
 				s.Equal(int32(60), gotCG.GetDelaySeconds())
 				s.Equal(string("consumer_test@uber.com"), gotCG.GetOwnerEmail())
+				s.Equal(options, gotCG.GetOptions())
 				delete(groupMap, gotCG.GetConsumerGroupName())
 			}
 		}
-
 	}
 	if len(listRes.GetNextPageToken()) == 0 {
 		break
diff --git a/clients/metadata/schema/metadata.cql b/clients/metadata/schema/metadata.cql
index 11653730..01939c63 100644
--- a/clients/metadata/schema/metadata.cql
+++ b/clients/metadata/schema/metadata.cql
@@ -82,7 +82,8 @@ CREATE TYPE destination (
   checksum_option int,
   is_multi_zone boolean,
   zone_configs list<frozen<destination_zone_config>>,
-  schema_version int
+  schema_version int,
+  options map<text, text>,
 );
 
 CREATE TABLE destinations (
@@ -220,6 +221,7 @@ CREATE TYPE consumer_group (
   is_multi_zone boolean,
   active_zone text,
   zone_configs list<frozen<consumer_group_zone_config>>,
+  options map<text, text>,
 );
 
 CREATE TABLE consumer_groups (
diff --git a/clients/metadata/schema/v16/201708090000_options.cql b/clients/metadata/schema/v16/201708090000_options.cql
new file mode 100644
index 00000000..fa0f82a1
--- /dev/null
+++ b/clients/metadata/schema/v16/201708090000_options.cql
@@ -0,0 +1,2 @@
+ALTER TYPE consumer_group ADD options map<text, text>;
+ALTER TYPE destination ADD options map<text, text>;
diff --git a/clients/metadata/schema/v16/manifest.json b/clients/metadata/schema/v16/manifest.json
new file mode 100644
index 00000000..f003cb9e
--- /dev/null
+++ b/clients/metadata/schema/v16/manifest.json
@@ -0,0 +1,8 @@
+{
+    "CurrVersion": 16,
+    "MinCompatibleVersion": 8,
+    "Description": "Add options for cg and destination",
+    "SchemaUpdateCqlFiles": [
+        "201708090000_options.cql"
+    ]
+}
diff --git a/cmd/tools/common/lib.go b/cmd/tools/common/lib.go
index 5a8c41db..ffa86050 100644
--- a/cmd/tools/common/lib.go
+++ b/cmd/tools/common/lib.go
@@ -220,6 +220,10 @@ func SetCommonCommands(
 					Name:  "zone_config, zc",
 					Usage: "Zone configs for multi-zone CG. For each zone, specify \"Zone,PreferedActiveZone\"; ex: \"zone1,false\"",
 				},
+				cli.BoolFlag{
+					Name:  common.FlagDisableNackThrottling,
+					Usage: "Disable nack throttling for consumer group",
+				},
 			},
 			Action: func(c *cli.Context) {
 				if authEnabled {
@@ -366,6 +370,10 @@ func SetCommonCommands(
 					Name:  "zone_config, zc",
 					Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"",
 				},
+				cli.BoolFlag{
+					Name:  common.FlagDisableNackThrottling,
+					Usage: "Disable nack throttling for consumer group",
+				},
 			},
 			Action: func(c *cli.Context) {
 				if authEnabled {
diff --git a/common/constants.go b/common/constants.go
index ab655357..f91f3122 100644
--- a/common/constants.go
+++ b/common/constants.go
@@ -70,3 +70,8 @@ const (
 	// KafkaPhantomExtentStorehost is placeholder/phantom storehost uuid used for Kafka extents
 	KafkaPhantomExtentStorehost = "00000000-0000-0000-0000-000000000000"
 )
+
+const (
+	// FlagDisableNackThrottling is the flag string for disabling Nack throttling
+	FlagDisableNackThrottling = "disable_nack_throttling"
+)
diff --git a/glide.lock b/glide.lock
index eae8dd67..b6032e87 100644
--- a/glide.lock
+++ b/glide.lock
@@ -134,7 +134,7 @@ imports:
   - common/websocket
   - stream
 - name: github.com/uber/cherami-thrift
-  version: 2cb0e2eeb6570800a2dd86544909bf2693f50e7b
+  version: 98e566b96cbe7142446508e5991a11bc394ab343
   subpackages:
   - .generated/go/admin
   - .generated/go/cherami
diff --git a/services/frontendhost/frontend.go b/services/frontendhost/frontend.go
index 8b90c5e6..21bc8ead 100644
--- a/services/frontendhost/frontend.go
+++ b/services/frontendhost/frontend.go
@@ -391,6 +391,7 @@ func convertCreateCGRequestToInternal(createRequest *c.CreateConsumerGroupReques
 			internalCreateRequest.ZoneConfigs = append(internalCreateRequest.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg))
 		}
 	}
+	internalCreateRequest.Options = createRequest.GetOptions()
 
 	return internalCreateRequest, nil
 }
@@ -427,6 +428,10 @@ func convertUpdateCGRequestToInternal(updateRequest *c.UpdateConsumerGroupReques
 			internalUpdateRequest.ZoneConfigs = append(internalUpdateRequest.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg))
 		}
 	}
+	if updateRequest.IsSetOptions() {
+		internalUpdateRequest.Options = updateRequest.GetOptions()
+
+	}
 
 	return internalUpdateRequest
 }
@@ -492,6 +497,7 @@ func (h *Frontend) convertConsumerGroupFromInternal(ctx thrift.Context, _cgDesc
 		}
 		cgDesc.ZoneConfigs.ActiveZone = common.StringPtr(_cgDesc.GetActiveZone())
 	}
+	cgDesc.Options = _cgDesc.GetOptions()
 
 	return
 }
diff --git a/services/frontendhost/frontend_test.go b/services/frontendhost/frontend_test.go
index f7b77c22..6546f135 100644
--- a/services/frontendhost/frontend_test.go
+++ b/services/frontendhost/frontend_test.go
@@ -122,6 +122,7 @@ func cgCreateRequestToDesc(createRequest *c.CreateConsumerGroupRequest) *shared.
 			cgDesc.ZoneConfigs = append(cgDesc.ZoneConfigs, convertCGZoneConfigToInternal(cgZoneCfg))
 		}
 	}
+	cgDesc.Options = createRequest.GetOptions()
 	return cgDesc
 }
 
@@ -136,6 +137,7 @@ func cgUpdateRequestToDesc(updateRequest *c.UpdateConsumerGroupRequest) *shared.
 	cgDesc.SkipOlderMessagesSeconds = common.Int32Ptr(updateRequest.GetSkipOlderMessagesInSeconds())
 	cgDesc.DelaySeconds = common.Int32Ptr(updateRequest.GetDelaySeconds())
 	cgDesc.OwnerEmail = common.StringPtr(updateRequest.GetOwnerEmail())
+	cgDesc.Options = updateRequest.GetOptions()
 	return cgDesc
 }
 
@@ -825,6 +827,9 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() {
 	testCG := s.generateKey("/bar/CGName")
 	frontendHost, ctx := s.utilGetContextAndFrontend()
 
+	options := make(map[string]string)
+	options[common.FlagDisableNackThrottling] = "true"
+
 	req := c.NewCreateConsumerGroupRequest()
 	req.DestinationPath = common.StringPtr(testPath)
 	req.ConsumerGroupName = common.StringPtr(testCG)
@@ -838,6 +843,8 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() {
 			},
 		},
 	}
+	req.Options = options
+
 	cgDesc := cgCreateRequestToDesc(req)
 
 	frontendHost.writeCacheDestinationPathForUUID(destinationUUID(cgDesc.GetDestinationUUID()), testPath)
@@ -858,6 +865,7 @@ func (s *FrontendHostSuite) TestFrontendHostCreateConsumerGroup() {
 		s.Equal(createReq.GetIsMultiZone(), req.GetIsMultiZone())
 		s.Equal(len(createReq.GetZoneConfigs()), len(req.GetZoneConfigs().GetConfigs()))
 		s.Equal(createReq.GetZoneConfigs()[0].GetVisible(), req.GetZoneConfigs().GetConfigs()[0].GetVisible())
+		s.Equal(createReq.GetOptions(), req.GetOptions())
 	})
 
 	cgd, err := frontendHost.CreateConsumerGroup(ctx, req)
@@ -1124,6 +1132,9 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() {
 	testPath := s.generateKey("/foo/bax")
 	frontendHost, ctx := s.utilGetContextAndFrontend()
 
+	options := make(map[string]string)
+	options[common.FlagDisableNackThrottling] = "true"
+
 	req := new(c.UpdateConsumerGroupRequest)
 	req.DestinationPath = common.StringPtr(testPath)
 	req.ConsumerGroupName = common.StringPtr(s.generateKey("/CG/Name"))
@@ -1133,6 +1144,8 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() {
 	req.DelaySeconds = common.Int32Ptr(5)
 	req.Status = c.ConsumerGroupStatusPtr(c.ConsumerGroupStatus_DISABLED)
 	req.OwnerEmail = common.StringPtr("consumer_front_test@uber.com")
+	req.Options = options
+
 	newCGDesc := cgUpdateRequestToDesc(req)
 	frontendHost.writeCacheDestinationPathForUUID(destinationUUID(newCGDesc.GetDestinationUUID()), testPath)
 	s.mockController.On("UpdateConsumerGroup", mock.Anything, mock.Anything).Return(newCGDesc, nil)
@@ -1149,6 +1162,7 @@ func (s *FrontendHostSuite) TestFrontendHostUpdateConsumerGroup() {
 		s.Equal(cgd.DelaySeconds, req.DelaySeconds)
 		s.Equal(cgd.Status, req.Status)
 		s.Equal(cgd.OwnerEmail, req.OwnerEmail)
+		s.Equal(cgd.Options, req.Options)
 	}
 }
diff --git a/services/outputhost/consconnection.go b/services/outputhost/consconnection.go
index 67f5108e..84a523cc 100644
--- a/services/outputhost/consconnection.go
+++ b/services/outputhost/consconnection.go
@@ -175,7 +175,7 @@ func (conn *consConnection) sendCreditsToWritePump(localCredits *int32) {
 
 // readCreditsStream is the stream which keeps reading the credits from the client
 func (conn *consConnection) readCreditsStream() {
-	// start ticker to send accumulated credits to the write pump 
+	// start ticker to send accumulated credits to the write pump
 	// This is needed because imagine we are not able to send credits
 	// received from the client immediately. In that case we need to
 	// accumulate those credits and make sure we send those out to the
diff --git a/services/outputhost/messagecache.go b/services/outputhost/messagecache.go
index 35d4a549..a2b84cff 100644
--- a/services/outputhost/messagecache.go
+++ b/services/outputhost/messagecache.go
@@ -307,7 +307,7 @@ func getEventString(event msgEvent) string {
 	}
 }
 
-func (msgCache *cgMsgCache) utilHandleDeliveredMsg(cMsg cacheMsg, badConns map[int]int) {
+func (msgCache *cgMsgCache) utilHandleDeliveredMsg(cMsg cacheMsg) {
 	msgCache.startTimer(eventCache)
 
 	msg := cMsg.msg
@@ -379,7 +379,7 @@ func (msgCache *cgMsgCache) utilHandleRedeliveredMsg(cMsg cacheMsg) {
 	}
 }
 
-func (msgCache *cgMsgCache) utilHandleRedeliveryTicker(badConns map[int]int) {
+func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {
 	msgCache.startTimer(eventTimer)
 
 	var redeliveries int64
@@ -432,7 +432,7 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker(badConns map[int]int) {
 			// 2. We are not stalled
 			if cm.n > msgCache.GetMaxDeliveryCount() && !stalled && cm.dlqInhibit <= 0 {
 				msgCache.changeState(ackID, stateDLQDelivered, nil, eventTimer)
-				msgCache.publishToDLQ(cm, badConns)
+				msgCache.publishToDLQ(cm)
 			} else {
 				select {
 				case msgCache.msgsRedeliveryCh <- cm.msg:
@@ -477,7 +477,7 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker(badConns map[int]int) {
 	}
 }
 
-func (msgCache *cgMsgCache) utilHandleNackMsg(ackID timestampedAckID, badConns map[int]int) {
+func (msgCache *cgMsgCache) utilHandleNackMsg(ackID timestampedAckID) {
 	lclLg := msgCache.lclLg
 
 	msgCache.startTimer(eventNACK)
@@ -488,7 +488,7 @@ func (msgCache *cgMsgCache) utilHandleNackMsg(ackID timestampedAckID, badConns m
 	i := int64(1) // we always handle at least one ack, even if the channel is empty afterwards
 nackDrain:
 	for ; i < ackChannelSize*2; i++ { // Empty the channel twice, at most, to prevent starvation
-		msgCache.handleNack(ackID, lclLg, badConns)
+		msgCache.handleNack(ackID, lclLg)
 		select {
 		case ackID = <-msgCache.nackMsgCh:
 			continue nackDrain
@@ -500,7 +500,7 @@ nackDrain:
 	msgCache.consumerM3Client.AddCounter(metrics.ConsConnectionScope, metrics.OutputhostCGMessageSentNAck, i)
 }
 
-func (msgCache *cgMsgCache) utilHandleAckMsg(ackID timestampedAckID, badConns map[int]int) {
+func (msgCache *cgMsgCache) utilHandleAckMsg(ackID timestampedAckID) {
 	lclLg := msgCache.lclLg
 
 	msgCache.startTimer(eventACK)
@@ -511,7 +511,7 @@ func (msgCache *cgMsgCache) utilHandleAckMsg(ackID timestampedAckID, badConns ma
 	i := int64(1) // we always handle at least one ack, even if the channel is empty afterwards
 ackDrain:
 	for ; i < ackChannelSize*2; i++ { // Empty the channel twice, at most, to prevent starvation
-		msgCache.handleAck(ackID, lclLg, badConns)
+		msgCache.handleAck(ackID, lclLg)
 		select {
 		case ackID = <-msgCache.ackMsgCh:
 			continue ackDrain
@@ -739,7 +739,6 @@ func (msgCache *cgMsgCache) start() {
 func (msgCache *cgMsgCache) manageMessageDeliveryCache() {
 	msgCache.blockCheckingTimer = common.NewTimer(blockCheckingTimeout)
 	msgCache.dlqPublishCh = msgCache.dlq.getPublishCh()
-	badConns := make(map[int]int) // this is the map of all bad connections, i.e, connections which get Nacks and timeouts
 	lastPumpHealthLog := common.UnixNanoTime(0)
 
 	var creditBatchSize int32
@@ -765,11 +764,11 @@ func (msgCache *cgMsgCache) manageMessageDeliveryCache() {
 		select {
 		case cMsg := <-msgCache.msgCacheCh:
 			// this means we got a new message
-			msgCache.utilHandleDeliveredMsg(cMsg, badConns)
+			msgCache.utilHandleDeliveredMsg(cMsg)
 		case cMsg := <-msgCache.msgCacheRedeliveredCh:
 			msgCache.utilHandleRedeliveredMsg(cMsg)
 		case <-msgCache.redeliveryTicker.C:
-			msgCache.utilHandleRedeliveryTicker(badConns)
+			msgCache.utilHandleRedeliveryTicker()
 			// Incase we have a very low throughput destination, we might not
 			// accumulate enough messages. So renew credits irrespective if we have
 			// accumulated some acks.
@@ -778,9 +777,9 @@ func (msgCache *cgMsgCache) manageMessageDeliveryCache() {
 			}
 			msgCache.refreshCgConfig(msgCache.maxOutstandingMsgs)
 		case ackID := <-msgCache.nackMsgCh:
-			msgCache.utilHandleNackMsg(ackID, badConns)
+			msgCache.utilHandleNackMsg(ackID)
 		case ackID := <-msgCache.ackMsgCh:
-			msgCache.utilHandleAckMsg(ackID, badConns)
+			msgCache.utilHandleAckMsg(ackID)
 		case extUUID := <-msgCache.creditRequestCh:
 			// an extent is requesting credits, which means even if we have some
 			// credits to grant, grant it to this extent
@@ -853,7 +852,7 @@ func (msgCache *cgMsgCache) getOutstandingMsgs() int32 {
 	return int32(msgCache.countStateDelivered + msgCache.countStateEarlyNACK)
 }
 
-func (msgCache *cgMsgCache) handleAck(ackID timestampedAckID, lclLg bark.Logger, badConns map[int]int) {
+func (msgCache *cgMsgCache) handleAck(ackID timestampedAckID, lclLg bark.Logger) {
 	var badMessage bool
 	cm := msgCache.getState(ackID.AckID)
 	now := common.Now()
@@ -917,13 +916,13 @@ func (msgCache *cgMsgCache) handleAck(ackID timestampedAckID, lclLg bark.Logger,
 		}
 
 		// TODO: don't updateConn if we already had this ACK (stateConsumed); a worker in a bad loop could just ack the same message over and over
-		msgCache.updateConn(cm.lastConnID, eventACK, badConns)
+		msgCache.updateConn(cm.lastConnID, eventACK)
 	}
 
 	msgCache.numAcks++
 }
 
-func (msgCache *cgMsgCache) handleNack(ackID timestampedAckID, lclLg bark.Logger, badConns map[int]int) {
+func (msgCache *cgMsgCache) handleNack(ackID timestampedAckID, lclLg bark.Logger) {
 	cm := msgCache.getState(ackID.AckID)
 
 	switch cm.currentState {
@@ -953,30 +952,28 @@ func (msgCache *cgMsgCache) handleNack(ackID timestampedAckID, lclLg bark.Logger
 	// the notifier interface will let the appropriate connection know about this.
 	// the connections will take care of throttling based on the number of nacks
 	// received per second.
-	msgCache.updateConn(cm.lastConnID, eventNACK, badConns)
+	msgCache.updateConn(cm.lastConnID, eventNACK)
 }
 
 // updateConn is the utility routine to notify the connection
 // to either throttle up or throttle down.
 // If we get a NACK on this connection, we ask the connection to
-// slow down (throttle up)
+// slow down (throttle up), except when it's explicitly disabled
 // If we get an ACK on this connection and we asked it to throttle
 // earlier, we ask the connection to stop throttling (throttle down!).
-func (msgCache *cgMsgCache) updateConn(connID int, event msgEvent, badConns map[int]int) {
-	val, ok := badConns[connID]
+func (msgCache *cgMsgCache) updateConn(connID int, event msgEvent) {
 	if event == eventACK {
-		// if this connection was in the list of bad connections,
-		// then make sure to notify the connection asking to throttle
-		// down and remove this one from the bad connections map
-		if ok {
-			msgCache.notifier.Notify(connID, NotifyMsg{notifyType: ThrottleDown})
-			delete(badConns, connID)
-		}
+		msgCache.notifier.Notify(connID, NotifyMsg{notifyType: ThrottleDown})
 	} else {
+		// if NACK throttling is disabled for this cg, then no-op
+		throttlingDisabled, ok := msgCache.cgCache.cachedCGDesc.Options[common.FlagDisableNackThrottling]
+		if ok && throttlingDisabled == "true" { // no-op
+			return
+		}
+
 		// make sure to throttle the appropriate connection and update the
 		// bad connections map to keep track of the number of nacks
 		msgCache.notifier.Notify(connID, NotifyMsg{notifyType: ThrottleUp})
-		badConns[connID] = val + 1
 	}
 }
 
@@ -1036,7 +1033,7 @@ func newMessageDeliveryCache(
 	return msgCache
 }
 
-func (msgCache *cgMsgCache) publishToDLQ(cm *cachedMessage, badConns map[int]int) {
+func (msgCache *cgMsgCache) publishToDLQ(cm *cachedMessage) {
 	msg := cm.msg
 	msgCache.blockCheckingTimer.Reset(blockCheckingTimeout)
 	select {
@@ -1048,7 +1045,7 @@ func (msgCache *cgMsgCache) publishToDLQ(cm *cachedMessage, badConns map[int]int
 	// the notifier interface will let the appropriate connection know about this.
 	// the connections will take care of throttling based on the number of nacks
 	// received per second.
-	msgCache.updateConn(cm.lastConnID, eventNACK, badConns)
+	msgCache.updateConn(cm.lastConnID, eventNACK)
 }
 
 func (msgCache *cgMsgCache) startTimer(e msgEvent) {
diff --git a/services/outputhost/messagecache_test.go b/services/outputhost/messagecache_test.go
index b83b6c11..c820877a 100644
--- a/services/outputhost/messagecache_test.go
+++ b/services/outputhost/messagecache_test.go
@@ -117,14 +117,14 @@ func (s *MessageCacheSuite) TestTimerQueueCleanupAfterRedelivery() {
 		}
 		cMsg.msg.AckId = common.StringPtr(ackIDStr)
 
-		s.msgCache.utilHandleDeliveredMsg(cMsg, nil)
+		s.msgCache.utilHandleDeliveredMsg(cMsg)
 		ackIDMap[ackIDStr] = struct{}{}
 		ackID++
 	}
 
 	time.Sleep(time.Second)
 
-	s.msgCache.utilHandleRedeliveryTicker(nil)
+	s.msgCache.utilHandleRedeliveryTicker()
 	s.Equal(100, len(s.msgRedeliveryCh), "Unexpected message cache redelivery")
 
 	for i := 0; i < 100; i++ {
@@ -134,7 +134,7 @@ func (s *MessageCacheSuite) TestTimerQueueCleanupAfterRedelivery() {
 		delete(ackIDMap, m.GetAckId())
 	}
 
-	s.msgCache.utilHandleRedeliveryTicker(nil)
+	s.msgCache.utilHandleRedeliveryTicker()
 	s.Equal(0, len(s.msgRedeliveryCh), "Unexpected message cache redelivery")
 }
diff --git a/services/replicator/metadataReconciler.go b/services/replicator/metadataReconciler.go
index 1d038a7b..a8dec244 100644
--- a/services/replicator/metadataReconciler.go
+++ b/services/replicator/metadataReconciler.go
@@ -22,6 +22,7 @@ package replicator
 
 import (
 	"errors"
+	"reflect"
 	"strings"
 	"sync/atomic"
 	"time"
@@ -358,6 +359,7 @@ func (r *metadataReconciler) reconcileCg(localCgs []*shared.ConsumerGroupDescrip
 					IsMultiZone: common.BoolPtr(remoteCg.GetIsMultiZone()),
 					ActiveZone:  common.StringPtr(remoteCg.GetActiveZone()),
 					ZoneConfigs: remoteCg.GetZoneConfigs(),
+					Options:     remoteCg.Options,
 				},
 				ConsumerGroupUUID: common.StringPtr(remoteCg.GetConsumerGroupUUID()),
 			}
@@ -418,6 +420,10 @@ func (r *metadataReconciler) compareAndUpdateCg(remoteCg *shared.ConsumerGroupDe
 		updateRequest.ZoneConfigs = remoteCg.GetZoneConfigs()
 		cgUpdated = true
 	}
+	if !reflect.DeepEqual(localCg.GetOptions(), remoteCg.GetOptions()) {
+		updateRequest.Options = remoteCg.GetOptions()
+		cgUpdated = true
+	}
 
 	if cgUpdated {
 		logger.Info(`Found cg gets updated in remote but not in local`)
diff --git a/tools/common/lib.go b/tools/common/lib.go
index debfaeaf..7929192a 100644
--- a/tools/common/lib.go
+++ b/tools/common/lib.go
@@ -502,6 +502,12 @@ func CreateConsumerGroupSecure(
 	zoneConfigs := getCgZoneConfigs(c, mClient, cliHelper, path)
 	isMultiZone := len(zoneConfigs.GetConfigs()) > 0
 
+	options := make(map[string]string)
+	disableNackThrottling := strings.ToLower(c.String(common.FlagDisableNackThrottling))
+	if disableNackThrottling == "true" {
+		options[common.FlagDisableNackThrottling] = "true"
+	}
+
 	desc, err := cClient.CreateConsumerGroup(&cherami.CreateConsumerGroupRequest{
 		DestinationPath:   &path,
 		ConsumerGroupName: &name,
@@ -513,6 +519,7 @@ func CreateConsumerGroupSecure(
 		OwnerEmail:  &ownerEmail,
 		IsMultiZone: &isMultiZone,
 		ZoneConfigs: &zoneConfigs,
+		Options:     options,
 	})
 	ExitIfError(err)
 
@@ -626,6 +633,7 @@ func UpdateConsumerGroupSecure(
 		OwnerEmail:  getIfSetString(c, `owner_email`, &setCount),
 		ActiveZone:  getIfSetString(c, `active_zone`, &setCount),
 		ZoneConfigs: getIfSetCgZoneConfig(c, mClient, cliHelper, path, &setCount),
+		Options:     getIfSetOptions(c, &setCount),
 	}
 
 	if c.IsSet(`status`) {
@@ -1081,6 +1089,7 @@ type cgJSONOutputFields struct {
 	IsMultiZone bool                              `json:"is_multi_zone"`
 	ZoneConfigs []*shared.ConsumerGroupZoneConfig `json:"zone_Configs"`
 	ActiveZone  string                            `json:"active_zone"`
+	Options     map[string]string                 `json:"options"`
 }
 
 func printCG(cg *shared.ConsumerGroupDescription) {
@@ -1099,6 +1108,7 @@ func printCG(cg *shared.ConsumerGroupDescription) {
 		IsMultiZone: cg.GetIsMultiZone(),
 		ZoneConfigs: cg.GetZoneConfigs(),
 		ActiveZone:  cg.GetActiveZone(),
+		Options:     cg.GetOptions(),
 	}
 	outputStr, _ := json.Marshal(output)
 	fmt.Fprintln(os.Stdout, string(outputStr))
@@ -1630,6 +1640,21 @@ func getIfSetCgZoneConfig(c *cli.Context, mClient mcli.Client, cliHelper common.
 	return
 }
 
+func getIfSetOptions(c *cli.Context, setCount *int) (options map[string]string) {
+	if c.IsSet(common.FlagDisableNackThrottling) {
+		disableNackThrottling := "false"
+		if strings.ToLower(c.String(common.FlagDisableNackThrottling)) == "true" {
+			disableNackThrottling = "true"
+		}
+
+		options = make(map[string]string)
+		options[common.FlagDisableNackThrottling] = disableNackThrottling
+		*setCount++
+		return options
+	}
+	return
+}
+
 type storeClientCache struct {
 	mClient mcli.Client
 	cache   map[string]*storehost.StoreClientImpl