From 60f78011eef8afc1dffad8157f5a69529c860b68 Mon Sep 17 00:00:00 2001 From: Wei Han Date: Mon, 17 Jul 2017 13:37:55 -0700 Subject: [PATCH 1/3] Replicator: Donot accept SetAckOffset before the cg extent is created --- services/replicator/replicator.go | 21 ++++++++- services/replicator/replicator_test.go | 60 +++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/services/replicator/replicator.go b/services/replicator/replicator.go index 7202b929..4b49d198 100644 --- a/services/replicator/replicator.go +++ b/services/replicator/replicator.go @@ -1102,7 +1102,26 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs }) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests) - err := r.metaClient.SetAckOffset(nil, request) + // make sure the cg extent is created locally before accepting the SetAckOffset call. + // otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid + // and we may not be able to clean up the entry eventually. + extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ + ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()), + ExtentUUID: common.StringPtr(request.GetExtentUUID()), + }) + if err != nil { + lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`) + r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) + return err + } + if len(extent.GetExtent().GetStoreUUIDs()) < 1 { + err = fmt.Errorf(`empty store uuid from cg extent`) + lcllg.Error(`SetAckOffset: empty store uuid from cg extent`) + r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) + return err + } + + err = r.metaClient.SetAckOffset(nil, request) if err != nil { lcllg.WithField(common.TagErr, err).Error(`Error calling metadata to set ack offset`) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) diff --git a/services/replicator/replicator_test.go b/services/replicator/replicator_test.go index d8a356ed..045e0792 100644 --- a/services/replicator/replicator_test.go +++ b/services/replicator/replicator_test.go @@ -941,13 +941,25 @@ func (s *ReplicatorSuite) TestCreateRemoteConsumerGroupExtentFailure() { func (s *ReplicatorSuite) TestSetAckOffset() { repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) + cgUUID := uuid.New() extentUUID := uuid.New() + storeUUID := []string{uuid.New(), uuid.New(), uuid.New()} ackLevel := int64(20) req := &shared.SetAckOffsetRequest{ - ExtentUUID: common.StringPtr(extentUUID), - AckLevelAddress: common.Int64Ptr(ackLevel), + ConsumerGroupUUID: common.StringPtr(cgUUID), + ExtentUUID: common.StringPtr(extentUUID), + AckLevelAddress: common.Int64Ptr(ackLevel), } + s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(&metadata.ReadConsumerGroupExtentResult_{ + Extent: &shared.ConsumerGroupExtent{ + StoreUUIDs: storeUUID, + }, + }, nil).Run(func(args mock.Arguments) { + req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest) + s.Equal(extentUUID, req.GetExtentUUID()) + s.Equal(cgUUID, req.GetConsumerGroupUUID()) + }) s.mockMeta.On("SetAckOffset", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { req := args.Get(1).(*shared.SetAckOffsetRequest) s.Equal(extentUUID, req.GetExtentUUID()) @@ -958,6 +970,50 @@ func (s *ReplicatorSuite) TestSetAckOffset() { s.mockMeta.AssertExpectations(s.T()) } +func (s *ReplicatorSuite) TestSetAckOffsetFailure_ReadExtentFail() { + repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) + cgUUID := uuid.New() + extentUUID := uuid.New() + ackLevel := int64(20) + req := &shared.SetAckOffsetRequest{ + ConsumerGroupUUID: common.StringPtr(cgUUID), + ExtentUUID: common.StringPtr(extentUUID), + AckLevelAddress: common.Int64Ptr(ackLevel), + } + + s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(nil, &shared.InternalServiceError{Message: "test2"}).Run(func(args mock.Arguments) { + req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest) + s.Equal(extentUUID, req.GetExtentUUID()) + s.Equal(cgUUID, req.GetConsumerGroupUUID()) + }) + err := repliator.SetAckOffset(nil, req) + s.Error(err) + s.mockMeta.AssertExpectations(s.T()) +} + +func (s *ReplicatorSuite) TestSetAckOffsetFailure_NoStoreUUID() { + repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) + cgUUID := uuid.New() + extentUUID := uuid.New() + ackLevel := int64(20) + req := &shared.SetAckOffsetRequest{ + ConsumerGroupUUID: common.StringPtr(cgUUID), + ExtentUUID: common.StringPtr(extentUUID), + AckLevelAddress: common.Int64Ptr(ackLevel), + } + + s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(&metadata.ReadConsumerGroupExtentResult_{ + Extent: &shared.ConsumerGroupExtent{}, + }, nil).Run(func(args mock.Arguments) { + req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest) + s.Equal(extentUUID, req.GetExtentUUID()) + s.Equal(cgUUID, req.GetConsumerGroupUUID()) + }) + err := repliator.SetAckOffset(nil, req) + s.Error(err) + s.mockMeta.AssertExpectations(s.T()) +} + func (s *ReplicatorSuite) TestRemoteSetAckOffsetFailed() { repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) extentUUID := uuid.New() From a57a9c6722ac93ea90e81aab6debc111ccbd9e5b Mon Sep 17 00:00:00 2001 From: Wei Han Date: Mon, 17 Jul 2017 13:54:34 -0700 Subject: [PATCH 2/3] cache the cg extent locally to reduce load on cassandra --- services/replicator/replicator.go | 51 ++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/services/replicator/replicator.go b/services/replicator/replicator.go index 4b49d198..7b372c0b 100644 --- a/services/replicator/replicator.go +++ b/services/replicator/replicator.go @@ -67,6 +67,9 @@ type ( storehostConn map[string]*outConnection storehostConnMutex sync.RWMutex + knownCgExtents map[string]struct{} + knownCgExtentsMutex sync.RWMutex + metadataReconciler MetadataReconciler } ) @@ -127,6 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta replicatorclientFactory: replicatorClientFactory, remoteReplicatorConn: make(map[string]*outConnection), storehostConn: make(map[string]*outConnection), + knownCgExtents: make(map[string]struct{}), } r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger) @@ -1102,26 +1106,37 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs }) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests) - // make sure the cg extent is created locally before accepting the SetAckOffset call. - // otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid - // and we may not be able to clean up the entry eventually. - extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ - ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()), - ExtentUUID: common.StringPtr(request.GetExtentUUID()), - }) - if err != nil { - lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`) - r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) - return err - } - if len(extent.GetExtent().GetStoreUUIDs()) < 1 { - err = fmt.Errorf(`empty store uuid from cg extent`) - lcllg.Error(`SetAckOffset: empty store uuid from cg extent`) - r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) - return err + var cgExtentCreated bool + r.knownCgExtentsMutex.RLock() + _, cgExtentCreated = r.knownCgExtents[request.GetExtentUUID()] + r.knownCgExtentsMutex.RUnlock() + + if !cgExtentCreated { + // make sure the cg extent is created locally before accepting the SetAckOffset call. + // otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid + // and we may not be able to clean up the entry eventually. + extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ + ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()), + ExtentUUID: common.StringPtr(request.GetExtentUUID()), + }) + if err != nil { + lcllg.WithField(common.TagErr, err).Error(`SetAckOffset: Failed to read cg extent locally`) + r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) + return err + } + if len(extent.GetExtent().GetStoreUUIDs()) < 1 { + err = fmt.Errorf(`empty store uuid from cg extent`) + lcllg.Error(`SetAckOffset: empty store uuid from cg extent`) + r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) + return err + } + + r.knownCgExtentsMutex.Lock() + r.knownCgExtents[request.GetExtentUUID()] = struct{}{} + r.knownCgExtentsMutex.Unlock() } - err = r.metaClient.SetAckOffset(nil, request) + err := r.metaClient.SetAckOffset(nil, request) if err != nil { lcllg.WithField(common.TagErr, err).Error(`Error calling metadata to set ack offset`) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) From 81a507af982c2eae3dcfc4cfbc8c752549823ade Mon Sep 17 00:00:00 2001 From: Wei Han Date: Tue, 18 Jul 2017 16:28:02 -0700 Subject: [PATCH 3/3] address comments --- services/replicator/replicator.go | 25 ++++++------------------- services/replicator/replicator_test.go | 23 ----------------------- 2 files changed, 6 insertions(+), 42 deletions(-) diff --git a/services/replicator/replicator.go b/services/replicator/replicator.go index 7b372c0b..ff8735a5 100644 --- a/services/replicator/replicator.go +++ b/services/replicator/replicator.go @@ -39,6 +39,7 @@ import ( dconfig "github.com/uber/cherami-server/common/dconfigclient" mm "github.com/uber/cherami-server/common/metadata" "github.com/uber/cherami-server/common/metrics" + "github.com/uber/cherami-server/common/set" storeStream "github.com/uber/cherami-server/stream" "github.com/uber/cherami-thrift/.generated/go/admin" "github.com/uber/cherami-thrift/.generated/go/metadata" @@ -67,8 +68,7 @@ type ( storehostConn map[string]*outConnection storehostConnMutex sync.RWMutex - knownCgExtents map[string]struct{} - knownCgExtentsMutex sync.RWMutex + knownCgExtents set.Set metadataReconciler MetadataReconciler } @@ -130,7 +130,7 @@ func NewReplicator(serviceName string, sVice common.SCommon, metadataClient meta replicatorclientFactory: replicatorClientFactory, remoteReplicatorConn: make(map[string]*outConnection), storehostConn: make(map[string]*outConnection), - knownCgExtents: make(map[string]struct{}), + knownCgExtents: set.NewConcurrent(0), } r.metaClient = mm.NewMetadataMetricsMgr(metadataClient, r.m3Client, r.logger) @@ -1106,16 +1106,11 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs }) r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorRequests) - var cgExtentCreated bool - r.knownCgExtentsMutex.RLock() - _, cgExtentCreated = r.knownCgExtents[request.GetExtentUUID()] - r.knownCgExtentsMutex.RUnlock() - - if !cgExtentCreated { + if !r.knownCgExtents.Contains(request.GetExtentUUID()) { // make sure the cg extent is created locally before accepting the SetAckOffset call. // otherwise SetAckOffset will create the cg extent entry with no store uuid or output host uuid // and we may not be able to clean up the entry eventually. - extent, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ + _, err := r.metaClient.ReadConsumerGroupExtent(nil, &metadata.ReadConsumerGroupExtentRequest{ ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()), ExtentUUID: common.StringPtr(request.GetExtentUUID()), }) @@ -1124,16 +1119,8 @@ func (r *Replicator) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffs r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) return err } - if len(extent.GetExtent().GetStoreUUIDs()) < 1 { - err = fmt.Errorf(`empty store uuid from cg extent`) - lcllg.Error(`SetAckOffset: empty store uuid from cg extent`) - r.m3Client.IncCounter(metrics.ReplicatorSetAckOffsetScope, metrics.ReplicatorFailures) - return err - } - r.knownCgExtentsMutex.Lock() - r.knownCgExtents[request.GetExtentUUID()] = struct{}{} - r.knownCgExtentsMutex.Unlock() + r.knownCgExtents.Insert(request.GetExtentUUID()) } err := r.metaClient.SetAckOffset(nil, request) diff --git a/services/replicator/replicator_test.go b/services/replicator/replicator_test.go index 045e0792..3b7afb2a 100644 --- a/services/replicator/replicator_test.go +++ b/services/replicator/replicator_test.go @@ -991,29 +991,6 @@ func (s *ReplicatorSuite) TestSetAckOffsetFailure_ReadExtentFail() { s.mockMeta.AssertExpectations(s.T()) } -func (s *ReplicatorSuite) TestSetAckOffsetFailure_NoStoreUUID() { - repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) - cgUUID := uuid.New() - extentUUID := uuid.New() - ackLevel := int64(20) - req := &shared.SetAckOffsetRequest{ - ConsumerGroupUUID: common.StringPtr(cgUUID), - ExtentUUID: common.StringPtr(extentUUID), - AckLevelAddress: common.Int64Ptr(ackLevel), - } - - s.mockMeta.On("ReadConsumerGroupExtent", mock.Anything, mock.Anything).Return(&metadata.ReadConsumerGroupExtentResult_{ - Extent: &shared.ConsumerGroupExtent{}, - }, nil).Run(func(args mock.Arguments) { - req := args.Get(1).(*metadata.ReadConsumerGroupExtentRequest) - s.Equal(extentUUID, req.GetExtentUUID()) - s.Equal(cgUUID, req.GetConsumerGroupUUID()) - }) - err := repliator.SetAckOffset(nil, req) - s.Error(err) - s.mockMeta.AssertExpectations(s.T()) -} - func (s *ReplicatorSuite) TestRemoteSetAckOffsetFailed() { repliator, _ := NewReplicator("replicator-test", s.mockService, s.mockMeta, s.mockReplicatorClientFactory, s.cfg) extentUUID := uuid.New()