diff --git a/common/namespace/attr_validator.go b/common/namespace/attr_validator.go index 615798cd19e..0b2be2d4480 100644 --- a/common/namespace/attr_validator.go +++ b/common/namespace/attr_validator.go @@ -63,7 +63,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceConfig(config *persistencespb.Names func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForLocalNamespace( replicationConfig *persistencespb.NamespaceReplicationConfig, ) error { - activeCluster := replicationConfig.ActiveClusterName clusters := replicationConfig.Clusters @@ -90,7 +89,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForLocalNamespace( func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForGlobalNamespace( replicationConfig *persistencespb.NamespaceReplicationConfig, ) error { - activeCluster := replicationConfig.ActiveClusterName clusters := replicationConfig.Clusters @@ -120,7 +118,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForGlobalNamespace func (d *AttrValidatorImpl) validateClusterName( clusterName string, ) error { - if info, ok := d.clusterMetadata.GetAllClusterInfo()[clusterName]; !ok || !info.Enabled { return serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid cluster name: %v", clusterName)) } diff --git a/common/namespace/namespace.go b/common/namespace/namespace.go index 45c555f544d..9064d36699d 100644 --- a/common/namespace/namespace.go +++ b/common/namespace/namespace.go @@ -32,11 +32,11 @@ import ( "github.com/google/uuid" enumspb "go.temporal.io/api/enums/v1" namespacepb "go.temporal.io/api/namespace/v1" - replicationpb "go.temporal.io/api/replication/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/api/adminservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/util" ) @@ -139,8 +139,8 @@ func FromAdminClientApiResponse(response *adminservice.GetNamespaceResponse) *Na replicationConfig := &persistencespb.NamespaceReplicationConfig{ ActiveClusterName: response.GetReplicationConfig().GetActiveClusterName(), State: response.GetReplicationConfig().GetState(), - Clusters: ConvertClusterReplicationConfigFromProto(response.GetReplicationConfig().GetClusters()), - FailoverHistory: convertFailoverHistoryToPersistenceProto(response.GetFailoverHistory()), + Clusters: nsreplication.ConvertClusterReplicationConfigFromProto(response.GetReplicationConfig().GetClusters()), + FailoverHistory: nsreplication.ConvertFailoverHistoryToPersistenceProto(response.GetFailoverHistory()), } return &Namespace{ info: info, @@ -262,13 +262,6 @@ func (ns *Namespace) IsOnCluster(clusterName string) bool { return false } -// FailoverHistory returns the a copy of failover history for this namespace. -func (ns *Namespace) FailoverHistory() []*replicationpb.FailoverStatus { - return convertFailoverHistoryToReplicationProto( - ns.replicationConfig.GetFailoverHistory(), - ) -} - // ConfigVersion return the namespace config version func (ns *Namespace) ConfigVersion() int64 { return ns.configVersion diff --git a/common/namespace/dlq_message_handler.go b/common/namespace/nsreplication/dlq_message_handler.go similarity index 90% rename from common/namespace/dlq_message_handler.go rename to common/namespace/nsreplication/dlq_message_handler.go index 7ae9f9813ce..a1fd7dbc58d 100644 --- a/common/namespace/dlq_message_handler.go +++ b/common/namespace/nsreplication/dlq_message_handler.go @@ -22,9 +22,9 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dlq_message_handler_mock.go +//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dlq_message_handler_mock.go -package namespace +package nsreplication import ( "context" @@ -45,15 +45,15 @@ type ( } dlqMessageHandlerImpl struct { - replicationHandler ReplicationTaskExecutor + replicationHandler TaskExecutor namespaceReplicationQueue persistence.NamespaceReplicationQueue logger log.Logger } ) -// NewDLQMessageHandler returns a DLQTaskHandler instance +// NewDLQMessageHandler returns a DLQMessageHandler instance func NewDLQMessageHandler( - replicationHandler ReplicationTaskExecutor, + replicationHandler TaskExecutor, namespaceReplicationQueue persistence.NamespaceReplicationQueue, logger log.Logger, ) DLQMessageHandler { @@ -64,7 +64,7 @@ func NewDLQMessageHandler( } } -// ReadMessages reads namespace replication DLQ messages +// Read reads namespace replication DLQ messages func (d *dlqMessageHandlerImpl) Read( ctx context.Context, lastMessageID int64, @@ -86,12 +86,11 @@ func (d *dlqMessageHandlerImpl) Read( ) } -// PurgeMessages purges namespace replication DLQ messages +// Purge purges namespace replication DLQ messages func (d *dlqMessageHandlerImpl) Purge( ctx context.Context, lastMessageID int64, ) error { - ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel(ctx) if err != nil { return err @@ -115,14 +114,13 @@ func (d *dlqMessageHandlerImpl) Purge( return nil } -// MergeMessages merges namespace replication DLQ messages +// Merge merges namespace replication DLQ messages func (d *dlqMessageHandlerImpl) Merge( ctx context.Context, lastMessageID int64, pageSize int, pageToken []byte, ) ([]byte, error) { - ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel(ctx) if err != nil { return nil, err diff --git a/common/namespace/dlq_message_handler_mock.go b/common/namespace/nsreplication/dlq_message_handler_mock.go similarity index 95% rename from common/namespace/dlq_message_handler_mock.go rename to common/namespace/nsreplication/dlq_message_handler_mock.go index 58a5e2dcd2a..265bf4aa3d6 100644 --- a/common/namespace/dlq_message_handler_mock.go +++ b/common/namespace/nsreplication/dlq_message_handler_mock.go @@ -27,11 +27,11 @@ // // Generated by this command: // -// mockgen -copyright_file ../../LICENSE -package namespace -source dlq_message_handler.go -destination dlq_message_handler_mock.go +// mockgen -copyright_file ../../../LICENSE -package nsreplication -source dlq_message_handler.go -destination dlq_message_handler_mock.go // -// Package namespace is a generated GoMock package. -package namespace +// Package nsreplication is a generated GoMock package. +package nsreplication import ( context "context" diff --git a/common/namespace/dlq_message_handler_test.go b/common/namespace/nsreplication/dlq_message_handler_test.go similarity index 98% rename from common/namespace/dlq_message_handler_test.go rename to common/namespace/nsreplication/dlq_message_handler_test.go index 87a7b478174..7519de77d2d 100644 --- a/common/namespace/dlq_message_handler_test.go +++ b/common/namespace/nsreplication/dlq_message_handler_test.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package namespace +package nsreplication import ( "context" @@ -46,7 +46,7 @@ type ( *require.Assertions controller *gomock.Controller - mockReplicationTaskExecutor *MockReplicationTaskExecutor + mockReplicationTaskExecutor *MockTaskExecutor mockReplicationQueue *persistence.MockNamespaceReplicationQueue dlqMessageHandler *dlqMessageHandlerImpl } @@ -69,7 +69,7 @@ func (s *dlqMessageHandlerSuite) SetupTest() { s.controller = gomock.NewController(s.T()) logger := log.NewTestLogger() - s.mockReplicationTaskExecutor = NewMockReplicationTaskExecutor(s.controller) + s.mockReplicationTaskExecutor = NewMockTaskExecutor(s.controller) s.mockReplicationQueue = persistence.NewMockNamespaceReplicationQueue(s.controller) s.dlqMessageHandler = NewDLQMessageHandler( diff --git a/common/namespace/replication_task_executor.go b/common/namespace/nsreplication/replication_task_executor.go similarity index 91% rename from common/namespace/replication_task_executor.go rename to common/namespace/nsreplication/replication_task_executor.go index 9aaffe98f1f..60ed82b4606 100644 --- a/common/namespace/replication_task_executor.go +++ b/common/namespace/nsreplication/replication_task_executor.go @@ -22,9 +22,9 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replication_task_handler_mock.go +//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replication_task_handler_mock.go -package namespace +package nsreplication import ( "context" @@ -66,26 +66,26 @@ var ( // NOTE: the counterpart of namespace replication transmission logic is in service/frontend package type ( - // ReplicationTaskExecutor is the interface for executing namespace replication tasks - ReplicationTaskExecutor interface { + // TaskExecutor is the interface for executing namespace replication tasks + TaskExecutor interface { Execute(ctx context.Context, task *replicationspb.NamespaceTaskAttributes) error } - namespaceReplicationTaskExecutorImpl struct { + taskExecutorImpl struct { currentCluster string metadataManager persistence.MetadataManager logger log.Logger } ) -// NewReplicationTaskExecutor creates a new instance of namespace replicator -func NewReplicationTaskExecutor( +// NewTaskExecutor creates a new instance of namespace replicator +func NewTaskExecutor( currentCluster string, metadataManagerV2 persistence.MetadataManager, logger log.Logger, -) ReplicationTaskExecutor { +) TaskExecutor { - return &namespaceReplicationTaskExecutorImpl{ + return &taskExecutorImpl{ currentCluster: currentCluster, metadataManager: metadataManagerV2, logger: logger, @@ -93,7 +93,7 @@ func NewReplicationTaskExecutor( } // Execute handles receiving of the namespace replication task -func (h *namespaceReplicationTaskExecutorImpl) Execute( +func (h *taskExecutorImpl) Execute( ctx context.Context, task *replicationspb.NamespaceTaskAttributes, ) error { @@ -123,7 +123,7 @@ func checkClusterIncludedInReplicationConfig(clusterName string, repCfg []*repli return false } -func (h *namespaceReplicationTaskExecutorImpl) shouldProcessTask(ctx context.Context, task *replicationspb.NamespaceTaskAttributes) (bool, error) { +func (h *taskExecutorImpl) shouldProcessTask(ctx context.Context, task *replicationspb.NamespaceTaskAttributes) (bool, error) { resp, err := h.metadataManager.GetNamespace(ctx, &persistence.GetNamespaceRequest{ Name: task.Info.GetName(), }) @@ -148,7 +148,7 @@ func (h *namespaceReplicationTaskExecutorImpl) shouldProcessTask(ctx context.Con } // handleNamespaceCreationReplicationTask handles the namespace creation replication task -func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicationTask( +func (h *taskExecutorImpl) handleNamespaceCreationReplicationTask( ctx context.Context, task *replicationspb.NamespaceTaskAttributes, ) error { @@ -251,7 +251,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicatio } // handleNamespaceUpdateReplicationTask handles the namespace update replication task -func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationTask( +func (h *taskExecutorImpl) handleNamespaceUpdateReplicationTask( ctx context.Context, task *replicationspb.NamespaceTaskAttributes, ) error { @@ -318,7 +318,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationT request.Namespace.ReplicationConfig.ActiveClusterName = task.ReplicationConfig.GetActiveClusterName() request.Namespace.FailoverVersion = task.GetFailoverVersion() request.Namespace.FailoverNotificationVersion = notificationVersion - request.Namespace.ReplicationConfig.FailoverHistory = convertFailoverHistoryToPersistenceProto(task.GetFailoverHistory()) + request.Namespace.ReplicationConfig.FailoverHistory = ConvertFailoverHistoryToPersistenceProto(task.GetFailoverHistory()) } if !recordUpdated { @@ -328,7 +328,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationT return h.metadataManager.UpdateNamespace(ctx, request) } -func (h *namespaceReplicationTaskExecutorImpl) validateNamespaceReplicationTask(task *replicationspb.NamespaceTaskAttributes) error { +func (h *taskExecutorImpl) validateNamespaceReplicationTask(task *replicationspb.NamespaceTaskAttributes) error { if task == nil { return ErrEmptyNamespaceReplicationTask } @@ -356,7 +356,7 @@ func ConvertClusterReplicationConfigFromProto( return output } -func convertFailoverHistoryToPersistenceProto(failoverHistory []*replicationpb.FailoverStatus) []*persistencespb.FailoverStatus { +func ConvertFailoverHistoryToPersistenceProto(failoverHistory []*replicationpb.FailoverStatus) []*persistencespb.FailoverStatus { var res []*persistencespb.FailoverStatus for _, status := range failoverHistory { res = append(res, &persistencespb.FailoverStatus{ @@ -367,7 +367,7 @@ func convertFailoverHistoryToPersistenceProto(failoverHistory []*replicationpb.F return res } -func (h *namespaceReplicationTaskExecutorImpl) validateNamespaceStatus(input enumspb.NamespaceState) error { +func (h *taskExecutorImpl) validateNamespaceStatus(input enumspb.NamespaceState) error { switch input { case enumspb.NAMESPACE_STATE_REGISTERED, enumspb.NAMESPACE_STATE_DEPRECATED: return nil diff --git a/common/namespace/replication_task_executor_test.go b/common/namespace/nsreplication/replication_task_executor_test.go similarity index 99% rename from common/namespace/replication_task_executor_test.go rename to common/namespace/nsreplication/replication_task_executor_test.go index 02cca4c0be6..d3c5fdb1c23 100644 --- a/common/namespace/replication_task_executor_test.go +++ b/common/namespace/nsreplication/replication_task_executor_test.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package namespace +package nsreplication import ( "context" @@ -52,7 +52,7 @@ type ( controller *gomock.Controller mockMetadataMgr *persistence.MockMetadataManager - namespaceReplicator *namespaceReplicationTaskExecutorImpl + namespaceReplicator *taskExecutorImpl } ) @@ -72,11 +72,11 @@ func (s *namespaceReplicationTaskExecutorSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.mockMetadataMgr = persistence.NewMockMetadataManager(s.controller) logger := log.NewTestLogger() - s.namespaceReplicator = NewReplicationTaskExecutor( + s.namespaceReplicator = NewTaskExecutor( "some random standby cluster name", s.mockMetadataMgr, logger, - ).(*namespaceReplicationTaskExecutorImpl) + ).(*taskExecutorImpl) } func (s *namespaceReplicationTaskExecutorSuite) TearDownTest() { @@ -473,7 +473,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_ ReplicationConfig: &persistencespb.NamespaceReplicationConfig{ ActiveClusterName: updateTask.ReplicationConfig.ActiveClusterName, Clusters: []string{updateClusterActive, updateClusterStandby}, - FailoverHistory: convertFailoverHistoryToPersistenceProto(failoverHistory), + FailoverHistory: ConvertFailoverHistoryToPersistenceProto(failoverHistory), }, ConfigVersion: updateConfigVersion, FailoverNotificationVersion: updateFailoverVersion, diff --git a/common/namespace/replication_task_handler_mock.go b/common/namespace/nsreplication/replication_task_handler_mock.go similarity index 61% rename from common/namespace/replication_task_handler_mock.go rename to common/namespace/nsreplication/replication_task_handler_mock.go index 57082eaf976..623b42a4feb 100644 --- a/common/namespace/replication_task_handler_mock.go +++ b/common/namespace/nsreplication/replication_task_handler_mock.go @@ -27,11 +27,11 @@ // // Generated by this command: // -// mockgen -copyright_file ../../LICENSE -package namespace -source replication_task_executor.go -destination replication_task_handler_mock.go +// mockgen -copyright_file ../../../LICENSE -package nsreplication -source replication_task_executor.go -destination replication_task_handler_mock.go // -// Package namespace is a generated GoMock package. -package namespace +// Package nsreplication is a generated GoMock package. +package nsreplication import ( context "context" @@ -41,31 +41,31 @@ import ( gomock "go.uber.org/mock/gomock" ) -// MockReplicationTaskExecutor is a mock of ReplicationTaskExecutor interface. -type MockReplicationTaskExecutor struct { +// MockTaskExecutor is a mock of TaskExecutor interface. +type MockTaskExecutor struct { ctrl *gomock.Controller - recorder *MockReplicationTaskExecutorMockRecorder + recorder *MockTaskExecutorMockRecorder } -// MockReplicationTaskExecutorMockRecorder is the mock recorder for MockReplicationTaskExecutor. -type MockReplicationTaskExecutorMockRecorder struct { - mock *MockReplicationTaskExecutor +// MockTaskExecutorMockRecorder is the mock recorder for MockTaskExecutor. +type MockTaskExecutorMockRecorder struct { + mock *MockTaskExecutor } -// NewMockReplicationTaskExecutor creates a new mock instance. -func NewMockReplicationTaskExecutor(ctrl *gomock.Controller) *MockReplicationTaskExecutor { - mock := &MockReplicationTaskExecutor{ctrl: ctrl} - mock.recorder = &MockReplicationTaskExecutorMockRecorder{mock} +// NewMockTaskExecutor creates a new mock instance. +func NewMockTaskExecutor(ctrl *gomock.Controller) *MockTaskExecutor { + mock := &MockTaskExecutor{ctrl: ctrl} + mock.recorder = &MockTaskExecutorMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockReplicationTaskExecutor) EXPECT() *MockReplicationTaskExecutorMockRecorder { +func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder { return m.recorder } // Execute mocks base method. -func (m *MockReplicationTaskExecutor) Execute(ctx context.Context, task *repication.NamespaceTaskAttributes) error { +func (m *MockTaskExecutor) Execute(ctx context.Context, task *repication.NamespaceTaskAttributes) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Execute", ctx, task) ret0, _ := ret[0].(error) @@ -73,7 +73,7 @@ func (m *MockReplicationTaskExecutor) Execute(ctx context.Context, task *repicat } // Execute indicates an expected call of Execute. -func (mr *MockReplicationTaskExecutorMockRecorder) Execute(ctx, task any) *gomock.Call { +func (mr *MockTaskExecutorMockRecorder) Execute(ctx, task any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Execute", reflect.TypeOf((*MockReplicationTaskExecutor)(nil).Execute), ctx, task) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Execute", reflect.TypeOf((*MockTaskExecutor)(nil).Execute), ctx, task) } diff --git a/common/namespace/transmission_task_handler.go b/common/namespace/nsreplication/transmission_task_handler.go similarity index 94% rename from common/namespace/transmission_task_handler.go rename to common/namespace/nsreplication/transmission_task_handler.go index 3453004371e..e692ee27e1f 100644 --- a/common/namespace/transmission_task_handler.go +++ b/common/namespace/nsreplication/transmission_task_handler.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package namespace +package nsreplication import ( "context" @@ -56,25 +56,25 @@ type ( ) error } - namespaceReplicatorImpl struct { + replicator struct { namespaceReplicationQueue persistence.NamespaceReplicationQueue logger log.Logger } ) -// NewNamespaceReplicator create a new instance of namespace replicator -func NewNamespaceReplicator( +// NewReplicator create a new instance of namespace replicator +func NewReplicator( namespaceReplicationQueue persistence.NamespaceReplicationQueue, logger log.Logger, ) Replicator { - return &namespaceReplicatorImpl{ + return &replicator{ namespaceReplicationQueue: namespaceReplicationQueue, logger: logger, } } // HandleTransmissionTask handle transmission of the namespace replication task -func (namespaceReplicator *namespaceReplicatorImpl) HandleTransmissionTask( +func (r *replicator) HandleTransmissionTask( ctx context.Context, namespaceOperation enumsspb.NamespaceOperation, info *persistencespb.NamespaceInfo, @@ -129,7 +129,7 @@ func (namespaceReplicator *namespaceReplicatorImpl) HandleTransmissionTask( }, } - return namespaceReplicator.namespaceReplicationQueue.Publish( + return r.namespaceReplicationQueue.Publish( ctx, &replicationspb.ReplicationTask{ TaskType: taskType, diff --git a/common/namespace/transmission_task_handler_test.go b/common/namespace/nsreplication/transmission_task_handler_test.go similarity index 99% rename from common/namespace/transmission_task_handler_test.go rename to common/namespace/nsreplication/transmission_task_handler_test.go index a30c41566f5..edb7ebfb790 100644 --- a/common/namespace/transmission_task_handler_test.go +++ b/common/namespace/nsreplication/transmission_task_handler_test.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package namespace +package nsreplication import ( "context" @@ -49,7 +49,7 @@ type ( controller *gomock.Controller - namespaceReplicator *namespaceReplicatorImpl + namespaceReplicator *replicator namespaceReplicationQueue *persistence.MockNamespaceReplicationQueue } ) @@ -69,10 +69,10 @@ func (s *transmissionTaskSuite) TearDownSuite() { func (s *transmissionTaskSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.namespaceReplicationQueue = persistence.NewMockNamespaceReplicationQueue(s.controller) - s.namespaceReplicator = NewNamespaceReplicator( + s.namespaceReplicator = NewReplicator( s.namespaceReplicationQueue, log.NewTestLogger(), - ).(*namespaceReplicatorImpl) + ).(*replicator) } func (s *transmissionTaskSuite) TearDownTest() { diff --git a/service/frontend/admin_handler.go b/service/frontend/admin_handler.go index 59bbec43610..b982bf6c276 100644 --- a/service/frontend/admin_handler.go +++ b/service/frontend/admin_handler.go @@ -70,6 +70,7 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/visibility" @@ -108,7 +109,7 @@ type ( numberOfHistoryShards int32 ESClient esclient.Client config *Config - namespaceDLQHandler namespace.DLQMessageHandler + namespaceDLQHandler nsreplication.DLQMessageHandler eventSerializer serialization.Serializer visibilityMgr manager.VisibilityManager persistenceExecutionName string @@ -183,7 +184,7 @@ var ( func NewAdminHandler( args NewAdminHandlerArgs, ) *AdminHandler { - namespaceReplicationTaskExecutor := namespace.NewReplicationTaskExecutor( + namespaceReplicationTaskExecutor := nsreplication.NewTaskExecutor( args.ClusterMetadata.GetCurrentClusterName(), args.PersistenceMetadataManager, args.Logger, @@ -208,7 +209,7 @@ func NewAdminHandler( status: common.DaemonStatusInitialized, numberOfHistoryShards: args.PersistenceConfig.NumHistoryShards, config: args.Config, - namespaceDLQHandler: namespace.NewDLQMessageHandler( + namespaceDLQHandler: nsreplication.NewDLQMessageHandler( namespaceReplicationTaskExecutor, args.NamespaceReplicationQueue, args.Logger, diff --git a/service/frontend/namespace_handler.go b/service/frontend/namespace_handler.go index 46d6cd050bc..92f63e725a8 100644 --- a/service/frontend/namespace_handler.go +++ b/service/frontend/namespace_handler.go @@ -47,6 +47,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" @@ -62,7 +63,7 @@ type ( logger log.Logger metadataMgr persistence.MetadataManager clusterMetadata cluster.Metadata - namespaceReplicator namespace.Replicator + namespaceReplicator nsreplication.Replicator namespaceAttrValidator *namespace.AttrValidatorImpl archivalMetadata archiver.ArchivalMetadata archiverProvider provider.ArchiverProvider @@ -90,7 +91,7 @@ func newNamespaceHandler( logger log.Logger, metadataMgr persistence.MetadataManager, clusterMetadata cluster.Metadata, - namespaceReplicator namespace.Replicator, + namespaceReplicator nsreplication.Replicator, archivalMetadata archiver.ArchivalMetadata, archiverProvider provider.ArchiverProvider, timeSource clock.TimeSource, diff --git a/service/frontend/namespace_handler_test.go b/service/frontend/namespace_handler_test.go index 86c019634f8..3cfc6cb0c7e 100644 --- a/service/frontend/namespace_handler_test.go +++ b/service/frontend/namespace_handler_test.go @@ -46,7 +46,7 @@ import ( "go.temporal.io/server/common/config" dc "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/testing/protoassert" "go.uber.org/mock/gomock" @@ -64,7 +64,7 @@ type ( mockMetadataMgr *persistence.MockMetadataManager mockClusterMetadata *cluster.MockMetadata mockProducer *persistence.MockNamespaceReplicationQueue - mockNamespaceReplicator namespace.Replicator + mockNamespaceReplicator nsreplication.Replicator archivalMetadata archiver.ArchivalMetadata mockArchiverProvider *provider.MockArchiverProvider fakeClock *clock.EventTimeSource @@ -95,7 +95,7 @@ func (s *namespaceHandlerCommonSuite) SetupTest() { s.mockMetadataMgr = persistence.NewMockMetadataManager(s.controller) s.mockClusterMetadata = cluster.NewMockMetadata(s.controller) s.mockProducer = persistence.NewMockNamespaceReplicationQueue(s.controller) - s.mockNamespaceReplicator = namespace.NewNamespaceReplicator(s.mockProducer, logger) + s.mockNamespaceReplicator = nsreplication.NewReplicator(s.mockProducer, logger) s.archivalMetadata = archiver.NewArchivalMetadata( dcCollection, "", @@ -375,7 +375,7 @@ func (s *namespaceHandlerCommonSuite) TestListNamespace() { s.Equal(expectedResult[name].GetConfig().GetVisibilityArchivalUri(), ns.GetConfig().GetVisibilityArchivalUri()) s.Equal(expectedResult[name].GetConfig().GetBadBinaries(), ns.GetConfig().GetBadBinaries()) s.Equal(expectedResult[name].GetReplicationConfig().GetActiveClusterName(), ns.GetReplicationConfig().GetActiveClusterName()) - s.Equal(expectedResult[name].GetReplicationConfig().GetClusters(), namespace.ConvertClusterReplicationConfigFromProto(ns.GetReplicationConfig().GetClusters())) + s.Equal(expectedResult[name].GetReplicationConfig().GetClusters(), nsreplication.ConvertClusterReplicationConfigFromProto(ns.GetReplicationConfig().GetClusters())) s.Equal(expectedResult[name].GetReplicationConfig().GetState(), ns.GetReplicationConfig().GetState()) s.Equal(expectedResult[name].GetFailoverVersion(), ns.GetFailoverVersion()) } diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 56cd93f50fb..35502585e95 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -70,6 +70,7 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/persistence" @@ -187,7 +188,7 @@ func NewWorkflowHandler( logger, persistenceMetadataManager, clusterMetadata, - namespace.NewNamespaceReplicator(namespaceReplicationQueue, logger), + nsreplication.NewReplicator(namespaceReplicationQueue, logger), archivalMetadata, archiverProvider, timeSource, diff --git a/service/history/queue_factory_base_test.go b/service/history/queue_factory_base_test.go index 63bf3375b9b..1b9fa4631e6 100644 --- a/service/history/queue_factory_base_test.go +++ b/service/history/queue_factory_base_test.go @@ -29,6 +29,8 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "go.temporal.io/server/client" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" @@ -154,6 +156,7 @@ func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx. lazyLoadedOwnershipBasedQuotaScaler, fx.Annotate(serializer, fx.As(new(serialization.Serializer))), fx.Annotate(historyFetcher, fx.As(new(eventhandler.HistoryPaginatedFetcher))), + fx.Annotate(noop.NewTracerProvider(), fx.As(new(trace.TracerProvider))), ) } diff --git a/service/history/replication/eager_namespace_refresher.go b/service/history/replication/eager_namespace_refresher.go index b1b142d2c75..a185d458903 100644 --- a/service/history/replication/eager_namespace_refresher.go +++ b/service/history/replication/eager_namespace_refresher.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" ) @@ -56,7 +57,7 @@ type ( logger log.Logger lock sync.Mutex clientBean client.Bean - replicationTaskExecutor namespace.ReplicationTaskExecutor + replicationTaskExecutor nsreplication.TaskExecutor currentCluster string metricsHandler metrics.Handler } @@ -67,7 +68,7 @@ func NewEagerNamespaceRefresher( namespaceRegistry namespace.Registry, logger log.Logger, clientBean client.Bean, - replicationTaskExecutor namespace.ReplicationTaskExecutor, + replicationTaskExecutor nsreplication.TaskExecutor, currentCluster string, metricsHandler metrics.Handler) EagerNamespaceRefresher { return &eagerNamespaceRefresherImpl{ diff --git a/service/history/replication/eager_namespace_refresher_test.go b/service/history/replication/eager_namespace_refresher_test.go index 8b461640bdc..f3c05f2a090 100644 --- a/service/history/replication/eager_namespace_refresher_test.go +++ b/service/history/replication/eager_namespace_refresher_test.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/shard" @@ -66,7 +67,7 @@ type ( eagerNamespaceRefresher EagerNamespaceRefresher logger log.Logger clientBean *client.MockBean - mockReplicationTaskExecutor *namespace.MockReplicationTaskExecutor + mockReplicationTaskExecutor *nsreplication.MockTaskExecutor currentCluster string mockMetricsHandler metrics.Handler remoteAdminClient *adminservicemock.MockAdminServiceClient @@ -84,7 +85,7 @@ func (s *EagerNamespaceRefresherSuite) SetupTest() { s.remoteAdminClient = adminservicemock.NewMockAdminServiceClient(s.controller) s.clientBean.EXPECT().GetRemoteAdminClient(gomock.Any()).Return(s.remoteAdminClient, nil).AnyTimes() scope := tally.NewTestScope("test", nil) - s.mockReplicationTaskExecutor = namespace.NewMockReplicationTaskExecutor(s.controller) + s.mockReplicationTaskExecutor = nsreplication.NewMockTaskExecutor(s.controller) s.mockMetricsHandler = metrics.NewTallyMetricsHandler(metrics.ClientConfig{}, scope).WithTags( metrics.ServiceNameTag("serviceName")) s.eagerNamespaceRefresher = NewEagerNamespaceRefresher( diff --git a/service/history/replication/fx.go b/service/history/replication/fx.go index 8fefb8b4b4f..2622db62a79 100644 --- a/service/history/replication/fx.go +++ b/service/history/replication/fx.go @@ -38,6 +38,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" ctasks "go.temporal.io/server/common/tasks" @@ -97,7 +98,7 @@ func eagerNamespaceRefresherProvider( namespaceRegistry, logger, clientBean, - namespace.NewReplicationTaskExecutor( + nsreplication.NewTaskExecutor( clusterMetadata.GetCurrentClusterName(), metadataManager, logger, diff --git a/service/worker/fx.go b/service/worker/fx.go index 353307eb5d9..4cf12846a7a 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/persistence/visibility/manager" @@ -99,8 +100,8 @@ var Module = fx.Options( clusterMetadata cluster.Metadata, metadataManager persistence.MetadataManager, logger log.Logger, - ) namespace.ReplicationTaskExecutor { - return namespace.NewReplicationTaskExecutor( + ) nsreplication.TaskExecutor { + return nsreplication.NewTaskExecutor( clusterMetadata.GetCurrentClusterName(), metadataManager, logger, diff --git a/service/worker/replicator/namespace_replication_message_processor.go b/service/worker/replicator/namespace_replication_message_processor.go index 9c17a79a23a..55dd0cc2c15 100644 --- a/service/worker/replicator/namespace_replication_message_processor.go +++ b/service/worker/replicator/namespace_replication_message_processor.go @@ -43,6 +43,7 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/rpc" ) @@ -62,7 +63,7 @@ func newNamespaceReplicationMessageProcessor( logger log.Logger, remotePeer adminservice.AdminServiceClient, metricsHandler metrics.Handler, - namespaceTaskExecutor namespace.ReplicationTaskExecutor, + namespaceTaskExecutor nsreplication.TaskExecutor, hostInfo membership.HostInfo, serviceResolver membership.ServiceResolver, namespaceReplicationQueue persistence.NamespaceReplicationQueue, @@ -102,7 +103,7 @@ type ( sourceCluster string logger log.Logger remotePeer adminservice.AdminServiceClient - namespaceTaskExecutor namespace.ReplicationTaskExecutor + namespaceTaskExecutor nsreplication.TaskExecutor metricsHandler metrics.Handler retryPolicy backoff.RetryPolicy lastProcessedMessageID int64 diff --git a/service/worker/replicator/replicator.go b/service/worker/replicator/replicator.go index bebc5bf35c5..b2024ae41bd 100644 --- a/service/worker/replicator/replicator.go +++ b/service/worker/replicator/replicator.go @@ -42,6 +42,7 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/internal/goro" ) @@ -53,7 +54,7 @@ type ( Replicator struct { status int32 clusterMetadata cluster.Metadata - namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor + namespaceReplicationTaskExecutor nsreplication.TaskExecutor clientBean client.Bean logger log.Logger metricsHandler metrics.Handler @@ -82,7 +83,7 @@ func NewReplicator( hostInfo membership.HostInfo, serviceResolver membership.ServiceResolver, namespaceReplicationQueue persistence.NamespaceReplicationQueue, - namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor, + namespaceReplicationTaskExecutor nsreplication.TaskExecutor, matchingClient matchingservice.MatchingServiceClient, namespaceRegistry namespace.Registry, ) *Replicator { diff --git a/service/worker/service.go b/service/worker/service.go index 2acc51d1ccc..032d98cc2d8 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" @@ -81,7 +82,7 @@ type ( perNamespaceWorkerManager *perNamespaceWorkerManager scanner *scanner.Scanner matchingClient matchingservice.MatchingServiceClient - namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor + namespaceReplicationTaskExecutor nsreplication.TaskExecutor } // Config contains all the service config for worker @@ -136,7 +137,7 @@ func NewService( perNamespaceWorkerManager *perNamespaceWorkerManager, visibilityManager manager.VisibilityManager, matchingClient resource.MatchingClient, - namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor, + namespaceReplicationTaskExecutor nsreplication.TaskExecutor, ) (*Service, error) { workerServiceResolver, err := membershipMonitor.GetResolver(primitives.WorkerService) if err != nil { diff --git a/tests/testcore/onebox.go b/tests/testcore/onebox.go index 11f9f8fca91..a5c034e1fca 100644 --- a/tests/testcore/onebox.go +++ b/tests/testcore/onebox.go @@ -58,6 +58,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/metrics/metricstest" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" persistenceClient "go.temporal.io/server/common/persistence/client" "go.temporal.io/server/common/persistence/visibility" @@ -119,7 +120,7 @@ type ( esConfig *esclient.Config esClient esclient.Client mockAdminClient map[string]adminservice.AdminServiceClient - namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor + namespaceReplicationTaskExecutor nsreplication.TaskExecutor tlsConfigProvider *encryption.FixedTLSConfigProvider captureMetricsHandler *metricstest.CaptureHandler hostsByProtocolByService map[transferProtocol]map[primitives.ServiceName]static.Hosts @@ -177,7 +178,7 @@ type ( ESConfig *esclient.Config ESClient esclient.Client MockAdminClient map[string]adminservice.AdminServiceClient - NamespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor + NamespaceReplicationTaskExecutor nsreplication.TaskExecutor DynamicConfigOverrides map[dynamicconfig.Key]interface{} TLSConfigProvider *encryption.FixedTLSConfigProvider CaptureMetricsHandler *metricstest.CaptureHandler diff --git a/tests/testcore/test_cluster.go b/tests/testcore/test_cluster.go index a5b90804566..56393bf7bff 100644 --- a/tests/testcore/test_cluster.go +++ b/tests/testcore/test_cluster.go @@ -53,7 +53,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/membership/static" "go.temporal.io/server/common/metrics/metricstest" - "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" persistencetests "go.temporal.io/server/common/persistence/persistence-tests" "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql" @@ -350,7 +350,7 @@ func newClusterWithPersistenceTestBaseFactory(t *testing.T, options *TestCluster MatchingConfig: options.MatchingConfig, WorkerConfig: options.WorkerConfig, MockAdminClient: options.MockAdminClient, - NamespaceReplicationTaskExecutor: namespace.NewReplicationTaskExecutor(options.ClusterMetadata.CurrentClusterName, testBase.MetadataManager, logger), + NamespaceReplicationTaskExecutor: nsreplication.NewTaskExecutor(options.ClusterMetadata.CurrentClusterName, testBase.MetadataManager, logger), DynamicConfigOverrides: options.DynamicConfigOverrides, TLSConfigProvider: tlsConfigProvider, ServiceFxOptions: options.ServiceFxOptions, diff --git a/tests/xdc/history_replication_dlq_test.go b/tests/xdc/history_replication_dlq_test.go index 837250471b7..add4cc972bf 100644 --- a/tests/xdc/history_replication_dlq_test.go +++ b/tests/xdc/history_replication_dlq_test.go @@ -51,7 +51,7 @@ import ( replicationspb "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/primitives" "go.temporal.io/server/service/history/replication" @@ -115,7 +115,7 @@ type ( } testNamespaceReplicationTaskExecutor struct { *namespaceReplicationTaskExecutorParams - replicationTaskExecutor namespace.ReplicationTaskExecutor + replicationTaskExecutor nsreplication.TaskExecutor } testExecutableTaskConverter struct { *replicationTaskExecutorParams @@ -210,7 +210,7 @@ func (s *historyReplicationDLQSuite) SetupSuite() { ), testcore.WithFxOptionsForService(primitives.WorkerService, fx.Decorate( - func(executor namespace.ReplicationTaskExecutor) namespace.ReplicationTaskExecutor { + func(executor nsreplication.TaskExecutor) nsreplication.TaskExecutor { return &testNamespaceReplicationTaskExecutor{ replicationTaskExecutor: executor, namespaceReplicationTaskExecutorParams: &s.namespaceReplicationTaskExecutors, diff --git a/tests/xdc/history_replication_signals_and_updates_test.go b/tests/xdc/history_replication_signals_and_updates_test.go index e808fb4fbbb..4cae7e48cad 100644 --- a/tests/xdc/history_replication_signals_and_updates_test.go +++ b/tests/xdc/history_replication_signals_and_updates_test.go @@ -48,7 +48,7 @@ import ( replicationspb "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/primitives" @@ -70,7 +70,7 @@ type ( // that push their tasks into test-specific (i.e. workflow-specific) buffers. hrsuTestSuite struct { xdcBaseSuite - namespaceTaskExecutor namespace.ReplicationTaskExecutor + namespaceTaskExecutor nsreplication.TaskExecutor // The injection is performed once, at the level of the test suite, but we need the modified executors to be // able to route tasks to test-specific (i.e. workflow-specific) buffers. The following two maps serve that // purpose (each test registers itself in these maps as it starts). Workflow ID and namespace name are both @@ -100,7 +100,7 @@ type ( } // Used to inject a modified namespace replication task executor. hrsuTestNamespaceReplicationTaskExecutor struct { - replicationTaskExecutor namespace.ReplicationTaskExecutor + replicationTaskExecutor nsreplication.TaskExecutor s *hrsuTestSuite } // Used to inject a modified history event replication task executor. @@ -136,7 +136,7 @@ func (s *hrsuTestSuite) SetupSuite() { []string{"cluster1", "cluster2"}, testcore.WithFxOptionsForService(primitives.WorkerService, fx.Decorate( - func(executor namespace.ReplicationTaskExecutor) namespace.ReplicationTaskExecutor { + func(executor nsreplication.TaskExecutor) nsreplication.TaskExecutor { s.namespaceTaskExecutor = executor return &hrsuTestNamespaceReplicationTaskExecutor{ replicationTaskExecutor: executor,