diff --git a/services/replicator/replicator.go b/services/replicator/replicator.go index 7b372c0b..ff8735a5 100644 --- a/services/replicator/replicator.go +++ b/services/replicator/replicator.go @@ -39,6 +39,7 @@ import ( dconfig "github.com/uber/cherami-server/common/dconfigclient" mm "github.com/uber/cherami-server/common/metadata" "github.com/uber/cherami-server/common/metrics" + "github.com/uber/cherami-server/common/set" storeStream "github.com/uber/cherami-server/stream" "github.com/uber/cherami-thrift/.generated/go/admin" "github.com/uber/cherami-thrift/.generated/go/metadata" @@ -67,8 +68,7 @@ type ( storehostConn map[string]*outConnection storehostConnMutex sync.RWMutex - knownCgExtents map[string]struct{} - knownCgExtentsMutex sync.RWMutex + knownCgExtents set.Set metadataReconciler MetadataReconciler } @@ -130,7 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta replicatorclientFactory: replicatorClientFactory, remoteReplicatorConn: make(map[string]*outConnection), storehostConn: make(map[string]*outConnection), - knownCgExtents: make(map[string]struct{}), + knownCgExtents: set.NewConcurrent(0), } r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger) @@ -1106,16 +1106,11 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs }) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests) - var cgExtentCreated bool - r.knownCgExtentsMutex.RLock() - _, cgExtentCreated = r.knownCgExtents[request.GetExtentUUID()] - r.knownCgExtentsMutex.RUnlock() - - if !cgExtentCreated { + if !r.knownCgExtents.Contains(request.GetExtentUUID()) { // make sure the cg extent is created locally before accepting the SetAckOffset call. // otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid // and we may not be able to clean up the entry eventually. - extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ + _, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()), ExtentUUID: common.StringPtr(request.GetExtentUUID()), }) @@ -1124,16 +1119,8 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) return err } - if len(extent.GetExtent().GetStoreUUIDs()) < 1 { - err = fmt.Errorf(`empty store uuid from cg extent`) - lcllg.Error(`SetAckOffset: empty store uuid from cg extent`) - r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) - return err - } - r.knownCgExtentsMutex.Lock() - r.knownCgExtents[request.GetExtentUUID()] = struct{}{} - r.knownCgExtentsMutex.Unlock() + r.knownCgExtents.Insert(request.GetExtentUUID()) } err := r.metaClient.SetAckOffset(nil, request) diff --git a/services/replicator/replicator_test.go b/services/replicator/replicator_test.go index 045e0792..3b7afb2a 100644 --- a/services/replicator/replicator_test.go +++ b/services/replicator/replicator_test.go @@ -991,29 +991,6 @@ func (s *ReplicatorSuite) TestSetAckOffsetFailure_ReadExtentFail() { s.mockMeta.AssertExpectations(s.T()) } -func (s *ReplicatorSuite) TestSetAckOffsetFailure_NoStoreUUID() { - repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) - cgUUID := uuid.New() - extentUUID := uuid.New() - ackLevel := int64(20) - req := &shared.SetAckOffsetRequest{ - ConsumerGroupUUID: common.StringPtr(cgUUID), - ExtentUUID: common.StringPtr(extentUUID), - AckLevelAddress: common.Int64Ptr(ackLevel), - } - - s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(&metadata.ReadConsumerGroupExtentResult_{ - Extent: &shared.ConsumerGroupExtent{}, - }, nil).Run(func(args mock.Arguments) { - req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest) - s.Equal(extentUUID, req.GetExtentUUID()) - s.Equal(cgUUID, req.GetConsumerGroupUUID()) - }) - err := repliator.SetAckOffset(nil, req) - s.Error(err) - s.mockMeta.AssertExpectations(s.T()) -} - func (s *ReplicatorSuite) TestRemoteSetAckOffsetFailed() { repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) extentUUID := uuid.New()